diff --git a/__init__.py b/__init__.py index 3dcf13b..c2b3ebf 100644 --- a/__init__.py +++ b/__init__.py @@ -32,14 +32,14 @@ logging.basicConfig(level=logging.INFO) # UTILITY FUNCTIONS def client_list_callback(scene, context): - from . import client + from operator import cli items = [("Common", "Common", "")] username = bpy.context.window_manager.session.username - if client.instance: - client_keys = client.instance.list() + if cli: + client_keys = cli.list() for k in client_keys: if 'Client' in k[0]: name = k[1] @@ -203,11 +203,11 @@ classes = ( ) -def register(): +def register(): environment.setup(DEPENDENCIES,bpy.app.binary_path_python) from . import operators - from . import ui + # from . import ui for cls in classes: bpy.utils.register_class(cls) @@ -220,14 +220,14 @@ def register(): bpy.context.window_manager.session.load() save_session_config(bpy.context.window_manager.session,bpy.context) operators.register() - ui.register() + # ui.register() def unregister(): from . import operators - from . import ui + # from . import ui - ui.unregister() + # ui.unregister() operators.unregister() del bpy.types.WindowManager.session diff --git a/client.py b/client.py deleted file mode 100644 index 10093bf..0000000 --- a/client.py +++ /dev/null @@ -1,499 +0,0 @@ -import binascii -import collections -import copy -import logging -import os -import queue -import sys -import threading -import time -from enum import Enum -from random import randint -import zmq -import json - -from . import environment,replication, helpers, message -from .libs import dump_anything, umsgpack - -CONNECT_TIMEOUT = 2 -WATCH_FREQUENCY = 0.1 -WAITING_TIME = 0.001 -SERVER_MAX = 1 -DUMP_AGENTS_NUMBER = 1 - -lock = threading.Lock() -logger = logging.getLogger(__name__) -logging.basicConfig(level=environment) -instance = None - - -class State(Enum): - INITIAL = 1 - SYNCING = 2 - ACTIVE = 3 - WORKING = 4 - - -def zpipe(ctx): - """build inproc pipe for talking to threads - - mimic pipe used in czmq zthread_fork. - - Returns a pair of PAIRs connected via inproc - """ - a = ctx.socket(zmq.PAIR) - b = ctx.socket(zmq.PAIR) - a.linger = b.linger = 0 - a.hwm = b.hwm = 1 - iface = "inproc://%s" % binascii.hexlify(os.urandom(8)) - a.bind(iface) - b.connect(iface) - return a, b - - -class Client(object): - ctx = None - pipe = None - net_agent = None - store = None - active_tasks = None - - def __init__(self, executor): - self.ctx = zmq.Context() - self.pipe, peer = zpipe(self.ctx) - self.store = {} - self.serial_product = queue.Queue() - self.serial_feed = queue.Queue() - self.stop_event = threading.Event() - self.external_tasks = executor - - # Net agent - self.net_agent = threading.Thread( - target=net_worker, - args=(self.ctx, self.store, peer, self.serial_product, self.serial_feed, self.stop_event,self.external_tasks), name="net-agent") - self.net_agent.daemon = True - self.net_agent.start() - - # Local data translation agent - self.serial_agents = [] - for a in range(0, DUMP_AGENTS_NUMBER): - serial_agent = threading.Thread( - target=serial_worker, args=(self.serial_product, self.serial_feed), name="serial-agent") - serial_agent.daemon = True - serial_agent.start() - self.serial_agents.append(serial_agent) - - # Sync agent - self.watchdog_agent = threading.Thread( - target=watchdog_worker, args=(self.serial_feed,WATCH_FREQUENCY, self.stop_event), name="watchdog-agent") - self.watchdog_agent.daemon = True - self.watchdog_agent.start() - - # Status - self.active_tasks = 0 - - def connect(self, id, address, port): - self.pipe.send_multipart([b"CONNECT", (id.encode() if isinstance( - id, str) else id), (address.encode() if isinstance( - address, str) else address), b'%d' % port]) - - def replicate(self, py_object): - """Entry point for python object replication - - Create object replication structure - - Add it to the distributed hash table - """ - pass - # node = Factory(py_object) - - # self.store - def init(self): - """ - Scene initialisation - """ - self.pipe.send_multipart( - [b"INIT"]) - - def disconnect(self): - """ - Disconnect - """ - self.pipe.send_multipart( - [b"DISCONNECT"]) - - def set(self, key, value=None, override=False): - """Set new value in distributed hash table - Sends [SET][key][value] to the agent - """ - - if value: - key = umsgpack.packb(key) - value = umsgpack.packb(value) if value else umsgpack.packb('None') - override = umsgpack.packb(override) - - self.pipe.send_multipart( - [b"SET", key, value, override]) - else: - self.serial_feed.put(('DUMP', key, None)) - - def add(self, key, value=None): - """Set new value in distributed hash table - """ - self.serial_feed.put(key) - - def is_busy(self): - self.active_tasks = self.serial_feed.qsize() + self.serial_product.qsize() - if self.active_tasks == 0: - return False - else: - return True - - def exit(self): - if self.net_agent.is_alive(): - self.disconnect() - - self.stop_event.set() - - for a in range(0, DUMP_AGENTS_NUMBER): - self.serial_feed.put(('STOP', None, None)) - - # READ-ONLY FUNCTIONS - def get(self, key): - """Lookup value in distributed hash table - Sends [GET][key] to the agent and waits for a value response - If there is no clone available, will eventually return None. - """ - value = [] - - for k in self.store.keys(): - if key in k: - value.append([k, self.store.get(k).body]) - - return value - - def exist(self, key): - """ - Fast key exist check - """ - - if key in self.store.keys(): - return True - else: - return False - - def list(self): - dump_list = [] - for k, v in self.store.items(): - if 'Client' in k: - dump_list.append([k, v.id.decode()]) - else: - try: - dump_list.append([k, v.body['id']]) - except: - pass - - return dump_list - - def state(self): - if self.net_agent is None or not self.net_agent.is_alive(): - return 1 #State.INITIAL - elif self.net_agent.is_alive() and self.store.keys(): - return 3 # State.ACTIVE - else: - return 2 #State.SYNCING - - - # SAVING FUNCTIONS - def dump(self, filepath): - with open('dump.json', "w") as fp: - for key, value in self.store.items(): - line = json.dumps(value.body) - fp.write(line) - - -class Server(object): - address = None # Server address - port = None # Server port - snapshot = None # Snapshot socket - subscriber = None # Incoming updates - - def __init__(self, ctx, address, port, id): - self.address = address - self.port = port - self.snapshot = ctx.socket(zmq.DEALER) - self.snapshot = self.context.socket(zmq.DEALER) - self.snapshot.setsockopt(zmq.IDENTITY, id) - self.snapshot.connect("tcp://{}:{}".format(address.decode(), port)) - self.subscriber = ctx.socket(zmq.SUB) - self.subscriber.setsockopt_string(zmq.SUBSCRIBE, '') - self.subscriber.connect("tcp://{}:{}".format(address.decode(), port+1)) - self.subscriber.linger = 0 - print("connected on tcp://{}:{}".format(address.decode(), port)) - - -class ClientAgent(object): - ctx = None - pipe = None - property_map = None - publisher = None - id = None - state = None - server = None - serial = None - serialisation_agent = None - - def __init__(self, ctx, store, pipe): - self.ctx = ctx - self.pipe = pipe - self.property_map = store - self.id = b"test" - self.state = State.INITIAL - self.admin = False - self.server = None - self.publisher = self.ctx.socket(zmq.PUSH) # push update socket - self.publisher.setsockopt(zmq.IDENTITY, self.id) - self.publisher.setsockopt(zmq.SNDHWM, 60) - self.publisher.linger = 0 - - def control_message(self): - msg = self.pipe.recv_multipart() - command = msg.pop(0) - - if command == b"CONNECT": - self.id = msg.pop(0) - address = msg.pop(0) - port = int(msg.pop(0)) - - if self.server is None: - if address == '127.0.0.1' or address == 'localhost': - self.admin = True - self.server = Server(self.ctx, address, port, self.id) - self.publisher.connect( - "tcp://{}:{}".format(address.decode(), port+2)) - - else: - logger.error("E: too many servers (max. %i)", SERVER_MAX) - - elif command == b"DISCONNECT": - if self.admin is False: - uid = self.id.decode() - - delete_user = message.Message( - key="Client/{}".format(uid), id=self.id, body=None) - delete_user.send(self.publisher) - - # TODO: Do we need to pass every object rights to the moderator on disconnect? - # for k,v in self.property_map.items(): - # if v.body["id"] == uid: - # delete_msg = message.Message( - # key=k, id=self.id, body=None) - # # delete_msg.store(self.property_map) - # delete_msg.send(self.publisher) - - elif command == b"SET": - key = umsgpack.unpackb(msg[0]) - value = umsgpack.unpackb(msg[1]) - override = umsgpack.unpackb(msg[2]) - - if key in self.property_map.keys(): - if self.property_map[key].body['id'] == self.id.decode() or override: - if value == 'None': - value = helpers.dump(key) - value['id'] = self.id.decode() - if value: - key_id = self.id - msg = message.Message( - key=key, id=key_id, body=value) - - msg.store(self.property_map) - - if override: - helpers.load(key, self.property_map[key].body) - msg.send(self.publisher) - else: - logger.error("Fail to dump ") - else: - helpers.load(key, self.property_map[key].body) - - elif command == b"ADD": - key = umsgpack.unpackb(msg[0]) - value = umsgpack.unpackb(msg[1]) - - if value == 'None': - value = helpers.dump(key) - value['id'] = self.id.decode() - if value: - msg = message.Message( - key=key, id=self.id, body=value) - - msg.store(self.property_map) - msg.send(self.publisher) - else: - logger.error("Fail to dump ") - - elif command == b"GET": - value = [] - key = umsgpack.unpackb(msg[0]) - for k in self.property_map.keys(): - if key in k: - value.append([k, self.property_map.get(k).body]) - - self.pipe.send(umsgpack.packb(value) - if value else umsgpack.packb('')) - - elif command == b"LIST": - dump_list = [] - for k, v in self.property_map.items(): - if 'Client' in k: - dump_list.append([k, v.id.decode()]) - else: - try: - dump_list.append([k, v.body['id']]) - except: - pass - self.pipe.send(umsgpack.packb(dump_list) - if dump_list else umsgpack.packb('')) - - elif command == b"STATE": - self.pipe.send(umsgpack.packb(self.state.value)) - - -def net_worker(ctx, store, pipe, serial_product, serial_feed, stop_event,external_executor): - agent = ClientAgent(ctx, store, pipe) - server = None - net_feed = serial_product - net_product = serial_feed - external_executor = external_executor - while not stop_event.is_set(): - poller = zmq.Poller() - poller.register(agent.pipe, zmq.POLLIN) - server_socket = None - - if agent.state == State.INITIAL: - server = agent.server - if agent.server: - logger.debug("%s: waiting for server at %s:%d...", - agent.id.decode(), server.address, server.port) - server.snapshot.send(b"SNAPSHOT_REQUEST") - agent.state = State.SYNCING - server_socket = server.snapshot - elif agent.state == State.SYNCING: - server_socket = server.snapshot - elif agent.state == State.ACTIVE: - server_socket = server.subscriber - - if server_socket: - poller.register(server_socket, zmq.POLLIN) - - try: - items = dict(poller.poll(1)) - except: - raise - break - - if agent.pipe in items: - agent.control_message() - elif server_socket in items: - msg = message.Message.recv(server_socket) - - if agent.state == State.SYNCING: - # CLient snapshot - if msg.key == "SNAPSHOT_END": - client_key = "Client/{}".format(agent.id.decode()) - - client_dict = {} - client_dict = helpers.init_client(key=client_key) - client_dict['id'] = agent.id.decode() - - client_store = message.Message( - key=client_key, id=agent.id, body=client_dict) - client_store.store(agent.property_map) - client_store.send(agent.publisher) - - agent.state = State.ACTIVE - logger.debug("snapshot complete") - else: - net_product.put(('LOAD', msg.key, msg.body)) - - # helpers.load(msg.key, msg.body) - msg.store(agent.property_map) - logger.debug("snapshot from {} stored".format(msg.id)) - elif agent.state == State.ACTIVE: - if msg.id != agent.id: - - # with lock: - # helpers.load(msg.key, msg.body) - msg.store(agent.property_map) - # net_product.put(('LOAD', msg.key, msg.body)) - params = [] - params.append(msg.key) - params.append(msg.body) - external_executor.put((helpers.load,params)) - else: - logger.debug("{} nothing to do".format(agent.id)) - - # Serialisation thread => Net thread - if not net_feed.empty(): - key, value = net_feed.get() - if value: - # Stamp with id - value['id'] = agent.id.decode() - - # Format massage - msg = message.Message( - key=key, id=agent.id, body=value) - - msg.store(agent.property_map) - msg.send(agent.publisher) - else: - logger.error("Fail to dump ") - - logger.info("exit thread") - - -def serial_worker(serial_product, serial_feed): - logger.info("serial thread launched") - - while True: - command, key, value = serial_feed.get() - - if command == 'STOP': - break - elif command == 'DUMP': - try: - value = helpers.dump(key) - - if value: - serial_product.put((key, value)) - except Exception as e: - logger.error("{}".format(e)) - elif command == 'LOAD': - if value: - try: - helpers.load(key, value) - except Exception as e: - logger.error("{}".format(e)) - - logger.info("serial thread stopped") - - -def watchdog_worker(serial_feed, interval, stop_event): - import bpy - - logger.info( - "watchdog thread launched with {} sec of interval".format(interval)) - while not stop_event.is_set(): - for datatype in environment.rtypes: - for item in getattr(bpy.data, helpers.BPY_TYPES[datatype]): - key = "{}/{}".format(datatype, item.name) - try: - if item.is_dirty: - logger.debug("{} needs update".format(key)) - serial_feed.put(('DUMP', key, None)) - item.is_dirty = False - except: - pass - time.sleep(interval) - - logger.info("watchdog thread stopped") - - diff --git a/environment.py b/environment.py index a18dba2..4189f74 100644 --- a/environment.py +++ b/environment.py @@ -114,6 +114,7 @@ def setup(dependencies, python_path): PYTHON_PATH = Path(python_path) SUBPROCESS_DIR = PYTHON_PATH.parent + if not module_can_be_imported("pip"): install_pip() diff --git a/message.py b/message.py deleted file mode 100644 index 0ff280b..0000000 --- a/message.py +++ /dev/null @@ -1,75 +0,0 @@ -import logging -try: - from .libs import umsgpack - -except: - # Server import - from libs import umsgpack - -import zmq - -logger = logging.getLogger(__name__) -logging.basicConfig(level=logging.DEBUG) - -class Message(object): - """ - Message is formatted on wire as 2 frames: - frame 0: key (0MQ string) // property path - frame 1: id (0MQ string) // property path - frame 3: body (blob) // Could be any data - - """ - key = None # key (string) - id = None # User (string) - body = None # data blob - - - def __init__(self, key=None, id=None, body=None): - self.key = key - self.body = body - self.id = id - - def store(self, dikt): - """Store me in a dict if I have anything to store""" - # this currently erasing old value - if self.key is not None: - if self.body == 'None': - logger.info("erasing key {}".format(self.key)) - del dikt[self.key] - else: - dikt[self.key] = self - - def send(self, socket): - """Send key-value message to socket; any empty frames are sent as such.""" - key = ''.encode() if self.key is None else self.key.encode() - body = umsgpack.packb('None') if self.body is None else umsgpack.packb(self.body) - id = ''.encode() if self.id is None else self.id - - try: - socket.send_multipart([key, id, body]) - except: - print("Fail to send {} {} {}".format(key, id, body)) - - @classmethod - def recv(cls, socket): - """Reads key-value message from socket, returns new kvmsg instance.""" - key, id, body = socket.recv_multipart(zmq.NOBLOCK) - key = key.decode() if key else None - id = id if id else None - body = umsgpack.unpackb(body) if body else None - - return cls(key=key, id=id, body=body) - - def dump(self): - if self.body is None: - size = 0 - data = 'NULL' - else: - size = len(self.body) - data = repr(self.body) - print("[key:{key}][size:{size}] {data}".format( - key=self.key, - size=size, - data=data, - )) - diff --git a/operators.py b/operators.py index 5427c6e..8088ae0 100644 --- a/operators.py +++ b/operators.py @@ -14,12 +14,13 @@ from bpy_extras.io_utils import ExportHelper import mathutils from pathlib import Path -from . import environment, client, draw, helpers, ui +from . import environment, presence, ui from .libs import umsgpack +from .libs.replication import client logger = logging.getLogger(__name__) -# client_instance = None +cli = None server = None context = None diff --git a/draw.py b/presence.py similarity index 99% rename from draw.py rename to presence.py index 227cf3c..2ff5b94 100644 --- a/draw.py +++ b/presence.py @@ -212,6 +212,7 @@ class DrawFactory(object): except Exception as e: print("2D EXCEPTION") + def register(): global renderer renderer = DrawFactory() diff --git a/server.py b/server.py deleted file mode 100644 index a7a1a9f..0000000 --- a/server.py +++ /dev/null @@ -1,99 +0,0 @@ -import logging -import time -import environment -from operator import itemgetter - - -import zmq -import message - -logger = logging.getLogger("Server") -logging.basicConfig(level=logging.DEBUG) - -SUPPORTED_TYPES = ['Image','Client','Curve','Material','Texture', 'Light', 'Camera', 'Mesh','Armature', 'GreasePencil', 'Object', 'Action', 'Collection', 'Scene'] - -class ServerAgent(): - def __init__(self, context=zmq.Context.instance(), id="admin"): - self.context = context - self.config = environment.load_config() - self.port = int(self.config['port']) if "port" in self.config.keys() else 5555 - self.pub_sock = None - self.request_sock = None - self.collector_sock = None - self.poller = None - - self.property_map = {} - self.id = id - self.bind_ports() - # Main client loop registration - self.tick() - - logger.info("{} client initialized".format(id)) - - def bind_ports(self): - # Update all clients - self.pub_sock = self.context.socket(zmq.PUB) - self.pub_sock.setsockopt(zmq.SNDHWM, 60) - self.pub_sock.bind("tcp://*:"+str(self.port+1)) - time.sleep(0.2) - - # Update request - self.request_sock = self.context.socket(zmq.ROUTER) - self.request_sock.setsockopt(zmq.IDENTITY, b'SERVER') - self.request_sock.setsockopt(zmq.RCVHWM, 60) - self.request_sock.bind("tcp://*:"+str(self.port)) - - # Update collector - self.collector_sock = self.context.socket(zmq.PULL) - self.collector_sock.setsockopt(zmq.RCVHWM, 60) - self.collector_sock.bind("tcp://*:"+str(self.port+2)) - - # poller for socket aggregation - self.poller = zmq.Poller() - self.poller.register(self.request_sock, zmq.POLLIN) - self.poller.register(self.collector_sock, zmq.POLLIN) - - def tick(self): - logger.info("{} server launched on {}".format(id,self.port)) - - while True: - # Non blocking poller - socks = dict(self.poller.poll(1000)) - - # Snapshot system for late join (Server - Client) - if self.request_sock in socks: - msg = self.request_sock.recv_multipart(zmq.DONTWAIT) - - identity = msg[0] - request = msg[1] - - if request == b"SNAPSHOT_REQUEST": - pass - else: - logger.info("Bad snapshot request") - break - - ordered_props = [(SUPPORTED_TYPES.index(k.split('/')[0]),k,v) for k, v in self.property_map.items()] - ordered_props.sort(key=itemgetter(0)) - - for i, k, v in ordered_props: - logger.info( - "Sending {} snapshot to {}".format(k, identity)) - self.request_sock.send(identity, zmq.SNDMORE) - v.send(self.request_sock) - - msg_end_snapshot = message.Message(key="SNAPSHOT_END", id=identity) - self.request_sock.send(identity, zmq.SNDMORE) - msg_end_snapshot.send(self.request_sock) - logger.info("done") - - # Regular update routing (Clients / Client) - elif self.collector_sock in socks: - msg = message.Message.recv(self.collector_sock) - # logger.info("received object") - # Update all clients - msg.store(self.property_map) - msg.send(self.pub_sock) - -server = ServerAgent() - diff --git a/ui.py b/ui.py index 2fcadeb..41dc2c6 100644 --- a/ui.py +++ b/ui.py @@ -1,7 +1,5 @@ import bpy -from . import client -from . import operators - +from .libs.replication import client ICONS = {'Image': 'IMAGE_DATA', 'Curve':'CURVE_DATA', 'Client':'SOLO_ON','Collection': 'FILE_FOLDER', 'Mesh': 'MESH_DATA', 'Object': 'OBJECT_DATA', 'Material': 'MATERIAL_DATA',