From 048411308ec8381529e7d917208e13cd628325f7 Mon Sep 17 00:00:00 2001 From: Swann Date: Mon, 11 Feb 2019 16:25:08 +0100 Subject: [PATCH] blender code refactoring --- client.py | 2 +- net_components.py | 163 ++++++++++++++++++++++++++++------------------ server.py | 5 +- 3 files changed, 102 insertions(+), 68 deletions(-) diff --git a/client.py b/client.py index 318fbc9..fd3ddae 100644 --- a/client.py +++ b/client.py @@ -40,7 +40,7 @@ def main(): break if updates in socks: - message = updates.recv_multipart() + message = updates.recv_multipart(zmq.NOBLOCK) print(message) # Send update diff --git a/net_components.py b/net_components.py index eb4a77d..6a59c10 100644 --- a/net_components.py +++ b/net_components.py @@ -10,6 +10,7 @@ import random logger = logging.getLogger(__name__) logging.basicConfig(level=logging.DEBUG) + class Session(): def __init__(self, host='127.0.0.1', port=5555, is_hosting=False): self.host = host @@ -18,10 +19,10 @@ class Session(): # init zmq context self.context = zmq.Context() self.socket = None - + self.msg = [] - - #self.listen.add_done_callback(self.close_success()) + + # self.listen.add_done_callback(self.close_success()) # TODO: Add a kill signal to destroy clients session # TODO: Add a join method @@ -32,7 +33,7 @@ class Session(): self.socket = self.context.socket(zmq.DEALER) self.socket.connect("tcp://localhost:5555") self.listen = asyncio.ensure_future(self.listen()) - + self.send("XXX connected") return True @@ -52,7 +53,7 @@ class Session(): self.listen = asyncio.ensure_future(self.listen()) return True except zmq.ZMQError: - logger.error("Error while creating session: ",zmq.ZMQError) + logger.error("Error while creating session: ", zmq.ZMQError) return False @@ -89,13 +90,14 @@ class Session(): del self.listen self.is_running = False + class Client_poller(): def __init__(self, id): - - self.id = id + + self.id = id self.listen = asyncio.ensure_future(self.listen()) logger.info("client initiated {}".format(self.id)) - + async def listen(self): context = zmq.Context() logger.info("...context initiated {}".format(self.id)) @@ -112,91 +114,124 @@ class Client_poller(): while True: await asyncio.sleep(0.016) sockets = dict(poll.poll(1)) - + if socket in sockets: msg = socket.recv(zmq.NOBLOCK) logger.info("{} received:{}".format(self.id, msg)) - def stop(self): logger.info("Stopping client {}".format(self.id)) self.listen.cancel() + class Client(): - def __init__(self, context=None): - - if context is None: - logger.info("client init default context") - self.context = zmq.Context() - else: - self.context = context + def __init__(self, context=zmq.Context(), id="default"): - self.task = asyncio.ensure_future(self.run()) - logger.info("client initiated") - - async def run(self): + self.context = context + self.pull_sock = None + self.push_sock = None + self.poller = None + + self.id = id + self.bind_ports() + # Main client loop registration + self.task = asyncio.ensure_future(self.main()) + + logger.info("{} client initialized".format(id)) + + def bind_ports(self): + # pull socket: get update FROM server + self.pull_sock = self.context.socket(zmq.SUB) + self.pull_sock.linger = 0 + self.pull_sock.connect("tcp://localhost:5555") + self.pull_sock.setsockopt_string(zmq.SUBSCRIBE, '') + + # push socket: push update TO server + self.push_sock = self.context.socket(zmq.DEALER) + self.push_sock.setsockopt(zmq.IDENTITY, self.id.encode()) + self.push_sock.linger = 0 + self.push_sock.connect("tcp://localhost:5556") + + # Sockets aggregator, not really used for now + self.poller = zmq.Poller() + self.poller.register(self.pull_sock, zmq.POLLIN) + + async def main(self): + logger.info("{} client launched".format(id)) # Prepare our context and publisher socket - logger.info("configuring:") - updates = self.context.socket(zmq.SUB) - logger.info("..socket") - updates.linger = 0 - logger.info("..linger") - updates.setsockopt(zmq.SUBSCRIBE,b"10001") - logger.info("client launched") - updates.connect("tcp://localhost:5556") - - kvmap = {} - sequence = 0 - while True: + # TODO: find a better way await asyncio.sleep(0.016) try: - kvmsg = kvsimple.KVMsg.recv(updates) - except: - break # Interrupted - kvmsg.store(kvmap) - sequence += 1 + socks = dict(self.poller.poll(1)) + except KeyboardInterrupt: + break + if self.pull_sock in socks: + message = self.pull_sock.recv_multipart(zmq.NOBLOCK) + print(message) + + def send_msg(self, msg): + self.push_sock.send_multipart(msg.encode()) def stop(self): logger.info("Stopping client") self.task.cancel() + self.push_sock.close() + self.pull_sock.close() + self.context.term() - - kvmap = {} - sequence = 0 - class Server(): - def __init__(self): - self.context = zmq.Context() - self.task = asyncio.ensure_future(self.run()) - logger.info("server initiated ") - - async def run(self): - publisher = self.context.socket(zmq.PUB) + def __init__(self, context=zmq.Context(), id="admin"): + self.context = context + self.pub_sock = None + self.pull_sock = None + self.poller = None - publisher.bind("tcp://*:5556") + self.id = id + self.bind_ports() + # Main client loop registration + self.task = asyncio.ensure_future(self.main()) + + logger.info("{} client initialized".format(id)) + + def bind_ports(self): + # Update all clients + self.pub_sock = self.context.socket(zmq.PUB) + self.pub_sock.bind("tcp://*:5555") time.sleep(0.2) - logger.info("server launched") - sequence = 0 - random.seed(time.time()) - kvmap = {} + # Update receiver + self.pull_sock = self.context.socket(zmq.ROUTER) + self.pull_sock.bind("tcp://*:5556") + # poller for socket aggregation + self.poller = zmq.Poller() + self.poller.register(self.pull_sock, zmq.POLLIN) + + async def main(self): + logger.info("{} server launched".format(id)) + # Prepare our context and publisher socket while True: - # Non blocking + # TODO: find a better way await asyncio.sleep(0.016) - # Distribute as key-value message - sequence += 1 - kvmsg = kvsimple.KVMsg(sequence) - kvmsg.key = "%d" % random.randint(1,10000) - kvmsg.body = "%d" % random.randint(1,1000000) - kvmsg.send(publisher) - kvmsg.store(kvmap) - + try: + socks = dict(self.poller.poll(1)) + except KeyboardInterrupt: + break + + if self.pull_sock in socks: + msg = self.pull_sock.recv_multipart(zmq.NOBLOCK) + print("{}:{}".format(msg[0].decode('ascii'), msg[1].decode())) + + # Update all clients + self.pub_sock.send_multipart(msg) def stop(self): logger.info("Stopping server") - self.task.cancel() \ No newline at end of file + self.task.cancel() + self.pub_sock.close() + self.pull_sock.close() + self.context.term() diff --git a/server.py b/server.py index f516fde..fa4590e 100644 --- a/server.py +++ b/server.py @@ -31,9 +31,8 @@ def main(): break if state_request in socks: - msg = state_request.recv_multipart() - print(msg[0].decode('ascii')) - print(msg[1].decode()) + msg = state_request.recv_multipart(zmq.NOBLOCK) + print("{}:{}".format(msg[0].decode('ascii'),msg[1].decode())) publisher.send(b'Server update') # publisher.send_string('test')