multi-user/client.py

523 lines
16 KiB
Python
Raw Normal View History

2019-04-10 17:01:21 +02:00
import binascii
2019-03-25 14:56:09 +01:00
import collections
2019-05-02 17:58:37 +02:00
import copy
import logging
import os
2019-05-02 17:58:37 +02:00
import queue
2019-04-10 11:21:10 +02:00
import sys
2019-04-10 17:01:21 +02:00
import threading
2019-02-11 15:48:07 +01:00
import time
2019-03-15 17:37:02 +01:00
from enum import Enum
2019-04-10 17:01:21 +02:00
from random import randint
2019-05-02 17:58:37 +02:00
from . import draw, helpers, message
from .libs import dump_anything, umsgpack, zmq
2019-04-26 17:18:55 +02:00
# import zmq
2019-04-11 18:07:59 +02:00
lock = threading.Lock()
logger = logging.getLogger(__name__)
2019-04-17 14:22:56 +02:00
logging.basicConfig(level=logging.INFO)
2019-05-02 14:46:31 +02:00
2019-03-25 14:56:09 +01:00
CONNECT_TIMEOUT = 2
2019-03-15 16:50:59 +01:00
WAITING_TIME = 0.001
2019-04-08 12:53:14 +02:00
SERVER_MAX = 1
2019-04-08 17:01:02 +02:00
stop = False
DUMP_AGENTS_NUMBER = 1
2019-04-10 17:01:21 +02:00
2019-05-02 14:46:31 +02:00
class State(Enum):
    """Connection lifecycle of the client network agent."""
    INITIAL = 1  # not connected yet; waiting for a server to be set
    SYNCING = 2  # snapshot requested, receiving initial scene state
    ACTIVE = 3   # snapshot complete, exchanging live updates
    WORKING = 4  # presumably: busy with local tasks -- unused in this file, confirm
2019-05-02 14:46:31 +02:00
2019-04-08 12:53:14 +02:00
def zpipe(ctx):
    """Build an inproc pipe for talking to threads.

    Mimics the pipe used in czmq's zthread_fork: returns a pair of PAIR
    sockets connected to each other over a randomly named inproc endpoint.
    """
    endpoint = "inproc://%s" % binascii.hexlify(os.urandom(8))
    parent = ctx.socket(zmq.PAIR)
    child = ctx.socket(zmq.PAIR)
    for sock in (parent, child):
        sock.linger = 0  # drop pending messages on close
        sock.hwm = 1
    parent.bind(endpoint)
    child.connect(endpoint)
    return parent, child
2019-04-08 12:53:14 +02:00
class RCFClient(object):
    """Public API of the collaborative-session client.

    Spawns background threads on construction:
      - one network agent (``rcf_client_worker``) talking to the server,
      - ``DUMP_AGENTS_NUMBER`` serialisation agents (``serial_worker``),
      - one watchdog (``watchdog_worker``) detecting local changes.
    Commands are forwarded to the network agent over an inproc pipe.
    """
    ctx = None
    pipe = None
    net_agent = None
    store = None
    active_tasks = None

    def __init__(self):
        self.ctx = zmq.Context()
        self.pipe, peer = zpipe(self.ctx)

        self.store = {}
        self.serial_product = queue.Queue()
        self.serial_feed = queue.Queue()
        self.stop_event = threading.Event()

        # Database and connection agent
        self.net_agent = threading.Thread(
            target=rcf_client_worker,
            args=(self.ctx, self.store, peer, self.serial_product,
                  self.serial_feed, self.stop_event),
            name="net-agent")
        self.net_agent.daemon = True
        self.net_agent.start()

        # Local data translation agents
        self.serial_agents = []
        for a in range(0, DUMP_AGENTS_NUMBER):
            serial_agent = threading.Thread(
                target=serial_worker,
                args=(self.serial_product, self.serial_feed),
                name="serial-agent")
            serial_agent.daemon = True
            serial_agent.start()
            self.serial_agents.append(serial_agent)

        # Sync watchdog
        self.watchdog_agent = threading.Thread(
            target=watchdog_worker,
            args=(self.serial_feed, 0.2, self.stop_event),
            name="watchdog-agent")
        self.watchdog_agent.daemon = True
        self.watchdog_agent.start()

        # Status counter, refreshed by is_busy()
        self.active_tasks = 0

    def connect(self, id, address, port):
        """Ask the network agent to connect to a server.

        Sends [CONNECT][id][address][port]; str arguments are encoded to
        bytes as required by multipart messaging.
        """
        self.pipe.send_multipart([
            b"CONNECT",
            (id.encode() if isinstance(id, str) else id),
            (address.encode() if isinstance(address, str) else address),
            b'%d' % port])

    def init(self):
        """Scene initialisation: push every local datablock to the server."""
        self.pipe.send_multipart(
            [b"INIT"])

    def disconnect(self):
        """Notify the network agent to leave the session."""
        self.pipe.send_multipart(
            [b"DISCONNECT"])

    def set(self, key, value=None, override=False):
        """Set a value in the distributed hash table.

        With an explicit value, sends [SET][key][value][override] to the
        agent; without one, queues a DUMP so a serial agent serialises
        the datablock named by *key* first.
        """
        if value:
            self.pipe.send_multipart(
                [b"SET", umsgpack.packb(key),
                 umsgpack.packb(value),
                 umsgpack.packb(override)])
        else:
            self.serial_feed.put(('DUMP', key, None))

    def add(self, key, value=None):
        """Queue the datablock named by *key* for dump-and-publish."""
        # Feed items must be (command, key, value) triples: a bare key
        # here would break the serial worker's tuple unpacking.
        self.serial_feed.put(('DUMP', key, None))

    def is_busy(self):
        """Return True while serialisation work is still queued.

        Also refreshes ``active_tasks`` with the combined queue depth.
        """
        self.active_tasks = self.serial_feed.qsize() + self.serial_product.qsize()
        if self.active_tasks == 0:
            return False
        else:
            return True

    def exit(self):
        """Shut down all agent threads, disconnecting first if needed."""
        if self.net_agent.is_alive():
            self.disconnect()
        self.stop_event.set()
        # One STOP sentinel per serial agent so each worker loop exits.
        for a in range(0, DUMP_AGENTS_NUMBER):
            self.serial_feed.put(('STOP', None, None))

    # READ-ONLY FUNCTIONS
    def get(self, key):
        """Lookup values in the local replica of the hash table.

        Returns [stored_key, body] pairs for every stored key containing
        *key* as a substring; empty list when nothing matches.
        """
        value = []
        for k in self.store.keys():
            if key in k:
                value.append([k, self.store.get(k).body])
        return value

    def list(self):
        """Return [key, owner-id] pairs for every stored datablock."""
        dump_list = []
        for k, v in self.store.items():
            if 'Client' in k:
                dump_list.append([k, v.id.decode()])
            else:
                try:
                    dump_list.append([k, v.body['id']])
                except (KeyError, TypeError):
                    # Entry has no body (deleted) or no id stamp: skip it.
                    pass
        return dump_list

    def state(self):
        """Return the network agent's State value, or None while busy."""
        if not self.is_busy():
            self.pipe.send_multipart([b"STATE"])
            try:
                reply = self.pipe.recv_multipart()
            except KeyboardInterrupt:
                return
            else:
                return umsgpack.unpackb(reply[0])
        else:
            return None
2019-04-08 12:53:14 +02:00
class RCFServer(object):
    """Handle to a remote server: a snapshot socket plus a live-update feed."""
    address = None     # Server address (bytes)
    port = None        # Server base port; subscriber uses port+1
    snapshot = None    # Snapshot socket (DEALER, request/reply)
    subscriber = None  # Incoming updates (SUB, subscribed to everything)

    def __init__(self, ctx, address, port, id):
        """Connect both sockets to *address* using *id* as identity."""
        self.address = address
        self.port = port

        # Snapshot request/reply channel.
        self.snapshot = ctx.socket(zmq.DEALER)
        self.snapshot.linger = 0
        self.snapshot.setsockopt(zmq.IDENTITY, id)
        self.snapshot.connect("tcp://{}:{}".format(address.decode(), port))

        # Live updates are published on the next port up.
        self.subscriber = ctx.socket(zmq.SUB)
        self.subscriber.setsockopt_string(zmq.SUBSCRIBE, '')
        self.subscriber.connect("tcp://{}:{}".format(address.decode(), port + 1))
        self.subscriber.linger = 0

        # Use the module logger rather than print(), consistent with the
        # rest of the file.
        logger.info("connected on tcp://{}:{}".format(address.decode(), port))
2019-04-10 17:01:21 +02:00
2019-04-08 12:53:14 +02:00
class RCFClientAgent(object):
    """Background agent state: owns the sockets and the shared property map."""
    ctx = None
    pipe = None
    property_map = None
    publisher = None
    id = None
    state = State.INITIAL
    server = None
    serial = None
    serialisation_agent = None

    def __init__(self, ctx, store, pipe):
        self.ctx = ctx
        self.pipe = pipe
        self.property_map = store
        self.id = b"test"  # replaced by the real id on CONNECT
        self.state = State.INITIAL
        self.admin = False
        self.server = None

        # Push update socket.
        self.publisher = self.ctx.socket(zmq.PUSH)
        self.publisher.setsockopt(zmq.IDENTITY, self.id)
        self.publisher.setsockopt(zmq.SNDHWM, 60)
        self.publisher.linger = 0

    def _add(self, key, value='None'):
        """Dump (when no value is given), store and publish datablock *key*."""
        if value == 'None':
            # Try to serialise from bpy on demand.
            value = helpers.dump(key)
            if value:
                # Guard the id stamp: dump() may fail and return a falsy
                # value, which must reach the error branch below instead
                # of raising here.
                value['id'] = self.id.decode()
        if value:
            rcfmsg = message.RCFMessage(
                key=key, id=self.id, body=value)
            rcfmsg.store(self.property_map)
            rcfmsg.send(self.publisher)
        else:
            logger.error("Fail to dump ")

    def control_message(self):
        """Handle one command arriving on the frontend pipe."""
        msg = self.pipe.recv_multipart()
        command = msg.pop(0)

        if command == b"CONNECT":
            self.id = msg.pop(0)
            address = msg.pop(0)
            port = int(msg.pop(0))
            if self.server is None:
                # The address arrives as bytes; decode before comparing,
                # otherwise the local-admin check can never match.
                if address.decode() in ('127.0.0.1', 'localhost'):
                    self.admin = True
                self.server = RCFServer(self.ctx, address, port, self.id)
                self.publisher.connect(
                    "tcp://{}:{}".format(address.decode(), port + 2))
            else:
                logger.error("E: too many servers (max. %i)", SERVER_MAX)

        elif command == b"DISCONNECT":
            # Only non-admin clients unregister themselves on the server.
            if self.admin is False:
                uid = self.id.decode()
                delete_user = message.RCFMessage(
                    key="Client/{}".format(uid), id=self.id, body=None)
                delete_user.send(self.publisher)
                # TODO: Do we need to pass every object rights to the moderator ?

        elif command == b"SET":
            key = umsgpack.unpackb(msg[0])
            value = umsgpack.unpackb(msg[1])
            override = umsgpack.unpackb(msg[2])
            if key in self.property_map.keys():
                # Only the owner (or an explicit override) may write.
                if self.property_map[key].body['id'] == self.id.decode() or override:
                    if value == 'None':
                        value = helpers.dump(key)
                        if value:
                            value['id'] = self.id.decode()
                    if value:
                        key_id = self.id
                        rcfmsg = message.RCFMessage(
                            key=key, id=key_id, body=value)
                        rcfmsg.store(self.property_map)
                        if override:
                            helpers.load(key, self.property_map[key].body)
                        rcfmsg.send(self.publisher)
                    else:
                        logger.error("Fail to dump ")
                else:
                    # Not the owner: revert local state to the stored value.
                    helpers.load(key, self.property_map[key].body)

        elif command == b"INIT":
            # Publish every local datablock.
            d = helpers.get_all_datablocks()
            for i in d:
                self._add(i)

        elif command == b"ADD":
            key = umsgpack.unpackb(msg[0])
            value = umsgpack.unpackb(msg[1])
            # Same dump/store/publish path as INIT -- the previous inline
            # copy duplicated _add() verbatim.
            self._add(key, value)

        elif command == b"GET":
            # Substring lookup over all stored keys; reply on the pipe.
            value = []
            key = umsgpack.unpackb(msg[0])
            for k in self.property_map.keys():
                if key in k:
                    value.append([k, self.property_map.get(k).body])
            self.pipe.send(umsgpack.packb(value)
                           if value else umsgpack.packb(''))

        elif command == b"LIST":
            dump_list = []
            for k, v in self.property_map.items():
                if 'Client' in k:
                    dump_list.append([k, v.id.decode()])
                else:
                    try:
                        dump_list.append([k, v.body['id']])
                    except (KeyError, TypeError):
                        # No body (deleted entry) or no id stamp: skip.
                        pass
            self.pipe.send(umsgpack.packb(dump_list)
                           if dump_list else umsgpack.packb(''))

        elif command == b"STATE":
            self.pipe.send(umsgpack.packb(self.state.value))
2019-04-18 15:05:48 +02:00
2019-05-02 18:19:59 +02:00
def rcf_client_worker(ctx, store, pipe, serial_product, serial_feed, stop_event):
    """Network agent thread: synchronise *store* with the server.

    Polls the control pipe and the server socket matching the current
    state, drives the INITIAL -> SYNCING -> ACTIVE state machine, and
    publishes data that the serial workers have finished dumping.
    Terminates when *stop_event* is set.
    """
    agent = RCFClientAgent(ctx, store, pipe)
    server = None
    net_feed = serial_product   # serialised data, ready to publish
    net_product = serial_feed   # incoming data, waiting to be loaded

    while not stop_event.is_set():
        poller = zmq.Poller()
        poller.register(agent.pipe, zmq.POLLIN)

        # Pick the server socket matching the sync state.
        server_socket = None
        if agent.state == State.INITIAL:
            server = agent.server
            if agent.server:
                logger.info("%s: waiting for server at %s:%d...",
                            agent.id.decode(), server.address, server.port)
                server.snapshot.send(b"SNAPSHOT_REQUEST")
                agent.state = State.SYNCING
                server_socket = server.snapshot
        elif agent.state == State.SYNCING:
            server_socket = server.snapshot
        elif agent.state == State.ACTIVE:
            server_socket = server.subscriber

        if server_socket:
            poller.register(server_socket, zmq.POLLIN)

        # NOTE(review): the original wrapped this in `try: ... except:
        # raise` followed by an unreachable `break`; the bare re-raise
        # was a no-op, so the dead code is removed.
        items = dict(poller.poll(1))

        if agent.pipe in items:
            agent.control_message()
        elif server_socket in items:
            rcfmsg = message.RCFMessage.recv(server_socket)

            if agent.state == State.SYNCING:
                # Store snapshot
                if rcfmsg.key == "SNAPSHOT_END":
                    # Snapshot done: register ourselves and go live.
                    client_key = "Client/{}".format(agent.id.decode())
                    client_dict = {}
                    with lock:
                        client_dict = helpers.init_client(key=client_key)
                        client_dict['id'] = agent.id.decode()
                    client_store = message.RCFMessage(
                        key=client_key, id=agent.id, body=client_dict)
                    logger.info(client_store)
                    client_store.store(agent.property_map)
                    client_store.send(agent.publisher)
                    logger.info("snapshot complete")
                    agent.state = State.ACTIVE
                else:
                    # Defer deserialisation to the serial workers.
                    net_product.put(('LOAD', rcfmsg.key, rcfmsg.body))
                    rcfmsg.store(agent.property_map)
                    logger.info("snapshot from {} stored".format(rcfmsg.id))

            elif agent.state == State.ACTIVE:
                # IN: only apply updates coming from other clients.
                if rcfmsg.id != agent.id:
                    net_product.put(('LOAD', rcfmsg.key, rcfmsg.body))
                    rcfmsg.store(agent.property_map)
                else:
                    logger.debug("{} nothing to do".format(agent.id))

        # Serialisation thread output: publish freshly dumped data.
        if not net_feed.empty():
            key, value = net_feed.get()
            if value:
                # Stamp with our id.
                value['id'] = agent.id.decode()
                # Format message
                rcfmsg = message.RCFMessage(
                    key=key, id=agent.id, body=value)
                rcfmsg.store(agent.property_map)
                rcfmsg.send(agent.publisher)
            else:
                logger.error("Fail to dump ")

    logger.info("exit thread")
2019-04-08 12:53:14 +02:00
def serial_worker(product, feed):
    """Serialisation worker loop.

    Consumes (command, key, value) triples from *feed*:
      'DUMP' -> serialise the datablock named *key* and queue
                (key, data) on *product* for the network agent,
      'LOAD' -> apply *value* onto the local datablock named *key*,
      'STOP' -> leave the loop and end the thread.
    """
    logger.info("serial thread launched")
    running = True
    while running:
        command, key, value = feed.get()
        if command == 'STOP':
            running = False
        elif command == 'DUMP':
            dumped = helpers.dump(key)
            if dumped:
                product.put((key, dumped))
        elif command == 'LOAD' and value:
            helpers.load(key, value)
    logger.info("serial thread stopped")
2019-05-02 18:19:59 +02:00
def watchdog_worker(feed, interval, stop_event):
    """Change-detection loop: scan Blender data and queue dirty items.

    Every *interval* seconds, walks every supported datablock collection
    and feeds ('DUMP', key, None) commands to the serial workers for
    items that are unowned (id == 'None') or flagged dirty. Runs until
    *stop_event* is set.
    """
    # Imported lazily because bpy only exists inside a Blender process.
    import bpy

    logger.info("watchdog thread launched with {} sec of interval".format(interval))
    while not stop_event.is_set():
        for datatype in helpers.SUPPORTED_TYPES:
            # CORRESPONDANCE presumably maps a type name to its bpy.data
            # collection attribute -- confirm against helpers module.
            for item in getattr(bpy.data, helpers.CORRESPONDANCE[datatype]):
                key = "{}/{}".format(datatype, item.name)
                try:
                    if item.id == 'None':
                        # Unowned datablock: claim it under the local
                        # username and push a first dump.
                        item.id = bpy.context.scene.session_settings.username
                        feed.put(('DUMP',key,None))
                    elif item.is_dirty:
                        logger.info("{} needs update".format(item.name))
                        feed.put(('DUMP',key,None))
                        item.is_dirty = False
                except:
                    # Best effort: items lacking the custom id/is_dirty
                    # properties are skipped silently (bare except kept
                    # deliberately -- NOTE(review): consider narrowing).
                    pass
        time.sleep(interval)

    logger.info("watchdog thread stopped")