fix(rcf): Update nomenclature

feat(rcf): Added HWM for each socks
This commit is contained in:
Swann 2019-02-14 18:56:53 +01:00
parent b2e6d27ec7
commit 92c6da4f42
No known key found for this signature in database
GPG Key ID: B02D0B41F8B6D2EE

View File

@ -48,7 +48,7 @@ class RCFMessage(object):
@classmethod @classmethod
def recv(cls, socket): def recv(cls, socket):
"""Reads key-value message from socket, returns new kvmsg instance.""" """Reads key-value message from socket, returns new kvmsg instance."""
key,id,mtype, body = socket.recv_multipart(zmq.NOBLOCK) key,id,mtype, body = socket.recv_multipart(zmq.DONTWAIT)
key = key.decode() if key else None key = key.decode() if key else None
id = id if id else None id = id if id else None
mtype = mtype.decode() if body else None mtype = mtype.decode() if body else None
@ -98,6 +98,7 @@ class Client():
# request socket: send request/message over all peers throught the server # request socket: send request/message over all peers throught the server
self.req_sock = self.context.socket(zmq.DEALER) self.req_sock = self.context.socket(zmq.DEALER)
self.req_sock.setsockopt(zmq.IDENTITY, self.id) self.req_sock.setsockopt(zmq.IDENTITY, self.id)
self.req_sock.setsockopt(zmq.SNDHWM , 60)
self.req_sock.linger = 0 self.req_sock.linger = 0
self.req_sock.connect("tcp://localhost:5556") self.req_sock.connect("tcp://localhost:5556")
@ -106,6 +107,7 @@ class Client():
self.push_sock.setsockopt(zmq.IDENTITY, self.id) self.push_sock.setsockopt(zmq.IDENTITY, self.id)
self.push_sock.linger = 0 self.push_sock.linger = 0
self.push_sock.connect("tcp://localhost:5557") self.push_sock.connect("tcp://localhost:5557")
self.push_sock.setsockopt(zmq.SNDHWM , 60)
# Sockets aggregator, not really used for now # Sockets aggregator, not really used for now
self.poller = zmq.Poller() self.poller = zmq.Poller()
@ -165,15 +167,18 @@ class Server():
def bind_ports(self): def bind_ports(self):
# Update all clients # Update all clients
self.pub_sock = self.context.socket(zmq.PUB) self.pub_sock = self.context.socket(zmq.PUB)
self.pub_sock.setsockopt(zmq.SNDHWM , 60)
self.pub_sock.bind("tcp://*:5555") self.pub_sock.bind("tcp://*:5555")
time.sleep(0.2) time.sleep(0.2)
# Update request # Update request
self.request_sock = self.context.socket(zmq.ROUTER) self.request_sock = self.context.socket(zmq.ROUTER)
self.request_sock.setsockopt(zmq.RCVHWM, 60)
self.request_sock.bind("tcp://*:5556") self.request_sock.bind("tcp://*:5556")
# Update collector # Update collector
self.collector_sock = self.context.socket(zmq.PULL) self.collector_sock = self.context.socket(zmq.PULL)
self.collector_sock.setsockopt(zmq.RCVHWM, 60)
self.collector_sock.bind("tcp://*:5557") self.collector_sock.bind("tcp://*:5557")
# poller for socket aggregation # poller for socket aggregation
@ -194,13 +199,13 @@ class Server():
break break
if self.request_sock in socks: if self.request_sock in socks:
msg = self.request_sock.recv_multipart(zmq.NOBLOCK) msg = self.request_sock.recv_multipart(zmq.DONTWAIT)
# Update all clients # Update all clients
self.pub_sock.send_multipart(msg) self.pub_sock.send_multipart(msg)
if self.collector_sock in socks: if self.collector_sock in socks:
msg = self.collector_sock.recv_multipart(zmq.NOBLOCK) msg = self.collector_sock.recv_multipart(zmq.DONTWAIT)
# Update all clients # Update all clients
self.pub_sock.send_multipart(msg) self.pub_sock.send_multipart(msg)