feat: use DEALER/ROUTER for data routing

This commit is contained in:
Swann 2019-07-23 21:41:47 +02:00
parent 1d0009fd35
commit 7fc7254336

View File

@ -24,11 +24,13 @@ class Client(object):
factory=factory)
self._factory = factory
self._is_supervisor = supervisor
self._id = None
def connect(self, id="Default", address="127.0.0.1", port=5560):
"""
Connect to the server
"""
self._id = id
self._net_client.connect(id=id, address=address, port=port)
def disconnect(self):
@ -57,13 +59,13 @@ class Client(object):
# Construct the coresponding replication type
new_item = self._factory.construct_from_dcc(
object)(owner="client", pointer=object)
object)(owner=self._id, pointer=object)
if new_item:
logger.info("Registering {} on {}".format(object, new_item.uuid))
logger.debug("Registering {} on {}".format(object, new_item.uuid))
new_item.store(self._rep_store)
logger.info("Pushing new registered value")
logger.debug("Pushing new registered value")
new_item.push(self._net_client.publish)
return new_item.uuid
@ -97,6 +99,7 @@ class Client(object):
def get(self, object_uuid):
pass
class ClientNetService(threading.Thread):
def __init__(self, store_reference=None, factory=None):
@ -128,11 +131,14 @@ class ClientNetService(threading.Thread):
self.command.setsockopt(zmq.IDENTITY, self._id.encode())
self.command.connect("tcp://{}:{}".format(address, port))
self.subscriber = self.context.socket(zmq.SUB)
self.subscriber.setsockopt_string(zmq.SUBSCRIBE, '')
# self.subscriber.setsockopt(zmq.IDENTITY, self._id.encode())
self.subscriber = self.context.socket(zmq.DEALER)
self.subscriber.setsockopt(zmq.IDENTITY, self._id.encode())
# self.subscriber = self.context.socket(zmq.SUB)
# self.subscriber.setsockopt_string(zmq.SUBSCRIBE, '')
self.subscriber.connect("tcp://{}:{}".format(address, port+1))
self.subscriber.linger = 0
# self.subscriber.linger = 0
time.sleep(.5)
self.publish = self.context.socket(zmq.PUSH)
@ -141,7 +147,7 @@ class ClientNetService(threading.Thread):
self.start()
def run(self):
logger.info("{} online".format(self._id))
logger.debug("{} online".format(self._id))
poller = zmq.Poller()
poller.register(self.command, zmq.POLLIN)
poller.register(self.subscriber, zmq.POLLIN)
@ -250,9 +256,8 @@ class ServerNetService(threading.Thread):
self.command.bind("tcp://*:{}".format(port))
# Update all clients
self.publisher = self.context.socket(zmq.PUB)
# self.publisher.setsockopt(zmq.IDENTITY,b'SERVER_DATA')
self.publisher.setsockopt(zmq.SNDHWM, 60)
self.publisher = self.context.socket(zmq.ROUTER)
self.publisher.setsockopt(zmq.IDENTITY,b'SERVER_DATA')
self.publisher.bind("tcp://*:{}".format(port+1))
self.publisher.setsockopt(zmq.SNDHWM, 60)
self.publisher.linger = 0
@ -306,18 +311,19 @@ class ServerNetService(threading.Thread):
# Regular update routing (Clients / Server / Clients)
if self.pull in socks:
logger.debug("SERVER: Receiving changes from client")
datablock = ReplicatedDatablock.pull(self.pull, self.factory)
logger.debug("SERVER: Receiving changes from {}".format(datablock.owner))
datablock.store(self._rep_store)
# Update all clients
# for cli_name,cli_id in self.clients.items():
# logger.debug("SERVER: Broadcast changes to {}".format(cli_name))
# self.publisher.send(cli_id, zmq.SNDMORE)
# datablock.push(self.publisher)
for cli_name,cli_id in self.clients.items():
if cli_name != datablock.owner:
logger.debug("SERVER: Broadcast changes to {}".format(cli_name))
self.publisher.send(cli_id, zmq.SNDMORE)
datablock.push(self.publisher)
datablock.push(self.publisher)
# datablock.push(self.publisher)
self.command.close()
self.pull.close()