diff --git a/client.py b/client.py index 58bbefd..cff60ea 100644 --- a/client.py +++ b/client.py @@ -30,13 +30,20 @@ except: logger = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO) + + CONNECT_TIMEOUT = 2 WAITING_TIME = 0.001 SERVER_MAX = 1 - stop = False +class State(Enum): + INITIAL = 1 + SYNCING = 2 + ACTIVE = 3 + + def zpipe(ctx): """build inproc pipe for talking to threads @@ -54,33 +61,28 @@ def zpipe(ctx): return a, b -class State(Enum): - INITIAL = 1 - SYNCING = 2 - ACTIVE = 3 - - class RCFClient(object): ctx = None pipe = None net_agent = None - + store = None def __init__(self): self.ctx = zmq.Context() self.pipe, peer = zpipe(self.ctx) - self.serial_pipe, serial_peer = zpipe(self.ctx) - serial_in, net_in = zpipe(self.ctx) - self.tasks = queue.Queue() + self.store = {} + + self.serial_product = queue.Queue() + self.serial_feed = queue.Queue() # Database and connexion agent self.net_agent = threading.Thread( - target=rcf_client_agent, args=(self.ctx, peer, self.tasks), name="net-agent") + target=rcf_client_agent, args=(self.ctx, self.store, peer, self.serial_product), name="net-agent") self.net_agent.daemon = True self.net_agent.start() # Local data translation agent self.serial_agent = threading.Thread( - target=serialization_agent, args=(self.ctx, serial_peer, self.tasks), name="serial-agent") + target=serialization_agent, args=(self.serial_product, self.serial_feed), name="serial-agent") self.serial_agent.daemon = True self.serial_agent.start() @@ -109,29 +111,32 @@ class RCFClient(object): Sends [SET][key][value] to the agent """ - self.pipe.send_multipart( + if value: + pass + self.pipe.send_multipart( [b"SET", umsgpack.packb(key), (umsgpack.packb(value) if value else umsgpack.packb('None')),umsgpack.packb(override)]) + else: + self.serial_feed.put(key) + # self.serial_pipe.send_multipart( + # [b"DUMP", umsgpack.packb(key)]) + # self.pipe.send_multipart( + # [b"SET", umsgpack.packb(key), (umsgpack.packb(value) if value else umsgpack.packb('None')),umsgpack.packb(override)]) def add(self, key, value=None): """Set new value in distributed hash table Sends [SET][key][value] to the agent """ - self.pipe.send_multipart( - [b"ADD", umsgpack.packb(key), (umsgpack.packb(value) if value else umsgpack.packb('None'))]) + self.serial_feed.put(key) - def get(self, key): - """Lookup value in distributed hash table - Sends [GET][key] to the agent and waits for a value response - If there is no clone available, will eventually return None. - """ + # self.pipe.send_multipart( + # [b"ADD", umsgpack.packb(key), (umsgpack.packb(value) if value else umsgpack.packb('None'))]) - self.pipe.send_multipart([b"GET", umsgpack.packb(key)]) - try: - reply = self.pipe.recv_multipart() - except KeyboardInterrupt: - return + + def is_busy(self): + if self.serial_feed.qsize() == 0 and self.serial_product.qsize() == 0: + return False else: - return umsgpack.unpackb(reply[0]) + return True def exit(self): if self.net_agent.is_alive(): @@ -143,23 +148,64 @@ class RCFClient(object): global stop stop = True + # READ-ONLY FUNCTIONS + def get(self, key): + """Lookup value in distributed hash table + Sends [GET][key] to the agent and waits for a value response + If there is no clone available, will eventually return None. + """ + value = [] + + for k in self.store.keys(): + if key in k: + value.append([k, self.store.get(k).body]) + + return value + # if not self.is_busy(): + # self.pipe.send_multipart([b"GET", umsgpack.packb(key)]) + # try: + # reply = self.pipe.recv_multipart() + # except KeyboardInterrupt: + # return + # else: + # return umsgpack.unpackb(reply[0]) + # else: + # return None + def list(self): - self.pipe.send_multipart([b"LIST"]) - try: - reply = self.pipe.recv_multipart() - except KeyboardInterrupt: - return - else: - return umsgpack.unpackb(reply[0]) + dump_list = [] + for k,v in self.store.items(): + if 'Client' in k: + dump_list.append([k,v.id.decode()]) + else: + try: + dump_list.append([k,v.body['id']]) + except: + pass + + return dump_list + # if not self.is_busy(): + # self.pipe.send_multipart([b"LIST"]) + # try: + # reply = self.pipe.recv_multipart() + # except KeyboardInterrupt: + # return + # else: + # return umsgpack.unpackb(reply[0]) + # else: + # return None def state(self): - self.pipe.send_multipart([b"STATE"]) - try: - reply = self.pipe.recv_multipart() - except KeyboardInterrupt: - return + if not self.is_busy(): + self.pipe.send_multipart([b"STATE"]) + try: + reply = self.pipe.recv_multipart() + except KeyboardInterrupt: + return + else: + return umsgpack.unpackb(reply[0]) else: - return umsgpack.unpackb(reply[0]) + return None class RCFServer(object): @@ -193,10 +239,10 @@ class RCFClientAgent(object): serial = None serialisation_agent = None - def __init__(self, ctx, pipe): + def __init__(self, ctx, store, pipe): self.ctx = ctx self.pipe = pipe - self.property_map = {} + self.property_map = store self.id = b"test" self.state = State.INITIAL self.admin = False @@ -205,8 +251,6 @@ class RCFClientAgent(object): self.publisher.setsockopt(zmq.IDENTITY, self.id) self.publisher.setsockopt(zmq.SNDHWM, 60) self.publisher.linger = 0 - self.serial, peer = zpipe(self.ctx) - self.updates = queue.Queue() # self.serial_agent = threading.Thread( # target=serialization_agent, args=(self.ctx, peer), name="serial-agent") # self.serial_agent.daemon = True @@ -341,10 +385,10 @@ class RCFClientAgent(object): elif command == b"STATE": self.pipe.send(umsgpack.packb(self.state.value)) -def rcf_client_agent(ctx, pipe, queue): - agent = RCFClientAgent(ctx, pipe) +def rcf_client_agent(ctx,store, pipe, queue): + agent = RCFClientAgent(ctx,store, pipe) server = None - update_queue = queue + net_feed = queue global stop while True: if stop: @@ -399,54 +443,37 @@ def rcf_client_agent(ctx, pipe, queue): agent.state = State.ACTIVE else: helpers.load(rcfmsg.key, rcfmsg.body) - rcfmsg.store(agent.property_map) logger.info("snapshot from {} stored".format(rcfmsg.id)) elif agent.state == State.ACTIVE: # IN if rcfmsg.id != agent.id: - # update_queue.put((rcfmsg.key,rcfmsg.body)) - - # try: - # logger.info(rcfmsg.body['id']) - # except: - # pass - + with lock: helpers.load(rcfmsg.key, rcfmsg.body) rcfmsg.store(agent.property_map) - # elif rcfmsg.id == agent.property_map[rcfmsg.key].id: - # with lock: - # helpers.load(rcfmsg.key, rcfmsg.body) - # logger.info("load") - # agent.serial.send_multipart([b"LOAD", umsgpack.packb(rcfmsg.key), umsgpack.packb(rcfmsg.body)]) - # reply = agent.serial.recv_multipart() - - # if reply == b"DONE": - # rcfmsg.store(agent.property_map) - # action = "update" if rcfmsg.body else "delete" - # logging.info("{}: received from {}:{},{} {}".format(rcfmsg.key, - # server.address, rcfmsg.id, server.port, action)) else: logger.debug("{} nothing to do".format(agent.id)) - # LOCAL SYNC - # if not update_queue.empty(): - # key = update_queue.get() + # Serialisation thread input + if not net_feed.empty(): + key, value = net_feed.get() - # value = helpers.dump(key) - # value['id'] = agent.id.decode() - # if value: - # rcfmsg = message.RCFMessage( - # key=key, id=agent.id, body=value) + if value: + # Stamp with id + value['id'] = agent.id.decode() - # rcfmsg.store(agent.property_map) - # rcfmsg.send(agent.publisher) - # else: - # logger.error("Fail to dump ") + # Format massage + rcfmsg = message.RCFMessage( + key=key, id=agent.id, body=value) + rcfmsg.store(agent.property_map) + rcfmsg.send(agent.publisher) + else: + logger.error("Fail to dump ") + logger.info("exit thread") @@ -459,13 +486,15 @@ class SerializationAgent(object): ctx = None pipe = None - def __init__(self, ctx, pipe): + def __init__(self, ctx, pipe, product, feed): self.ctx = ctx self.pipe = pipe + self.product = product + self.feed = feed logger.info("serialisation service launched") def control_message(self): - msg = self.pipe.recv_multipart() + msg = self.pipe.recv_multipart(zmq.NOBLOCK) command = msg.pop(0) if command == b"DUMP": @@ -473,7 +502,8 @@ class SerializationAgent(object): value = helpers.dump(key) - self.pipe.send_multipart(umsgpack.packb(value)) + self.product.put((key,value)) + # self.pipe.send_multipart(umsgpack.packb(value)) elif command == b"LOAD": @@ -485,25 +515,32 @@ class SerializationAgent(object): self.pipe.send_multipart([b"DONE"]) -def serialization_agent(ctx, pipe, tasks): - agent = SerializationAgent(ctx, pipe) +def serialization_agent(product, feed ): + # agent = SerializationAgent(ctx, pipe, feed) global stop + while True: if stop: break - poller = zmq.Poller() - poller.register(agent.pipe, zmq.POLLIN) + + key = feed.get() + value = helpers.dump(key) - try: - items = dict(poller.poll(1)) - except: - raise - break + if value: + product.put((key,value)) + # poller = zmq.Poller() + # poller.register(agent.pipe, zmq.POLLIN) - if agent.pipe in items: - agent.control_message() + # try: + # items = dict(poller.poll(1)) + # except: + # raise + # break + + # if agent.pipe in items: + # agent.control_message() # TODO : Fill tasks diff --git a/draw.py b/draw.py index 3daa00a..62ccc8e 100644 --- a/draw.py +++ b/draw.py @@ -120,72 +120,75 @@ class HUD(object): def draw_selected_object(self): clients = self.client.get("Client") - for client in clients: - name = client[0].split('/')[1] - local_username = bpy.context.scene.session_settings.username + if clients: + for client in clients: + name = client[0].split('/')[1] + local_username = bpy.context.scene.session_settings.username - if name != local_username: - if client[1]['active_objects']: - for select_ob in client[1]['active_objects']: - indices = ( - (0, 1), (1, 2), (2, 3), (0, 3), - (4, 5), (5, 6), (6, 7), (4, 7), - (0, 4), (1, 5), (2, 6), (3, 7) - ) + if name != local_username: + if client[1]['active_objects']: + for select_ob in client[1]['active_objects']: + indices = ( + (0, 1), (1, 2), (2, 3), (0, 3), + (4, 5), (5, 6), (6, 7), (4, 7), + (0, 4), (1, 5), (2, 6), (3, 7) + ) - if select_ob in bpy.data.objects.keys(): - ob = bpy.data.objects[select_ob] - else: - return - bbox_corners = [ob.matrix_world @ mathutils.Vector(corner) for corner in ob.bound_box] + if select_ob in bpy.data.objects.keys(): + ob = bpy.data.objects[select_ob] + else: + return - coords = [(point.x, point.y, point.z) - for point in bbox_corners] + bbox_corners = [ob.matrix_world @ mathutils.Vector(corner) for corner in ob.bound_box] - shader = gpu.shader.from_builtin('3D_UNIFORM_COLOR') + coords = [(point.x, point.y, point.z) + for point in bbox_corners] - color = client[1]['color'] + shader = gpu.shader.from_builtin('3D_UNIFORM_COLOR') - batch = batch_for_shader( - shader, 'LINES', {"pos": coords}, indices=indices) + color = client[1]['color'] - self.d3d_items["{}/{}".format(client[0], - select_ob)] = (shader, batch, color) - else: - key_to_remove = [] - for k in self.d3d_items.keys(): - if "{}/".format(client[0]) in k: - key_to_remove.append(k) - - for k in key_to_remove: - del self.d3d_items[k] + batch = batch_for_shader( + shader, 'LINES', {"pos": coords}, indices=indices) + + self.d3d_items["{}/{}".format(client[0], + select_ob)] = (shader, batch, color) + else: + key_to_remove = [] + for k in self.d3d_items.keys(): + if "{}/".format(client[0]) in k: + key_to_remove.append(k) + + for k in key_to_remove: + del self.d3d_items[k] def draw_clients(self): clients = self.client.get("Client") - for client in clients: - name = client[0].split('/')[1] - local_username = bpy.context.scene.session_settings.username + if clients: + for client in clients: + name = client[0].split('/')[1] + local_username = bpy.context.scene.session_settings.username - if name != local_username: - try: - indices = ( - (1, 3), (2, 1), (3, 0), (2, 0),(4, 5) - ) + if name != local_username: + try: + indices = ( + (1, 3), (2, 1), (3, 0), (2, 0),(4, 5) + ) - shader = gpu.shader.from_builtin('3D_UNIFORM_COLOR') - position = client[1]['location'] - color = client[1]['color'] + shader = gpu.shader.from_builtin('3D_UNIFORM_COLOR') + position = client[1]['location'] + color = client[1]['color'] - batch = batch_for_shader( - shader, 'LINES', {"pos": position}, indices=indices) + batch = batch_for_shader( + shader, 'LINES', {"pos": position}, indices=indices) - self.d3d_items[client[0]] = (shader, batch, color) - self.d2d_items[client[0]] = (position[1], name, color) + self.d3d_items[client[0]] = (shader, batch, color) + self.d2d_items[client[0]] = (position[1], name, color) - except Exception as e: - print("Draw client exception {}".format(e)) + except Exception as e: + print("Draw client exception {}".format(e)) def draw3d_callback(self): bgl.glLineWidth(2) diff --git a/operators.py b/operators.py index d29e6bb..2ce14e9 100644 --- a/operators.py +++ b/operators.py @@ -106,11 +106,11 @@ def update_selected_object(context): if len(selected_objects) > 0: for obj in selected_objects: - if obj not in client_data[0][1]['active_objects']: - client_data[0][1]['active_objects'] = selected_objects + # if obj not in client_data[0][1]['active_objects']: + client_data[0][1]['active_objects'] = selected_objects - client_instance.set(client_key,client_data[0][1]) - break + client_instance.set(client_key,client_data[0][1]) + break elif client_data and client_data[0][1]['active_objects']: client_data[0][1]['active_objects'] = [] @@ -166,7 +166,7 @@ def init_datablocks(): for item in getattr(bpy.data, helpers.CORRESPONDANCE[datatype]): item.id= bpy.context.scene.session_settings.username key = "{}/{}".format(datatype, item.name) - client_instance.add(key) + client_instance.set(key) def default_tick(): @@ -212,16 +212,16 @@ def register_ticks(): bpy.app.timers.register(draw_tick) bpy.app.timers.register(sync) bpy.app.timers.register(default_tick) - + pass def unregister_ticks(): # REGISTER Updaters global drawer drawer.unregister_handlers() bpy.app.timers.unregister(draw_tick) - + bpy.app.timers.unregister(sync) bpy.app.timers.unregister(default_tick) - + pass # OPERATORS class session_join(bpy.types.Operator): @@ -276,9 +276,15 @@ class session_refresh(bpy.types.Operator): def execute(self, context): global client_instance, client_keys,client_state + + keys = client_instance.list() - client_keys = client_instance.list() - client_state = client_instance.state() + if keys: + client_keys= keys + state = client_instance.state() + + if state: + client_state = state return {"FINISHED"} @@ -366,10 +372,10 @@ class session_create(bpy.types.Operator): bpy.ops.session.join() - # if net_settings.init_scene: - # init_datablocks() + if net_settings.init_scene: + init_datablocks() - client_instance.init() + # client_instance.init() net_settings.is_admin = True return {"FINISHED"}