refactor(rcf): start big refactoring

This commit is contained in:
Swann Martinez 2019-04-08 12:53:14 +02:00
parent 3bf8da24db
commit fecc429ef1
No known key found for this signature in database
GPG Key ID: 414CCAFD8DA720E1
2 changed files with 138 additions and 67 deletions

View File

@ -1,4 +1,3 @@
import asyncio
import collections
import logging
from uuid import uuid4
@ -12,12 +11,29 @@ logging.basicConfig(level=logging.DEBUG)
CONNECT_TIMEOUT = 2
WAITING_TIME = 0.001
SERVER_MAX = 1
def zpipe(ctx):
"""build inproc pipe for talking to threads
mimic pipe used in czmq zthread_fork.
Returns a pair of PAIRs connected via inproc
"""
a = ctx.socket(zmq.PAIR)
b = ctx.socket(zmq.PAIR)
a.linger = b.linger = 0
a.hwm = b.hwm = 1
iface = "inproc://%s" % binascii.hexlify(os.urandom(8))
a.bind(iface)
b.connect(iface)
return a,b
class RCFStatus(Enum):
IDLE = 1
CONNECTING = 2
CONNECTED = 3
class State(Enum):
INITIAL = 1
SYNCING = 2
ACTIVE = 3
class RCFFactory(object):
@ -160,6 +176,7 @@ class RCFMessage(object):
))
class RCFClient():
def __init__(
self,
@ -191,7 +208,7 @@ class RCFClient():
self.load_task = asyncio.ensure_future(self.load())
self.tick_task = None
self.property_map = RCFStore(custom_factory=factory)
logger.info("{} client initialized".format(id))
@ -222,60 +239,6 @@ class RCFClient():
time.sleep(0.1)
async def load(self):
self.status = RCFStatus.CONNECTING
logger.info("{} client syncing".format(id))
# Late join mecanism
logger.info("{} send snapshot request".format(id))
self.req_sock.send(b"SNAPSHOT_REQUEST")
while True:
try:
rcfmsg_snapshot = RCFMessage.recv(self.req_sock)
if rcfmsg_snapshot.key == "SNAPSHOT_END":
logger.info("snapshot complete")
break
else:
logger.info("received : {}".format(rcfmsg_snapshot.key))
rcfmsg_snapshot.store(self.property_map)
for f in self.on_recv:
f(rcfmsg_snapshot)
except:
await asyncio.sleep(0.001)
for f in self.on_post_init:
f()
logger.info("{} client running".format(id))
self.push_update(
"net/clients/{}".format(self.id.decode()), "client", None)
self.push_update(
"net/objects/{}".format(self.id.decode()), "clientObject", None)
self.tick_task = asyncio.ensure_future(self.tick())
self.status = RCFStatus.CONNECTED
async def tick(self):
# Main loop
while True:
# TODO: find a better way
socks = dict(self.poller.poll(1))
if self.pull_sock in socks:
rcfmsg = RCFMessage.recv(self.pull_sock)
if rcfmsg.id != self.id:
rcfmsg.store(self.property_map)
for f in self.on_recv:
f(rcfmsg)
else:
await asyncio.sleep(0.0001)
def push_update(self, key, mtype, body):
rcfmsg = RCFMessage(key=key, id=self.id,mtype=mtype, body=body)
@ -292,8 +255,116 @@ class RCFClient():
if self.tick_task:
self.tick_task.cancel()
class RCFServer(object):
address = None # Server address
port = None # Server port
snapshot = None # Snapshot socket
subscriber = None # Incoming updates
class RCFServer():
def __init__(self, ctx, address, port):
self.address = address
self.port = port
self.snapshot = ctx.socket(zmq.DEALER)
self.snapshot.linger = 0
self.snapshot.connect("%s:%i".format(address.decode(),port))
self.subscriber = ctx.socket(zmq.SUB)
self.subscriber.setsockopt_string(zmq.SUBSCRIBE, '')
self.subscriber.connect("%s:%i".format(address.decode(),port+1))
self.subscriber.linger = 0
class RCFClientAgent(object):
ctx = None
pipe = None
property_map = None
publisher = None
id = None
state = State.INITIAL
server = None
def __init__(self, ctx, pipe, id):
self.ctx = None
self.pipe = None
self.property_map = None
self.publisher = None
self.id = None
self.state = State.INITIAL
self.server = None
self.publisher = self.context.socket(zmq.PUSH) # push update socket
self.publisher.setsockopt(zmq.IDENTITY, self.id)
self.publisher.setsockopt(zmq.SNDHWM, 60)
self.publisher.linger = 0
def control_message (self):
msg = self.pipe.recv_multipart()
command = msg.pop(0)
if command == b"CONNECT":
address = msg.pop(0)
port = int(msg.pop(0))
if len(self.servers) < SERVER_MAX:
self.server = RCFServer(self.ctx, address, port)
self.publisher.connect("tcp://{}:5557".format(address.decode()))
else:
logger.error("E: too many servers (max. %i)", SERVER_MAX)
def rcf_client_agent(ctx,pipe,id):
agent = RCFClientAgent(ctx,pipe,id)
server = None
while True:
poller = zmq.Poller()
poller.register(agent.pipe, zmq.POLLIN)
server_socket = None
if agent.state == State.INITIAL:
server = agent.server
if agent.servers:
logger.info ("I: waiting for server at %s:%d...",
server.address, server.port)
server.snapshot.send(b"SNAPSHOT_REQUEST")
agent.state = State.SYNCING
elif agent.state == State.SYNCING:
sever_socket = server.snapshot
elif agent.state == State.ACTIVE:
server_socket = server.subscriber
if server_socket:
poller.register(server_socket, zmq.POLLIN)
try:
items = dict(poller.poll())
except:
raise
break
if agent.pipe in items:
agent.control_message()
elif server_socket in items:
rcfmsg = RCFMessage.recv(server_socket)
if agent.state == State.SYNCING:
# Store snapshot
if rcfmsg.key == "SNAPSHOT_END":
logger.info("snapshot complete")
agent.state = State.ACTIVE
else:
rcfmsg.store(agent.property_map)
elif agent.state == State.ACTIVE:
if rcfmsg.id != agent.id:
rcfmsg.store(agent.property_map)
action = "update" if kvmsg.body else "delete"
logging.info ("I: received from %s:%d %s",
server.address, server.port, action)
else:
agent.state = State.INITIAL
class RCFServerAgent():
def __init__(self, context=zmq.Context(), id="admin"):
self.context = context
@ -306,7 +377,7 @@ class RCFServer():
self.id = id
self.bind_ports()
# Main client loop registration
self.task = asyncio.ensure_future(self.tick())
tick()
logger.info("{} client initialized".format(id))
@ -333,12 +404,12 @@ class RCFServer():
self.poller.register(self.request_sock, zmq.POLLIN)
self.poller.register(self.collector_sock, zmq.POLLIN)
async def tick(self):
def tick(self):
logger.info("{} server launched".format(id))
while True:
# Non blocking poller
socks = dict(self.poller.poll(1))
socks = dict(self.poller.poll())
# Snapshot system for late join (Server - Client)
if self.request_sock in socks:
@ -370,8 +441,6 @@ class RCFServer():
# Update all clients
msg.store(self.property_map)
msg.send(self.pub_sock)
else:
await asyncio.sleep(WAITING_TIME)
def stop(self):
logger.debug("Stopping server")
@ -379,6 +448,5 @@ class RCFServer():
self.pub_sock.close()
self.request_sock.close()
self.collector_sock.close()
self.task.cancel()
self.status = RCFStatus.IDLE

3
test_server.py Normal file
View File

@ -0,0 +1,3 @@
from net_components import RCFServerAgent
server = RCFServerAgent()