diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
deleted file mode 100644
index 990673f..0000000
--- a/.gitlab-ci.yml
+++ /dev/null
@@ -1,27 +0,0 @@
-# This file is a template, and might need editing before it works on your project.
-# Official language image. Look for the different tagged releases at:
-# https://hub.docker.com/r/library/python/tags/
-image: python:latest
-
-# Change pip's cache directory to be inside the project directory since we can
-# only cache local items.
-variables:
-  PIP_CACHE_DIR: "$CI_PROJECT_DIR/.cache/pip"
-
-# Pip's cache doesn't store the python packages
-# https://pip.pypa.io/en/stable/reference/pip_install/#caching
-#
-# If you want to also cache the installed packages, you have to install
-# them in a virtualenv and cache it as well.
-cache:
-  paths:
-    - .cache/pip
-
-
-before_script:
-  - python -V  # Print out python version for debugging
-  - pip install zmq umsgpack
-
-test:
-  script:
-    - python -m unittest discover
\ No newline at end of file
diff --git a/replication.py b/replication.py
deleted file mode 100644
index 5576a0a..0000000
--- a/replication.py
+++ /dev/null
@@ -1,208 +0,0 @@
-import json
-import logging
-import pickle
-from enum import Enum
-from uuid import uuid4
-
-import zmq
-
-logger = logging.getLogger(__name__)
-
-
-class RepState(Enum):
-    ADDED = 0
-    COMMITED = 1
-    STAGED = 2
-
-
-class ReplicatedDataFactory(object):
-    """
-    Manage the data type implementations.
-
-    """
-
-    def __init__(self):
-        self.supported_types = []
-
-        # Default registered types
-        self.register_type(str, RepCommand)
-        self.register_type(RepDeleteCommand, RepDeleteCommand)
-
-    def register_type(self, dtype, implementation):
-        """
-        Register a new replicated datatype implementation
-        """
-        self.supported_types.append((dtype, implementation))
-
-    def match_type_by_instance(self, data):
-        """
-        Find the implementation corresponding to the given datablock
-        """
-        for stypes, implementation in self.supported_types:
-            if isinstance(data, stypes):
-                return implementation
-
-        print("type not supported for replication")
-        raise NotImplementedError
-
-    def match_type_by_name(self, type_name):
-        for stypes, implementation in self.supported_types:
-            if type_name == implementation.__name__:
-                return implementation
-        print("type not supported for replication")
-        raise NotImplementedError
-
-    def construct_from_dcc(self, data):
-        implementation = self.match_type_by_instance(data)
-        return implementation
-
-    def construct_from_net(self, type_name):
-        """
-        Reconstruct a new replicated value from serialized data
-        """
-        return self.match_type_by_name(type_name)
-
-
-class ReplicatedDatablock(object):
-    """
-    Datablock definition that handles object replication logic.
-    PUSH: send the object over the wire
-    STORE: register the object on the given replication graph
-    LOAD: apply loaded changes by reference on the local copy
-    DUMP: get local changes
-
-    """
-    uuid = None      # uuid used as key (string)
-    pointer = None   # dcc data ref (DCC type)
-    buffer = None    # raw data (json)
-    str_type = None  # data type name (string)
-    deps = [None]    # dependencies array (string)
-    owner = None     # Data owner (string)
-    state = None     # Data state (RepState)
-
-    def __init__(self, owner=None, pointer=None, uuid=None, buffer=None):
-        self.uuid = uuid if uuid else str(uuid4())
-        assert(owner)
-        self.owner = owner
-
-        if pointer:
-            self.pointer = pointer
-            self.buffer = self.dump()
-        elif buffer:
-            self.buffer = buffer
-
-        self.str_type = type(self).__name__
-
-    def push(self, socket):
-        """
-        Send the data over the wire:
-            - serialize the data
-            - send them as a multipart frame through the given socket
-        """
-        assert(self.buffer)
-
-        data = self.serialize(self.buffer)
-        assert(isinstance(data, bytes))
-        owner = self.owner.encode()
-        key = self.uuid.encode()
-        type = self.str_type.encode()
-
-        socket.send_multipart([key, owner, type, data])
-
-    @classmethod
-    def pull(cls, socket, factory):
-        """
-        Receive data from the wire:
-            - read data from the socket
-            - reconstruct an instance
-        """
-        uuid, owner, str_type, data = socket.recv_multipart(zmq.NOBLOCK)
-
-        str_type = str_type.decode()
-        owner = owner.decode()
-        uuid = uuid.decode()
-
-        instance = factory.construct_from_net(str_type)(owner=owner, uuid=uuid)
-        instance.buffer = instance.deserialize(data)
-
-        return instance
-
-    def store(self, dict, persistent=False):
-        """
-        Store the replicated data into the given store (persistent means on disk).
-        If the buffer is 'None' the key is deleted from the store instead.
-        """
-        if self.uuid is not None:
-            if self.buffer == 'None':
-                logger.debug("erasing key {}".format(self.uuid))
-                del dict[self.uuid]
-            else:
-                dict[self.uuid] = self
-
-        return self.uuid
-
-    def deserialize(self, data):
-        """
-        BUFFER -> JSON
-        """
-        raise NotImplementedError
-
-    def serialize(self, data):
-        """
-        JSON -> BUFFER
-        """
-        raise NotImplementedError
-
-    def dump(self):
-        """
-        DCC -> JSON
-        """
-        assert(self.pointer)
-
-        return json.dumps(self.pointer)
-
-    def load(self, target=None):
-        """
-        JSON -> DCC
-        """
-        raise NotImplementedError
-
-    def resolve(self):
-        """
-        Resolve orphan data against an existing one
-        by assigning the pointer.
-        """
-        raise NotImplementedError
-
-    def __repr__(self):
-        return "{uuid} - owner: {owner} - type: {type}".format(
-            uuid=self.uuid,
-            owner=self.owner,
-            type=self.str_type
-        )
-
-
-class RepCommand(ReplicatedDatablock):
-    def serialize(self, data):
-        return pickle.dumps(data)
-
-    def deserialize(self, data):
-        return pickle.loads(data)
-
-    def load(self, target):
-        target = self.pointer
-
-
-class RepDeleteCommand(ReplicatedDatablock):
-    def serialize(self, data):
-        return pickle.dumps(data)
-
-    def deserialize(self, data):
-        return pickle.loads(data)
-
-    def store(self, rep_store):
-        assert(self.buffer)
-
-        if rep_store and self.buffer in rep_store.keys():
-            del rep_store[self.buffer]
diff --git a/replication_client.py b/replication_client.py
deleted file mode 100644
index fc29929..0000000
--- a/replication_client.py
+++ /dev/null
@@ -1,341 +0,0 @@
-import logging
-import threading
-import time
-
-import zmq
-
-from replication import RepCommand, RepDeleteCommand, ReplicatedDatablock
-from replication_graph import ReplicationGraph
-
-logger = logging.getLogger(__name__)
-
-STATE_INITIAL = 0
-STATE_SYNCING = 1
-STATE_ACTIVE = 2
-
-
-class Client(object):
-    def __init__(self, factory=None, supervisor=False):
-        assert(factory)
-
-        self._rep_store = ReplicationGraph()
-        self._net_client = ClientNetService(
-            store_reference=self._rep_store,
-            factory=factory)
-        self._factory = factory
-        self._is_supervisor = supervisor
-        self._id = None
-
-    def connect(self, id="Default", address="127.0.0.1", port=5560):
-        """
-        Connect to the server
-        """
-        self._id = id
-        self._net_client.connect(id=id, address=address, port=port)
-
-    def disconnect(self):
-        """
-        Disconnect from the server and reset the client
-        """
-        self._net_client.stop()
-
-    @property
-    def state(self):
-        """
-        Return the client state
-        0: STATE_INITIAL
-        1: STATE_SYNCING
-        2: STATE_ACTIVE
-        """
-        return self._net_client.state
-
-    def register(self, object):
-        """
-        Register a new item for replication
-        TODO: dig into the replication behaviour,
-        find a better way to handle it
-        """
-        assert(object)
-
-        # Construct the corresponding replication type
-        new_item = self._factory.construct_from_dcc(
-            object)(owner=self._id, pointer=object)
-
-        if new_item:
-            logger.debug("Registering {} on {}".format(object, new_item.uuid))
-            new_item.store(self._rep_store)
-
-            logger.debug("Pushing new registered value")
-            new_item.push(self._net_client.publish)
-            return new_item.uuid
-
-        else:
-            raise TypeError("Type not supported")
-
-    def unregister(self, object_uuid, clean=False):
-        """
-        Unregister the given object from replication.
-        The clean option also removes the pointer data.
-        """
-
-        if object_uuid in self._rep_store.keys():
-            delete_command = RepDeleteCommand(
-                owner='client', buffer=object_uuid)
-            # remove the key from our store
-            delete_command.store(self._rep_store)
-            delete_command.push(self._net_client.publish)
-        else:
-            raise KeyError("Cannot unregister key")
-
-    def pull(self, object=None):
-        """
-        Asynchronous pull:
-        pull all waiting changes and apply them
-        """
-        pass
-
-    def get(self, object_uuid):
-        pass
-
-
-class ClientNetService(threading.Thread):
-    def __init__(self, store_reference=None, factory=None):
-
-        # Threading
-        threading.Thread.__init__(self)
-        self.name = "ClientNetLink"
-        self.daemon = True
-
-        self._exit_event = threading.Event()
-        self._factory = factory
-        self._store_reference = store_reference
-        self._id = "None"
-
-        assert(self._factory)
-
-        # Networking
-        self.context = zmq.Context.instance()
-        self.state = STATE_INITIAL
-
-    def connect(self, id=None, address='127.0.0.1', port=5560):
-        """
-        Network socket setup
-        """
-        assert(id)
-        if self.state == STATE_INITIAL:
-            self._id = id
-            logger.debug("connecting on {}:{}".format(address, port))
-            self.command = self.context.socket(zmq.DEALER)
-            self.command.setsockopt(zmq.IDENTITY, self._id.encode())
-            self.command.connect("tcp://{}:{}".format(address, port))
-
-            self.subscriber = self.context.socket(zmq.DEALER)
-            self.subscriber.setsockopt(zmq.IDENTITY, self._id.encode())
-
-            # self.subscriber = self.context.socket(zmq.SUB)
-            # self.subscriber.setsockopt_string(zmq.SUBSCRIBE, '')
-
-            self.subscriber.connect("tcp://{}:{}".format(address, port+1))
-            # self.subscriber.linger = 0
-            time.sleep(.5)
-
-            self.publish = self.context.socket(zmq.PUSH)
-            self.publish.connect("tcp://{}:{}".format(address, port+2))
-
-            self.start()
-
-    def run(self):
-        logger.debug("{} online".format(self._id))
-        poller = zmq.Poller()
-        poller.register(self.command, zmq.POLLIN)
-        poller.register(self.subscriber, zmq.POLLIN)
-        poller.register(self.publish, zmq.POLLOUT)
-
-        while not self._exit_event.is_set():
-            """NET OUT
-                Given the net state we do something:
-                INITIAL : Ask for snapshots
-            """
-            if self.state == STATE_INITIAL:
-                logger.debug('{} : request snapshot'.format(self._id))
-                self.command.send(b"SNAPSHOT_REQUEST")
-                self.state = STATE_SYNCING
-
-            """NET IN
-                Given the net state we do something:
-                SYNCING : load snapshots
-                ACTIVE : listen for updates
-            """
-            items = dict(poller.poll(1))
-
-            # COMMANDS
-            if self.command in items:
-                datablock = ReplicatedDatablock.pull(
-                    self.command, self._factory)
-
-                if self.state == STATE_SYNCING:
-                    if 'SNAPSHOT_END' in datablock.buffer:
-                        self.state = STATE_ACTIVE
-                        logger.debug('{} : snapshot done'.format(self._id))
-                    else:
-                        datablock.store(self._store_reference)
-
-            # DATA
-            if self.subscriber in items:
-                if self.state == STATE_ACTIVE:
-                    logger.debug(
-                        "{} : Receiving changes from server".format(self._id))
-                    datablock = ReplicatedDatablock.pull(
-                        self.subscriber, self._factory)
-                    datablock.store(self._store_reference)
-
-            if not items:
-                logger.error("No request ")
-
-        self.command.close()
-        self.subscriber.close()
-        self.publish.close()
-
-        self._exit_event.clear()
-
-    def stop(self):
-        self._exit_event.set()
-
-        # Wait the end of the run
-        while self._exit_event.is_set():
-            time.sleep(.1)
-
-        self.state = 0
-
-
-class Server():
-    def __init__(self, config=None, factory=None):
-        self._rep_store = {}
-        self._net = ServerNetService(
-            store_reference=self._rep_store, factory=factory)
-
-    def serve(self, port=5560):
-        self._net.listen(port=port)
-
-    def state(self):
-        return self._net.state
-
-    def stop(self):
-        self._net.stop()
-
-
-class ServerNetService(threading.Thread):
-    def __init__(self, store_reference=None, factory=None):
-        # Threading
-        threading.Thread.__init__(self)
-        self.name = "ServerNetLink"
-        self.daemon = True
-        self._exit_event = threading.Event()
-
-        # Networking
-        self._rep_store = store_reference
-
-        self.context = zmq.Context.instance()
-        self.command = None
-        self.publisher = None
-        self.pull = None
-        self.state = 0
-        self.factory = factory
-        self.clients = {}
-
-    def listen(self, port=5560):
-        try:
-            # Update request
-            self.command = self.context.socket(zmq.ROUTER)
-            self.command.setsockopt(zmq.IDENTITY, b'SERVER')
-            self.command.setsockopt(zmq.RCVHWM, 60)
-            self.command.bind("tcp://*:{}".format(port))
-
-            # Update all clients
-            self.publisher = self.context.socket(zmq.ROUTER)
-            self.publisher.setsockopt(zmq.IDENTITY, b'SERVER_DATA')
-            self.publisher.bind("tcp://*:{}".format(port+1))
-            self.publisher.setsockopt(zmq.SNDHWM, 60)
-            self.publisher.linger = 0
-
-            # Update collector
-            self.pull = self.context.socket(zmq.PULL)
-            self.pull.setsockopt(zmq.RCVHWM, 60)
-            self.pull.bind("tcp://*:{}".format(port+2))
-
-            self.start()
-        except zmq.error.ZMQError:
-            logger.error("Address already in use, change net config")
-
-    def add_client(self, identity):
-        if identity in self.clients.keys():
-            logger.debug("client already added")
-        else:
-            self.clients[identity.decode()] = identity
-
-    def run(self):
-        logger.debug("Server is online")
-        poller = zmq.Poller()
-        poller.register(self.command, zmq.POLLIN)
-        poller.register(self.pull, zmq.POLLIN)
-
-        self.state = STATE_ACTIVE
-
-        while not self._exit_event.is_set():
-            # Non blocking poller
-            socks = dict(poller.poll(1))
-
-            # Snapshot system for late join (Server - Client)
-            if self.command in socks:
-                msg = self.command.recv_multipart(zmq.DONTWAIT)
-
-                identity = msg[0]
-                request = msg[1]
-
-                self.add_client(identity)
-
-                if request == b"SNAPSHOT_REQUEST":
-                    # Sending snapshots
-                    for key, item in self._rep_store.items():
-                        self.command.send(identity, zmq.SNDMORE)
-                        item.push(self.command)
-
-                    # Snapshot end
-                    self.command.send(identity, zmq.SNDMORE)
-                    RepCommand(owner='server', pointer='SNAPSHOT_END').push(
-                        self.command)
-
-            # Regular update routing (Clients / Server / Clients)
-            if self.pull in socks:
-
-                datablock = ReplicatedDatablock.pull(self.pull, self.factory)
-                logger.debug("SERVER: Receiving changes from {}".format(datablock.owner))
-                datablock.store(self._rep_store)
-
-                # Update all clients
-                for cli_name, cli_id in self.clients.items():
-                    if cli_name != datablock.owner:
-                        logger.debug("SERVER: Broadcast changes to {}".format(cli_name))
-                        self.publisher.send(cli_id, zmq.SNDMORE)
-                        datablock.push(self.publisher)
-
-                # datablock.push(self.publisher)
-
-        self.command.close()
-        self.pull.close()
-        self.publisher.close()
-
-        self._exit_event.clear()
-
-    def stop(self):
-        self._exit_event.set()
-
-        # Wait the end of the run
-        while self._exit_event.is_set():
-            time.sleep(.1)
-
-        self.state = 0
diff --git a/replication_graph.py b/replication_graph.py
deleted file mode 100644
index 014b12c..0000000
--- a/replication_graph.py
+++ /dev/null
@@ -1,36 +0,0 @@
-import collections
-from replication import ReplicatedDatablock
-
-class ReplicationGraph(collections.MutableMapping):
-    """
-    Structure to hold replicated data relation graph
-    """
-
-    def __init__(self, *args, **kwargs):
-        self.store = dict()
-        self.update(dict(*args, **kwargs))  # use the free update to set keys
-
-    def __getitem__(self, key):
-        return self.store[key]
-
-    def __setitem__(self, key, value):
-        self.store[key] = value
-
-    def __delitem__(self, key):
-        del self.store[key]
-
-    def __iter__(self):
-        return iter(self.store)
-
-    def __len__(self):
-        return len(self.store)
-
-    def __repr__(self):
-        str = "\n"
-        for key, item in self.store.items():
-            str += repr(item)
-        return str
-
-
-
-
\ No newline at end of file
diff --git a/test_replication.py b/test_replication.py
deleted file mode 100644
index c257931..0000000
--- a/test_replication.py
+++ /dev/null
@@ -1,263 +0,0 @@
-import cProfile
-import logging
-import re
-import time
-import unittest
-import umsgpack
-
-logging.basicConfig()
-logging.getLogger().setLevel(logging.INFO)
-logger = logging.getLogger(__name__)
-
-
-from replication import ReplicatedDatablock, ReplicatedDataFactory
-from replication_client import Client, Server
-
-
-class SampleData():
-    def __init__(self, map={"sample": bytearray(50000)}):
-        self.map = map
-
-
-class RepSampleData(ReplicatedDatablock):
-    def serialize(self, data):
-        import pickle
-
-        return pickle.dumps(data)
-
-    def deserialize(self, data):
-        import pickle
-
-        return pickle.loads(data)
-
-    def dump(self):
-        import json
-        output = {}
-        output['map'] = umsgpack.packb(self.pointer.map)
-        return output
-
-    def load(self, target=None):
-        import json
-        if target is None:
-            target = SampleData()
-
-        target.map = umsgpack.unpackb(self.buffer['map'])
-
-
-class TestDataFactory(unittest.TestCase):
-    def test_data_factory(self):
-        factory = ReplicatedDataFactory()
-        factory.register_type(SampleData, RepSampleData)
-        data_sample = SampleData()
-        rep_sample = factory.construct_from_dcc(
-            data_sample)(owner="toto", pointer=data_sample)
-
-        self.assertEqual(isinstance(rep_sample, RepSampleData), True)
-
-
-class TestClient(unittest.TestCase):
-    def __init__(self, methodName='runTest'):
-        unittest.TestCase.__init__(self, methodName)
-
-    def test_empty_snapshot(self):
-        # Setup
-        factory = ReplicatedDataFactory()
-        factory.register_type(SampleData, RepSampleData)
-
-        server = Server(factory=factory)
-        client = Client(factory=factory)
-
-        server.serve(port=5570)
-        client.connect(port=5570, id="client_test_callback")
-
-        test_state = client.state
-
-        server.stop()
-        client.disconnect()
-
-        self.assertNotEqual(test_state, 2)
-
-    def test_filled_snapshot(self):
-        # Setup
-        factory = ReplicatedDataFactory()
-        factory.register_type(SampleData, RepSampleData)
-
-        server = Server(factory=factory)
-        client = Client(factory=factory)
-        client2 = Client(factory=factory)
-
-        server.serve(port=5575)
-        client.connect(port=5575, id="cli_test_filled_snapshot")
-
-        # Test the key registering
-        data_sample_key = client.register(SampleData())
-
-        client2.connect(port=5575, id="client_2")
-        time.sleep(0.2)
-        rep_test_key = client2._rep_store[data_sample_key].uuid
-
-        server.stop()
-        client.disconnect()
-        client2.disconnect()
-
-        self.assertEqual(data_sample_key, rep_test_key)
-
-    def test_register_client_data(self):
-        # Setup environment
-
-        factory = ReplicatedDataFactory()
-        factory.register_type(SampleData, RepSampleData)
-
-        server = Server(factory=factory)
-        server.serve(port=5580)
-
-        client = Client(factory=factory)
-        client.connect(port=5580, id="cli_test_register_client_data")
-
-        client2 = Client(factory=factory)
-        client2.connect(port=5580, id="cli2_test_register_client_data")
-
-        # Test the key registering
-        data_sample_key = client.register(SampleData())
-
-        time.sleep(0.3)
-        # Wait for the server to receive the data
-        rep_test_key = client2._rep_store[data_sample_key].uuid
-
-        client.disconnect()
-        client2.disconnect()
-        server.stop()
-
-        self.assertEqual(rep_test_key, data_sample_key)
-
-    def test_client_data_intergity(self):
-        # Setup environment
-        factory = ReplicatedDataFactory()
-        factory.register_type(SampleData, RepSampleData)
-
-        server = Server(factory=factory)
-        server.serve(port=5585)
-
-        client = Client(factory=factory)
-        client.connect(port=5585, id="cli_test_client_data_intergity")
-
-        client2 = Client(factory=factory)
-        client2.connect(port=5585, id="cli2_test_client_data_intergity")
-
-        test_map = {"toto": "test"}
-        # Test the key registering
-        data_sample_key = client.register(SampleData(map=test_map))
-
-        test_map_result = SampleData()
-        # Wait for the server to receive the data
-        time.sleep(1)
-
-        client2._rep_store[data_sample_key].load(target=test_map_result)
-
-        client.disconnect()
-        client2.disconnect()
-        server.stop()
-
-        self.assertEqual(test_map_result.map["toto"], test_map["toto"])
-
-    def test_client_unregister_key(self):
-        # Setup environment
-        factory = ReplicatedDataFactory()
-        factory.register_type(SampleData, RepSampleData)
-
-        server = Server(factory=factory)
-        server.serve(port=5590)
-
-        client = Client(factory=factory)
-        client.connect(port=5590, id="cli_test_client_data_intergity")
-
-        client2 = Client(factory=factory)
-        client2.connect(port=5590, id="cli2_test_client_data_intergity")
-
-        test_map = {"toto": "test"}
-        # Test the key registering
-        data_sample_key = client.register(SampleData(map=test_map))
-
-        test_map_result = SampleData()
-
-        # Wait for the server to receive the data
-        time.sleep(.1)
-
-        client2._rep_store[data_sample_key].load(target=test_map_result)
-
-        client.unregister(data_sample_key)
-        time.sleep(.1)
-
-        logger.debug("client store:")
-        logger.debug(client._rep_store)
-        logger.debug("client2 store:")
-        logger.debug(client2._rep_store)
-        logger.debug("server store:")
-        logger.debug(server._rep_store)
-
-        client.disconnect()
-        client2.disconnect()
-        server.stop()
-
-        self.assertFalse(data_sample_key in client._rep_store)
-
-    def test_client_disconnect(self):
-        pass
-
-    def test_client_change_rights(self):
-        pass
-
-
-class TestStressClient(unittest.TestCase):
-    def test_stress_register(self):
-        total_time = 0
-        # Setup
-        factory = ReplicatedDataFactory()
-        factory.register_type(SampleData, RepSampleData)
-
-        server = Server(factory=factory)
-        client = Client(factory=factory)
-        client2 = Client(factory=factory)
-
-        server.serve(port=5595)
-        client.connect(port=5595, id="cli_test_filled_snapshot")
-        client2.connect(port=5595, id="client_2")
-
-        # Test the key registering
-        for i in range(10000):
-            client.register(SampleData())
-
-        while len(client2._rep_store.keys()) < 10000:
-            time.sleep(0.00001)
-            total_time += 0.00001
-
-        # test_num_items = len(client2._rep_store.keys())
-        server.stop()
-        client.disconnect()
-        client2.disconnect()
-        logger.info("{} s for 10000 values".format(total_time))
-
-        self.assertLess(total_time, 1)
-
-
-def suite():
-    suite = unittest.TestSuite()
-
-    # Data factory
-    suite.addTest(TestDataFactory('test_data_factory'))
-
-    # Client
-    suite.addTest(TestClient('test_empty_snapshot'))
-    suite.addTest(TestClient('test_filled_snapshot'))
-    suite.addTest(TestClient('test_register_client_data'))
-    suite.addTest(TestClient('test_client_data_intergity'))
-
-    # Stress test
-    suite.addTest(TestStressClient('test_stress_register'))
-
-    return suite
-
-
-if __name__ == '__main__':
-    runner = unittest.TextTestRunner(verbosity=2)
-    runner.run(suite())
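
Note for anyone digging this code back out of history: the removed API was exercised roughly as follows. This is a sketch reconstructed from the deleted test_replication.py; the "sender"/"receiver" ids and the sleep duration are illustrative, port 5560 is the library default, and SampleData/RepSampleData are the helper types defined in that test module.

    import time

    from replication import ReplicatedDataFactory
    from replication_client import Client, Server
    from test_replication import SampleData, RepSampleData

    # Map the DCC-side type to its replicated implementation.
    factory = ReplicatedDataFactory()
    factory.register_type(SampleData, RepSampleData)

    # One server, two clients sharing the same factory.
    server = Server(factory=factory)
    server.serve(port=5560)

    sender = Client(factory=factory)
    sender.connect(port=5560, id="sender")

    receiver = Client(factory=factory)
    receiver.connect(port=5560, id="receiver")

    # Register an object for replication; the returned uuid is the store key.
    key = sender.register(SampleData(map={"toto": "test"}))

    # Replication is asynchronous: give the server time to route the change,
    # then read the replicated copy out of the second client's store.
    time.sleep(0.5)
    replica = SampleData()
    receiver._rep_store[key].load(target=replica)

    sender.disconnect()
    receiver.disconnect()
    server.stop()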
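
The wire format used by ReplicatedDatablock.push()/pull() above is a plain four-frame zmq multipart message: [uuid, owner, type name, serialized buffer]. A minimal standalone sketch of that framing with pyzmq follows; the inproc endpoint, uuid, owner and payload are made up for illustration, standing in for the client PUSH / server PULL pair.

    import pickle
    import zmq

    context = zmq.Context.instance()

    # In-process stand-ins for the server's update collector and a client's publish socket.
    collector = context.socket(zmq.PULL)
    collector.bind("inproc://replication-demo")
    publisher = context.socket(zmq.PUSH)
    publisher.connect("inproc://replication-demo")

    # push(): one frame per field, buffer serialized by the datablock subclass.
    publisher.send_multipart([
        b"some-uuid",           # uuid used as store key
        b"client_a",            # owner
        b"RepCommand",          # str_type, used by the factory to rebuild the instance
        pickle.dumps("hello"),  # serialized buffer
    ])

    # pull(): read the four frames back and decode the metadata.
    uuid, owner, str_type, data = collector.recv_multipart()
    print(uuid.decode(), owner.decode(), str_type.decode(), pickle.loads(data))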