feat(client): append clean thread stop
This commit is contained in:
parent
c1119976cd
commit
49e242d9ea
46
client.py
46
client.py
@ -65,10 +65,10 @@ class RCFClient(object):
|
|||||||
|
|
||||||
self.serial_product = queue.Queue()
|
self.serial_product = queue.Queue()
|
||||||
self.serial_feed = queue.Queue()
|
self.serial_feed = queue.Queue()
|
||||||
|
self.stop_event= threading.Event()
|
||||||
# Database and connexion agent
|
# Database and connexion agent
|
||||||
self.net_agent = threading.Thread(
|
self.net_agent = threading.Thread(
|
||||||
target=rcf_client_worker, args=(self.ctx, self.store, peer, self.serial_product,self.serial_feed), name="net-agent")
|
target=rcf_client_worker, args=(self.ctx, self.store, peer, self.serial_product,self.serial_feed, self.stop_event), name="net-agent")
|
||||||
self.net_agent.daemon = True
|
self.net_agent.daemon = True
|
||||||
self.net_agent.start()
|
self.net_agent.start()
|
||||||
|
|
||||||
@ -83,6 +83,12 @@ class RCFClient(object):
|
|||||||
serial_agent.start()
|
serial_agent.start()
|
||||||
self.serial_agents.append(serial_agent)
|
self.serial_agents.append(serial_agent)
|
||||||
|
|
||||||
|
# Database and connexion agent
|
||||||
|
self.watchdog_agent = threading.Thread(
|
||||||
|
target=watchdog_worker, args=(self.serial_feed, .5, self.stop_event), name="watchdog-agent")
|
||||||
|
self.watchdog_agent.daemon = True
|
||||||
|
self.watchdog_agent.start()
|
||||||
|
|
||||||
def connect(self, id, address, port):
|
def connect(self, id, address, port):
|
||||||
self.pipe.send_multipart([b"CONNECT", (id.encode() if isinstance(
|
self.pipe.send_multipart([b"CONNECT", (id.encode() if isinstance(
|
||||||
id, str) else id), (address.encode() if isinstance(
|
id, str) else id), (address.encode() if isinstance(
|
||||||
@ -104,6 +110,7 @@ class RCFClient(object):
|
|||||||
self.pipe.send_multipart(
|
self.pipe.send_multipart(
|
||||||
[b"DISCONNECT"])
|
[b"DISCONNECT"])
|
||||||
|
|
||||||
|
|
||||||
def set(self, key, value=None, override=False):
|
def set(self, key, value=None, override=False):
|
||||||
"""Set new value in distributed hash table
|
"""Set new value in distributed hash table
|
||||||
Sends [SET][key][value] to the agent
|
Sends [SET][key][value] to the agent
|
||||||
@ -142,13 +149,10 @@ class RCFClient(object):
|
|||||||
if self.net_agent.is_alive():
|
if self.net_agent.is_alive():
|
||||||
self.disconnect()
|
self.disconnect()
|
||||||
|
|
||||||
# Disconnect time
|
self.stop_event.set()
|
||||||
time.sleep(0.2)
|
|
||||||
|
|
||||||
global stop
|
|
||||||
stop = True
|
|
||||||
for a in range(0,DUMP_AGENTS_NUMBER):
|
for a in range(0,DUMP_AGENTS_NUMBER):
|
||||||
self.serial_feed.put(('exit',None,None))
|
self.serial_feed.put(('STOP',None,None))
|
||||||
|
|
||||||
|
|
||||||
# READ-ONLY FUNCTIONS
|
# READ-ONLY FUNCTIONS
|
||||||
@ -179,6 +183,7 @@ class RCFClient(object):
|
|||||||
|
|
||||||
return dump_list
|
return dump_list
|
||||||
|
|
||||||
|
|
||||||
def state(self):
|
def state(self):
|
||||||
if not self.is_busy():
|
if not self.is_busy():
|
||||||
self.pipe.send_multipart([b"STATE"])
|
self.pipe.send_multipart([b"STATE"])
|
||||||
@ -370,15 +375,14 @@ class RCFClientAgent(object):
|
|||||||
self.pipe.send(umsgpack.packb(self.state.value))
|
self.pipe.send(umsgpack.packb(self.state.value))
|
||||||
|
|
||||||
|
|
||||||
def rcf_client_worker(ctx,store, pipe, serial_product, serial_feed):
|
def rcf_client_worker(ctx,store, pipe, serial_product, serial_feed, stop_event):
|
||||||
agent = RCFClientAgent(ctx,store, pipe)
|
agent = RCFClientAgent(ctx,store, pipe)
|
||||||
server = None
|
server = None
|
||||||
net_feed = serial_product
|
net_feed = serial_product
|
||||||
net_product = serial_feed
|
net_product = serial_feed
|
||||||
global stop
|
|
||||||
while True:
|
while not stop_event.is_set():
|
||||||
if stop:
|
|
||||||
break
|
|
||||||
# logger.info("asdasd")
|
# logger.info("asdasd")
|
||||||
poller = zmq.Poller()
|
poller = zmq.Poller()
|
||||||
poller.register(agent.pipe, zmq.POLLIN)
|
poller.register(agent.pipe, zmq.POLLIN)
|
||||||
@ -474,7 +478,7 @@ def serial_worker(product, feed):
|
|||||||
while True:
|
while True:
|
||||||
command,key,value = feed.get()
|
command,key,value = feed.get()
|
||||||
|
|
||||||
if command == 'exit':
|
if command == 'STOP':
|
||||||
break
|
break
|
||||||
elif command == 'DUMP':
|
elif command == 'DUMP':
|
||||||
value = helpers.dump(key)
|
value = helpers.dump(key)
|
||||||
@ -484,3 +488,19 @@ def serial_worker(product, feed):
|
|||||||
elif command == 'LOAD':
|
elif command == 'LOAD':
|
||||||
if value:
|
if value:
|
||||||
helpers.load(key, value)
|
helpers.load(key, value)
|
||||||
|
|
||||||
|
|
||||||
|
def watchdog_worker(feed,interval, stop_event):
|
||||||
|
import bpy
|
||||||
|
global stop
|
||||||
|
|
||||||
|
while not stop_event.is_set():
|
||||||
|
|
||||||
|
for datatype in helpers.SUPPORTED_TYPES:
|
||||||
|
for item in getattr(bpy.data, helpers.CORRESPONDANCE[datatype]):
|
||||||
|
if item.id == 'None':
|
||||||
|
item.id = bpy.context.scene.session_settings.username
|
||||||
|
key = "{}/{}".format(datatype, item.name)
|
||||||
|
feed.put(('DUMP',key,None))
|
||||||
|
|
||||||
|
time.sleep(interval)
|
||||||
|
@ -150,14 +150,14 @@ def sync():
|
|||||||
|
|
||||||
def register_ticks():
|
def register_ticks():
|
||||||
# REGISTER Updaters
|
# REGISTER Updaters
|
||||||
bpy.app.timers.register(sync)
|
# bpy.app.timers.register(sync)
|
||||||
bpy.app.timers.register(default_tick)
|
bpy.app.timers.register(default_tick)
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
def unregister_ticks():
|
def unregister_ticks():
|
||||||
# REGISTER Updaters
|
# REGISTER Updaters
|
||||||
bpy.app.timers.unregister(sync)
|
# bpy.app.timers.unregister(sync)
|
||||||
bpy.app.timers.unregister(default_tick)
|
bpy.app.timers.unregister(default_tick)
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user