From 0047094b2a52117254433c1d2af40b54547b2bc5 Mon Sep 17 00:00:00 2001 From: Swann Date: Tue, 23 Jul 2019 20:24:07 +0200 Subject: [PATCH] refactor: rename snpshot to command for explicity --- replication_client.py | 42 +++++++++++++++++++++--------------------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/replication_client.py b/replication_client.py index 6eeb8b8..9b04e66 100644 --- a/replication_client.py +++ b/replication_client.py @@ -120,9 +120,9 @@ class ClientNetService(threading.Thread): """ if self.state == STATE_INITIAL: logger.debug("connecting on {}:{}".format(address, port)) - self.snapshot = self.context.socket(zmq.DEALER) - self.snapshot.setsockopt(zmq.IDENTITY, self._id.encode()) - self.snapshot.connect("tcp://{}:{}".format(address, port)) + self.command = self.context.socket(zmq.DEALER) + self.command.setsockopt(zmq.IDENTITY, self._id.encode()) + self.command.connect("tcp://{}:{}".format(address, port)) self.subscriber = self.context.socket(zmq.SUB) self.subscriber.setsockopt_string(zmq.SUBSCRIBE, '') @@ -139,7 +139,7 @@ class ClientNetService(threading.Thread): def run(self): logger.info("{} online".format(self._id)) poller = zmq.Poller() - poller.register(self.snapshot, zmq.POLLIN) + poller.register(self.command, zmq.POLLIN) poller.register(self.subscriber, zmq.POLLIN) poller.register(self.publish, zmq.POLLOUT) @@ -150,7 +150,7 @@ class ClientNetService(threading.Thread): """ if self.state == STATE_INITIAL: logger.debug('{} : request snapshot'.format(self._id)) - self.snapshot.send(b"SNAPSHOT_REQUEST") + self.command.send(b"SNAPSHOT_REQUEST") self.state = STATE_SYNCING """NET IN @@ -160,10 +160,10 @@ class ClientNetService(threading.Thread): """ items = dict(poller.poll(1)) - if self.snapshot in items: + if self.command in items: if self.state == STATE_SYNCING: datablock = ReplicatedDatablock.pull( - self.snapshot, self._factory) + self.command, self._factory) if 'SNAPSHOT_END' in datablock.buffer: self.state = STATE_ACTIVE @@ -183,7 +183,7 @@ class ClientNetService(threading.Thread): if not items: logger.error("No request ") - self.snapshot.close() + self.command.close() self.subscriber.close() self.publish.close() @@ -227,7 +227,7 @@ class ServerNetService(threading.Thread): self._rep_store = store_reference self.context = zmq.Context.instance() - self.snapshot = None + self.command = None self.publisher = None self.pull = None self.state = 0 @@ -237,10 +237,10 @@ class ServerNetService(threading.Thread): def listen(self, port=5560): try: # Update request - self.snapshot = self.context.socket(zmq.ROUTER) - self.snapshot.setsockopt(zmq.IDENTITY, b'SERVER') - self.snapshot.setsockopt(zmq.RCVHWM, 60) - self.snapshot.bind("tcp://*:{}".format(port)) + self.command = self.context.socket(zmq.ROUTER) + self.command.setsockopt(zmq.IDENTITY, b'SERVER') + self.command.setsockopt(zmq.RCVHWM, 60) + self.command.bind("tcp://*:{}".format(port)) # Update all clients self.publisher = self.context.socket(zmq.PUB) @@ -268,7 +268,7 @@ class ServerNetService(threading.Thread): def run(self): logger.debug("Server is online") poller = zmq.Poller() - poller.register(self.snapshot, zmq.POLLIN) + poller.register(self.command, zmq.POLLIN) poller.register(self.pull, zmq.POLLIN) self.state = STATE_ACTIVE @@ -278,8 +278,8 @@ class ServerNetService(threading.Thread): socks = dict(poller.poll(1)) # Snapshot system for late join (Server - Client) - if self.snapshot in socks: - msg = self.snapshot.recv_multipart(zmq.DONTWAIT) + if self.command in socks: + msg = self.command.recv_multipart(zmq.DONTWAIT) identity = msg[0] request = msg[1] @@ -289,13 +289,13 @@ class ServerNetService(threading.Thread): if request == b"SNAPSHOT_REQUEST": # Sending snapshots for key, item in self._rep_store.items(): - self.snapshot.send(identity, zmq.SNDMORE) - item.push(self.snapshot) + self.command.send(identity, zmq.SNDMORE) + item.push(self.command) # Snapshot end - self.snapshot.send(identity, zmq.SNDMORE) + self.command.send(identity, zmq.SNDMORE) RepCommand(owner='server', pointer='SNAPSHOT_END').push( - self.snapshot) + self.command) # Regular update routing (Clients / Server / Clients) if self.pull in socks: @@ -312,7 +312,7 @@ class ServerNetService(threading.Thread): datablock.push(self.publisher) - self.snapshot.close() + self.command.close() self.pull.close() self.publisher.close()