diff --git a/net_components.py b/net_components.py index 11e5b70..70e38b9 100644 --- a/net_components.py +++ b/net_components.py @@ -21,8 +21,8 @@ class RCFMessage(object): """ key = None # key (string) - id = None # User (string) - mtype = None # data mtype (string) + id = None # User (string) + mtype = None # data mtype (string) body = None # data blob def __init__(self, key=None, id=None, mtype=None, body=None): @@ -43,18 +43,18 @@ class RCFMessage(object): key = '' if self.key is None else self.key.encode() mtype = '' if self.mtype is None else self.mtype.encode() body = '' if self.body is None else umsgpack.packb(self.body) - socket.send_multipart([key,self.id,mtype, body]) + socket.send_multipart([key, self.id, mtype, body]) @classmethod def recv(cls, socket): """Reads key-value message from socket, returns new kvmsg instance.""" - key,id,mtype, body = socket.recv_multipart(zmq.DONTWAIT) - key = key.decode() if key else None - id = id if id else None - mtype = mtype.decode() if body else None + key, id, mtype, body = socket.recv_multipart(zmq.DONTWAIT) + key = key.decode() if key else None + id = id if id else None + mtype = mtype.decode() if body else None body = umsgpack.unpackb(body) if body else None - - return cls(key=key,id=id,mtype=mtype, body=body) + + return cls(key=key, id=id, mtype=mtype, body=body) def dump(self): if self.body is None: @@ -85,7 +85,7 @@ class Client(): self.task = asyncio.ensure_future(self.main()) self.property_map = {} - + logger.info("{} client initialized".format(id)) def bind_ports(self): @@ -98,7 +98,7 @@ class Client(): # request socket: send request/message over all peers throught the server self.req_sock = self.context.socket(zmq.DEALER) self.req_sock.setsockopt(zmq.IDENTITY, self.id) - self.req_sock.setsockopt(zmq.SNDHWM , 60) + self.req_sock.setsockopt(zmq.SNDHWM, 60) self.req_sock.linger = 0 self.req_sock.connect("tcp://localhost:5556") @@ -107,7 +107,7 @@ class Client(): self.push_sock.setsockopt(zmq.IDENTITY, self.id) self.push_sock.linger = 0 self.push_sock.connect("tcp://localhost:5557") - self.push_sock.setsockopt(zmq.SNDHWM , 60) + self.push_sock.setsockopt(zmq.SNDHWM, 60) # Sockets aggregator, not really used for now self.poller = zmq.Poller() @@ -119,23 +119,21 @@ class Client(): # Prepare our context and publisher socket while True: # TODO: find a better way - await asyncio.sleep(0.016) - try: - socks = dict(self.poller.poll(1)) - except KeyboardInterrupt: - break + socks = dict(self.poller.poll(1)) if self.pull_sock in socks: rcfmsg = RCFMessage.recv(self.pull_sock) rcfmsg.store(self.property_map) # rcfmsg.dump() - + for f in self.recv_callback: f(rcfmsg) + else: + await asyncio.sleep(0.016) - def push_update(self, key,mtype,body): - rcfmsg = RCFMessage(key,self.id,mtype,body) + def push_update(self, key, mtype, body): + rcfmsg = RCFMessage(key, self.id, mtype, body) rcfmsg.send(self.push_sock) # self.push_sock.send_multipart() @@ -167,7 +165,7 @@ class Server(): def bind_ports(self): # Update all clients self.pub_sock = self.context.socket(zmq.PUB) - self.pub_sock.setsockopt(zmq.SNDHWM , 60) + self.pub_sock.setsockopt(zmq.SNDHWM, 60) self.pub_sock.bind("tcp://*:5555") time.sleep(0.2) @@ -190,25 +188,19 @@ class Server(): logger.info("{} server launched".format(id)) while True: - # TODO: find a better way - await asyncio.sleep(0.0001) - - try: - socks = dict(self.poller.poll(1)) - except KeyboardInterrupt: - break + # TODO: Listener on anoter process linked with PAIR/PAIR ? + socks = dict(self.poller.poll(1)) if self.request_sock in socks: msg = self.request_sock.recv_multipart(zmq.DONTWAIT) - # Update all clients self.pub_sock.send_multipart(msg) - - if self.collector_sock in socks: + elif self.collector_sock in socks: msg = self.collector_sock.recv_multipart(zmq.DONTWAIT) - # Update all clients self.pub_sock.send_multipart(msg) + else: + await asyncio.sleep(0.016) def stop(self): logger.debug("Stopping server") diff --git a/net_operators.py b/net_operators.py index 29c80d3..183e54b 100644 --- a/net_operators.py +++ b/net_operators.py @@ -83,7 +83,6 @@ def observer(): client.push_update(key,value_type,value) except: pass - return 0.16