multi-user/replication_client.py
2019-07-19 11:42:08 +02:00

279 lines
8.3 KiB
Python

import threading
import logging
import zmq
import time
from replication import ReplicatedDatablock, RepCommand
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
STATE_INITIAL = 0
STATE_SYNCING = 1
STATE_ACTIVE = 2
class Client(object):
def __init__(self,factory=None, id='default'):
assert(factory)
self._rep_store = {}
self._net_client = ClientNetService(
store_reference=self._rep_store,
factory=factory,
id=id)
self._factory = factory
def connect(self,address="127.0.0.1",port=5560):
self._net_client.connect(address=address,port=port)
def disconnect(self):
self._net_client.stop()
@property
def state(self):
return self._net_client.state
def register(self, object):
"""
Register a new item for replication
"""
assert(object)
new_item = self._factory.construct_from_dcc(object)(owner="client", data=object)
if new_item:
logger.info("Registering {} on {}".format(object,new_item.uuid))
new_item.store(self._rep_store)
logger.info("Pushing changes...")
new_item.push(self._net_client.publish)
return new_item.uuid
else:
raise TypeError("Type not supported")
def pull(self,object=None):
pass
def unregister(self,object):
pass
class ClientNetService(threading.Thread):
def __init__(self,store_reference=None, factory=None,id="default"):
# Threading
threading.Thread.__init__(self)
self.name = "ClientNetLink"
self.daemon = True
self._exit_event = threading.Event()
self._factory = factory
self._store_reference = store_reference
self._id = id
assert(self._factory)
# Networking
self.context = zmq.Context.instance()
self.state = STATE_INITIAL
def connect(self,address='127.0.0.1', port=5560):
if self.state == STATE_INITIAL:
logger.debug("connecting on {}:{}".format(address,port))
self.snapshot = self.context.socket(zmq.DEALER)
self.snapshot.setsockopt(zmq.IDENTITY, self._id.encode())
self.snapshot.connect("tcp://{}:{}".format(address, port))
self.subscriber = self.context.socket(zmq.SUB)
self.subscriber.setsockopt_string(zmq.SUBSCRIBE, '')
self.subscriber.connect("tcp://{}:{}".format(address, port+1))
self.subscriber.linger = 0
self.publish = self.context.socket(zmq.PUSH)
self.publish.connect("tcp://{}:{}".format(address, port+2))
self.start()
def run(self):
logger.info("{} online".format(self._id))
poller = zmq.Poller()
poller.register(self.snapshot, zmq.POLLIN)
poller.register(self.subscriber, zmq.POLLIN)
poller.register(self.publish, zmq.POLLOUT)
while not self._exit_event.is_set():
"""NET OUT
Given the net state we do something:
SYNCING : Ask for snapshots
ACTIVE : Do nothing
"""
if self.state == STATE_INITIAL:
logger.debug('{} : request snapshot'.format(self._id))
self.snapshot.send(b"SNAPSHOT_REQUEST")
self.state = STATE_SYNCING
"""NET IN
Given the net state we do something:
SYNCING : Ask for snapshots
ACTIVE : Do nothing
"""
items = dict(poller.poll(10))
if self.snapshot in items:
if self.state == STATE_SYNCING:
datablock = ReplicatedDatablock.pull(self.snapshot, self._factory)
if datablock.buffer == 'SNAPSHOT_END':
self.state = STATE_ACTIVE
logger.debug('{} : snapshot done'.format(self._id))
# We receive updates from the server !
if self.subscriber in items:
if self.state == STATE_ACTIVE:
logger.debug("{} : Receiving changes from server".format(self._id))
datablock = ReplicatedDatablock.pull(self.subscriber, self._factory)
datablock.store(self._store_reference)
if not items:
logger.error("No request ")
self.snapshot.close()
self.subscriber.close()
self.publish.close()
self._exit_event.clear()
def setup(self,id="Client"):
pass
def stop(self):
self._exit_event.set()
#Wait the end of the run
while self._exit_event.is_set():
time.sleep(.1)
self.state = 0
class Server():
def __init__(self,config=None, factory=None):
self._rep_store = {}
self._net = ServerNetService(store_reference=self._rep_store, factory=factory)
def serve(self,port=5560):
self._net.listen(port=port)
def state(self):
return self._net.state
def stop(self):
self._net.stop()
class ServerNetService(threading.Thread):
def __init__(self,store_reference=None, factory=None):
# Threading
threading.Thread.__init__(self)
self.name = "ServerNetLink"
self.daemon = True
self._exit_event = threading.Event()
# Networking
self._rep_store = store_reference
self.context = zmq.Context.instance()
self.snapshot = None
self.publisher = None
self.pull = None
self.state = 0
self.factory = factory
self.clients = []
def listen(self, port=5560):
try:
# Update request
self.snapshot = self.context.socket(zmq.ROUTER)
self.snapshot.setsockopt(zmq.IDENTITY, b'SERVER')
self.snapshot.setsockopt(zmq.RCVHWM, 60)
self.snapshot.bind("tcp://*:{}".format(port))
# Update all clients
self.publisher = self.context.socket(zmq.PUB)
# self.publisher.setsockopt(zmq.IDENTITY,b'SERVER')
self.publisher.setsockopt(zmq.SNDHWM, 60)
self.publisher.bind("tcp://*:{}".format(port+1))
time.sleep(0.2)
# Update collector
self.pull = self.context.socket(zmq.PULL)
self.pull.setsockopt(zmq.RCVHWM, 60)
self.pull.bind("tcp://*:{}".format(port+2))
self.start()
except zmq.error.ZMQError:
logger.error("Address already in use, change net config")
def run(self):
logger.debug("Server is online")
poller = zmq.Poller()
poller.register(self.snapshot, zmq.POLLIN)
poller.register(self.pull, zmq.POLLIN)
self.state = STATE_ACTIVE
while not self._exit_event.is_set():
# Non blocking poller
socks = dict(poller.poll(1))
# Snapshot system for late join (Server - Client)
if self.snapshot in socks:
msg = self.snapshot.recv_multipart(zmq.DONTWAIT)
identity = msg[0]
request = msg[1]
if request == b"SNAPSHOT_REQUEST":
# Sending snapshots
for key, item in self._rep_store:
self.snapshot.send(identity, zmq.SNDMORE)
item.push(self.snapshot)
# Snapshot end
self.snapshot.send(identity, zmq.SNDMORE)
RepCommand(owner='server',data='SNAPSHOT_END').push(self.snapshot)
# Regular update routing (Clients / Client)
if self.pull in socks:
logger.debug("Receiving changes from client")
datablock = ReplicatedDatablock.pull(self.pull, self.factory)
datablock.store(self._rep_store)
# Update all clients
datablock.push(self.publisher)
self.snapshot.close()
self.pull.close()
self.publisher.close()
self._exit_event.clear()
def stop(self):
self._exit_event.set()
#Wait the end of the run
while self._exit_event.is_set():
time.sleep(.1)
self.state = 0