multi-user/replication_client.py

329 lines
10 KiB
Python
Raw Normal View History

2019-07-05 18:07:16 +02:00
import logging
2019-07-23 20:18:51 +02:00
import threading
2019-07-05 18:07:16 +02:00
import time
2019-07-23 20:18:51 +02:00
import zmq
from replication import RepCommand, RepDeleteCommand, ReplicatedDatablock
from replication_graph import ReplicationGraph
2019-07-05 18:07:16 +02:00
2019-07-18 16:38:13 +02:00
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Connection life-cycle states used by the network services:
#   STATE_INITIAL (0): nothing has been requested yet
#   STATE_SYNCING (1): a snapshot has been requested and is being loaded
#   STATE_ACTIVE  (2): snapshot done, listening for live updates
STATE_INITIAL, STATE_SYNCING, STATE_ACTIVE = range(3)
2019-07-05 18:07:16 +02:00
class Client(object):
    """
    Client-side entry point of the replication system.

    Owns a local ReplicationGraph store and a ClientNetService thread
    that exchanges replicated items with the server.
    """

    def __init__(self, factory=None, id='default'):
        """
        factory: object used to build replicated items; it must provide
                 construct_from_dcc() (see register). Required.
        id: identity handed to the network service.
        """
        assert(factory)

        self._rep_store = ReplicationGraph()
        self._net_client = ClientNetService(
            store_reference=self._rep_store,
            factory=factory,
            id=id)
        self._factory = factory

    def connect(self, address="127.0.0.1", port=5560):
        """
        Connect to the server
        """
        self._net_client.connect(address=address, port=port)

    def disconnect(self):
        """
        Disconnect from server, reset the client
        """
        self._net_client.stop()

    @property
    def state(self):
        """
        Return the client state
        0: STATE_INITIAL
        1: STATE_SYNCING
        2: STATE_ACTIVE
        """
        return self._net_client.state

    def register(self, object):
        """
        Register a new item for replication and push it to the server.

        Returns the uuid of the newly created replicated item.
        Raises TypeError("Type not supported") when no replication type
        can be built for the given object.

        TODO: Dig in the replication behavior,
        find a better way to handle replication behavior
        """
        assert(object)

        # Look up the corresponding replication type first. It must be
        # checked *before* being called: calling a missing implementation
        # would fail with an unrelated "'NoneType' object is not callable".
        # NOTE(review): assumes the factory returns None for unsupported
        # types - confirm against the factory implementation.
        implementation = self._factory.construct_from_dcc(object)
        if implementation is None:
            raise TypeError("Type not supported")

        new_item = implementation(owner="client", pointer=object)
        if not new_item:
            raise TypeError("Type not supported")

        logger.info("Registering {} on {}".format(object, new_item.uuid))
        new_item.store(self._rep_store)

        logger.info("Pushing new registered value")
        new_item.push(self._net_client.publish)
        return new_item.uuid

    def unregister(self, object_uuid, clean=False):
        """
        Unregister for replication the given object.

        A delete command is stored locally and pushed to the server.
        Raises KeyError when object_uuid is not in the local store.

        The clean option purpose is to remove the pointer's data;
        it is currently not used by this method.
        """
        if object_uuid not in self._rep_store.keys():
            raise KeyError("Cannot unregister key")

        delete_command = RepDeleteCommand(
            owner='client', buffer=object_uuid)
        # remove the key from our store
        delete_command.store(self._rep_store)
        delete_command.push(self._net_client.publish)

    def pull(self, object=None):
        """
        Asynchronous pull
        Here we want to pull all waiting changes and apply them
        (not implemented yet)
        """
        pass
2019-07-18 16:38:13 +02:00
2019-07-23 20:18:51 +02:00
2019-07-05 18:07:16 +02:00
class ClientNetService(threading.Thread):
    """
    Background thread handling the client side of the network link.

    Uses three sockets created in connect():
      command    (DEALER): snapshot request / snapshot reception
      subscriber (SUB):    updates published by the server
      publish    (PUSH):   local changes sent to the server
    """

    def __init__(self, store_reference=None, factory=None, id="default"):
        """
        store_reference: store in which received datablocks are stored.
        factory: passed to ReplicatedDatablock.pull(). Required.
        id: string used as identity of the command socket.
        """
        # Threading
        threading.Thread.__init__(self)
        self.name = "ClientNetLink"
        self.daemon = True

        self._exit_event = threading.Event()
        self._factory = factory
        self._store_reference = store_reference
        self._id = id

        assert(self._factory)

        # Networking
        self.context = zmq.Context.instance()
        self.state = STATE_INITIAL

    def connect(self, address='127.0.0.1', port=5560):
        """
        Network socket setup

        Does nothing unless the service is in STATE_INITIAL. The command
        socket connects on `port`, the subscriber on `port + 1` and the
        publish socket on `port + 2`, then the thread is started.

        Raises RuntimeError if the thread has already been started once
        (a Thread object cannot be restarted, even after stop()).
        """
        if self.state != STATE_INITIAL:
            return

        # Fail before allocating sockets: start() would raise the same
        # exception type later, but only after three sockets were leaked.
        if self.ident is not None:
            raise RuntimeError("threads can only be started once")

        logger.debug("connecting on {}:{}".format(address, port))
        self.command = self.context.socket(zmq.DEALER)
        self.command.setsockopt(zmq.IDENTITY, self._id.encode())
        self.command.connect("tcp://{}:{}".format(address, port))

        self.subscriber = self.context.socket(zmq.SUB)
        self.subscriber.setsockopt_string(zmq.SUBSCRIBE, '')
        # self.subscriber.setsockopt(zmq.IDENTITY, self._id.encode())
        self.subscriber.connect("tcp://{}:{}".format(address, port+1))
        self.subscriber.linger = 0
        # NOTE(review): presumably gives the subscription time to be
        # established before traffic starts - confirm.
        time.sleep(.5)

        self.publish = self.context.socket(zmq.PUSH)
        self.publish.connect("tcp://{}:{}".format(address, port+2))

        self.start()

    def run(self):
        """
        Thread main loop: request a snapshot, load it, then apply the
        updates received from the server until stop() is requested.

        The sockets are always closed and the exit event cleared when the
        loop ends, including when it ends because of an exception.
        """
        logger.info("{} online".format(self._id))

        try:
            poller = zmq.Poller()
            poller.register(self.command, zmq.POLLIN)
            poller.register(self.subscriber, zmq.POLLIN)
            # NOTE(review): registering the PUSH socket for POLLOUT
            # presumably makes poll() return at once whenever it is
            # writable, turning this loop into a busy loop - confirm.
            poller.register(self.publish, zmq.POLLOUT)

            while not self._exit_event.is_set():
                # NET OUT
                # Given the net state we do something:
                #   INITIAL : Ask for snapshots
                if self.state == STATE_INITIAL:
                    logger.debug('{} : request snapshot'.format(self._id))
                    self.command.send(b"SNAPSHOT_REQUEST")
                    self.state = STATE_SYNCING

                # NET IN
                # Given the net state we do something:
                #   SYNCING : load snapshots
                #   ACTIVE  : listen for updates
                items = dict(poller.poll(1))

                if self.command in items:
                    if self.state == STATE_SYNCING:
                        datablock = ReplicatedDatablock.pull(
                            self.command, self._factory)

                        if 'SNAPSHOT_END' in datablock.buffer:
                            self.state = STATE_ACTIVE
                            logger.debug(
                                '{} : snapshot done'.format(self._id))
                        else:
                            datablock.store(self._store_reference)

                # We receive updates from the server !
                if self.subscriber in items:
                    if self.state == STATE_ACTIVE:
                        logger.debug(
                            "{} : Receiving changes from server".format(
                                self._id))
                        datablock = ReplicatedDatablock.pull(
                            self.subscriber, self._factory)
                        datablock.store(self._store_reference)

                if not items:
                    logger.error("No request ")
        finally:
            # Release the sockets whatever made the loop end, so a crash
            # in the loop neither leaks them nor blocks stop().
            self.command.close()
            self.subscriber.close()
            self.publish.close()
            self._exit_event.clear()

    def stop(self):
        """
        Ask the thread to end, wait for it, and reset the state.

        Returns immediately when the thread is not running (never
        started, or already finished) instead of waiting forever for
        run() to acknowledge the request.
        """
        self._exit_event.set()

        if self.is_alive():
            # Wait the end of the run; run() clears the exit event.
            self.join()
        else:
            # Nobody is left to acknowledge the request.
            self._exit_event.clear()

        self.state = STATE_INITIAL
class Server:
    """
    Server-side facade: holds the replicated store (a plain dict) and
    delegates all networking to a ServerNetService.
    """

    def __init__(self, config=None, factory=None):
        """
        config: accepted but not used here.
        factory: forwarded to the network service.
        """
        store = {}
        self._rep_store = store
        self._net = ServerNetService(store_reference=store, factory=factory)

    def serve(self, port=5560):
        """Make the network service listen on the given port."""
        net_service = self._net
        net_service.listen(port=port)

    def state(self):
        """Return the current state of the network service."""
        net_service = self._net
        return net_service.state

    def stop(self):
        """Stop the network service."""
        net_service = self._net
        net_service.stop()
2019-07-05 18:47:40 +02:00
class ServerNetService(threading.Thread):
    """
    Background thread handling the server side of the network link.

    Uses three sockets created in listen():
      command   (ROUTER): snapshot requests from clients
      publisher (PUB):    broadcast of updates to every client
      pull      (PULL):   collection of the changes sent by clients
    """

    def __init__(self, store_reference=None, factory=None):
        """
        store_reference: store holding the replicated items.
        factory: passed to ReplicatedDatablock.pull().
        """
        # Threading
        threading.Thread.__init__(self)
        self.name = "ServerNetLink"
        self.daemon = True
        self._exit_event = threading.Event()

        # Networking
        self._rep_store = store_reference
        self.context = zmq.Context.instance()
        self.command = None
        self.publisher = None
        self.pull = None
        self.state = STATE_INITIAL
        self.factory = factory
        # Known clients: decoded identity (str) -> raw identity (bytes)
        self.clients = {}

    def _close_sockets(self):
        """Close every socket that is currently open and forget it."""
        for attr in ("command", "pull", "publisher"):
            sock = getattr(self, attr)
            if sock is not None:
                sock.close()
                setattr(self, attr, None)

    def listen(self, port=5560):
        """
        Bind the sockets and start the thread.

        The command socket binds on `port`, the publisher on `port + 1`
        and the update collector on `port + 2`. If a ZMQ error occurs
        during the setup, it is logged, the sockets created so far are
        closed and the thread is not started.
        """
        try:
            # Update request
            self.command = self.context.socket(zmq.ROUTER)
            self.command.setsockopt(zmq.IDENTITY, b'SERVER')
            self.command.setsockopt(zmq.RCVHWM, 60)
            self.command.bind("tcp://*:{}".format(port))

            # Update all clients
            self.publisher = self.context.socket(zmq.PUB)
            # self.publisher.setsockopt(zmq.IDENTITY,b'SERVER_DATA')
            self.publisher.setsockopt(zmq.SNDHWM, 60)
            self.publisher.bind("tcp://*:{}".format(port+1))
            self.publisher.linger = 0

            # Update collector
            self.pull = self.context.socket(zmq.PULL)
            self.pull.setsockopt(zmq.RCVHWM, 60)
            self.pull.bind("tcp://*:{}".format(port+2))
        except zmq.error.ZMQError:
            logger.error("Address already in use, change net config")
            # Do not keep half-configured sockets around.
            self._close_sockets()
            return

        self.start()

    def add_client(self, identity):
        """
        Remember a client from its raw (bytes) socket identity.

        Clients are keyed by their decoded identity, so the lookup has to
        use the decoded value as well.
        """
        name = identity.decode()
        if name in self.clients:
            logger.debug("client already added")
        else:
            self.clients[name] = identity

    def _serve_command(self):
        """Read one request from the command socket and answer it."""
        frames = self.command.recv_multipart(zmq.DONTWAIT)
        if len(frames) < 2:
            # Expected at least [identity, request]; nothing to answer.
            logger.debug("SERVER: ignoring malformed request")
            return

        identity, request = frames[0], frames[1]
        self.add_client(identity)

        if request == b"SNAPSHOT_REQUEST":
            # Sending snapshots
            for item in self._rep_store.values():
                self.command.send(identity, zmq.SNDMORE)
                item.push(self.command)

            # Snapshot end
            self.command.send(identity, zmq.SNDMORE)
            RepCommand(owner='server', pointer='SNAPSHOT_END').push(
                self.command)

    def _route_update(self):
        """Store one change received from a client and broadcast it."""
        logger.debug("SERVER: Receiving changes from client")
        datablock = ReplicatedDatablock.pull(self.pull, self.factory)

        datablock.store(self._rep_store)

        # Update all clients
        datablock.push(self.publisher)

    def run(self):
        """
        Thread main loop: answer snapshot requests and route the updates
        until stop() is requested.

        The sockets are always closed and the exit event cleared when the
        loop ends, including when it ends because of an exception.
        """
        logger.debug("Server is online")

        try:
            poller = zmq.Poller()
            poller.register(self.command, zmq.POLLIN)
            poller.register(self.pull, zmq.POLLIN)

            self.state = STATE_ACTIVE

            while not self._exit_event.is_set():
                # Poll with a 1 ms timeout so the exit event is checked
                socks = dict(poller.poll(1))

                # Snapshot system for late join (Server - Client)
                if self.command in socks:
                    self._serve_command()

                # Regular update routing (Clients / Server / Clients)
                if self.pull in socks:
                    self._route_update()
        finally:
            # Release the sockets whatever made the loop end, so a crash
            # in the loop neither leaks them nor blocks stop().
            self._close_sockets()
            self._exit_event.clear()

    def stop(self):
        """
        Ask the thread to end, wait for it, and reset the state.

        Returns immediately when the thread is not running (for instance
        when listen() failed and never started it) instead of waiting
        forever for run() to acknowledge the request.
        """
        self._exit_event.set()

        if self.is_alive():
            # Wait the end of the run; run() clears the exit event.
            self.join()
        else:
            # Nobody is left to acknowledge the request.
            self._exit_event.clear()

        self.state = STATE_INITIAL