From 49e242d9ea927efb9af0b7c14a1e14e0acfad32b Mon Sep 17 00:00:00 2001 From: Swann Martinez Date: Thu, 2 May 2019 18:19:59 +0200 Subject: [PATCH] feat(client): append clean thread stop --- client.py | 48 ++++++++++++++++++++++++++++++++++-------------- operators.py | 4 ++-- 2 files changed, 36 insertions(+), 16 deletions(-) diff --git a/client.py b/client.py index 314ccdd..02e4fa4 100644 --- a/client.py +++ b/client.py @@ -65,10 +65,10 @@ class RCFClient(object): self.serial_product = queue.Queue() self.serial_feed = queue.Queue() - + self.stop_event= threading.Event() # Database and connexion agent self.net_agent = threading.Thread( - target=rcf_client_worker, args=(self.ctx, self.store, peer, self.serial_product,self.serial_feed), name="net-agent") + target=rcf_client_worker, args=(self.ctx, self.store, peer, self.serial_product,self.serial_feed, self.stop_event), name="net-agent") self.net_agent.daemon = True self.net_agent.start() @@ -83,6 +83,12 @@ class RCFClient(object): serial_agent.start() self.serial_agents.append(serial_agent) + # Database and connexion agent + self.watchdog_agent = threading.Thread( + target=watchdog_worker, args=(self.serial_feed, .5, self.stop_event), name="watchdog-agent") + self.watchdog_agent.daemon = True + self.watchdog_agent.start() + def connect(self, id, address, port): self.pipe.send_multipart([b"CONNECT", (id.encode() if isinstance( id, str) else id), (address.encode() if isinstance( @@ -104,6 +110,7 @@ class RCFClient(object): self.pipe.send_multipart( [b"DISCONNECT"]) + def set(self, key, value=None, override=False): """Set new value in distributed hash table Sends [SET][key][value] to the agent @@ -142,13 +149,10 @@ class RCFClient(object): if self.net_agent.is_alive(): self.disconnect() - # Disconnect time - time.sleep(0.2) - - global stop - stop = True + self.stop_event.set() + for a in range(0,DUMP_AGENTS_NUMBER): - self.serial_feed.put(('exit',None,None)) + self.serial_feed.put(('STOP',None,None)) # READ-ONLY FUNCTIONS @@ -179,6 +183,7 @@ class RCFClient(object): return dump_list + def state(self): if not self.is_busy(): self.pipe.send_multipart([b"STATE"]) @@ -370,15 +375,14 @@ class RCFClientAgent(object): self.pipe.send(umsgpack.packb(self.state.value)) -def rcf_client_worker(ctx,store, pipe, serial_product, serial_feed): +def rcf_client_worker(ctx,store, pipe, serial_product, serial_feed, stop_event): agent = RCFClientAgent(ctx,store, pipe) server = None net_feed = serial_product net_product = serial_feed - global stop - while True: - if stop: - break + + while not stop_event.is_set(): + # logger.info("asdasd") poller = zmq.Poller() poller.register(agent.pipe, zmq.POLLIN) @@ -474,7 +478,7 @@ def serial_worker(product, feed): while True: command,key,value = feed.get() - if command == 'exit': + if command == 'STOP': break elif command == 'DUMP': value = helpers.dump(key) @@ -484,3 +488,19 @@ def serial_worker(product, feed): elif command == 'LOAD': if value: helpers.load(key, value) + + +def watchdog_worker(feed,interval, stop_event): + import bpy + global stop + + while not stop_event.is_set(): + + for datatype in helpers.SUPPORTED_TYPES: + for item in getattr(bpy.data, helpers.CORRESPONDANCE[datatype]): + if item.id == 'None': + item.id = bpy.context.scene.session_settings.username + key = "{}/{}".format(datatype, item.name) + feed.put(('DUMP',key,None)) + + time.sleep(interval) diff --git a/operators.py b/operators.py index 937509b..335e6f4 100644 --- a/operators.py +++ b/operators.py @@ -150,14 +150,14 @@ def sync(): def register_ticks(): # REGISTER Updaters - bpy.app.timers.register(sync) + # bpy.app.timers.register(sync) bpy.app.timers.register(default_tick) pass def unregister_ticks(): # REGISTER Updaters - bpy.app.timers.unregister(sync) + # bpy.app.timers.unregister(sync) bpy.app.timers.unregister(default_tick) pass