refactor: started replication integration

This commit is contained in:
Swann Martinez 2019-08-05 15:24:00 +02:00
parent 5e98b1f6ed
commit 716f462893
No known key found for this signature in database
GPG Key ID: 414CCAFD8DA720E1
8 changed files with 14 additions and 686 deletions

View File

@ -32,14 +32,14 @@ logging.basicConfig(level=logging.INFO)
# UTILITY FUNCTIONS
def client_list_callback(scene, context):
from . import client
from operator import cli
items = [("Common", "Common", "")]
username = bpy.context.window_manager.session.username
if client.instance:
client_keys = client.instance.list()
if cli:
client_keys = cli.list()
for k in client_keys:
if 'Client' in k[0]:
name = k[1]
@ -203,11 +203,11 @@ classes = (
)
def register():
def register():
environment.setup(DEPENDENCIES,bpy.app.binary_path_python)
from . import operators
from . import ui
# from . import ui
for cls in classes:
bpy.utils.register_class(cls)
@ -220,14 +220,14 @@ def register():
bpy.context.window_manager.session.load()
save_session_config(bpy.context.window_manager.session,bpy.context)
operators.register()
ui.register()
# ui.register()
def unregister():
from . import operators
from . import ui
# from . import ui
ui.unregister()
# ui.unregister()
operators.unregister()
del bpy.types.WindowManager.session

499
client.py
View File

@ -1,499 +0,0 @@
import binascii
import collections
import copy
import logging
import os
import queue
import sys
import threading
import time
from enum import Enum
from random import randint
import zmq
import json
from . import environment,replication, helpers, message
from .libs import dump_anything, umsgpack
# Tunables for the replication client.
CONNECT_TIMEOUT = 2          # seconds to wait for the server before giving up
WATCH_FREQUENCY = 0.1        # seconds between watchdog scans of bpy.data
WAITING_TIME = 0.001
SERVER_MAX = 1               # only one upstream server is supported
DUMP_AGENTS_NUMBER = 1       # number of serialisation worker threads

lock = threading.Lock()

logger = logging.getLogger(__name__)
# BUGFIX: the original passed `level=environment` (a module object) as the
# log level, which makes logging.basicConfig raise at import time.  Use the
# same explicit level the sibling modules use.
logging.basicConfig(level=logging.INFO)

# Module-level Client singleton, assigned by the add-on on connect.
instance = None
class State(Enum):
    """Connection lifecycle of the replication client."""
    INITIAL = 1  # not connected yet
    SYNCING = 2  # snapshot requested; replaying server state
    ACTIVE = 3   # live: publishing and receiving updates
    WORKING = 4  # NOTE(review): never referenced in this module - confirm intent
def zpipe(ctx):
    """Create an in-process PAIR/PAIR pipe for inter-thread messaging.

    Mimics the pipe used in czmq's zthread_fork: both ends are PAIR
    sockets joined over a random inproc endpoint, with no linger and a
    high-water mark of one message.

    Returns a (bind_end, connect_end) pair of sockets.
    """
    left = ctx.socket(zmq.PAIR)
    right = ctx.socket(zmq.PAIR)
    for sock in (left, right):
        sock.linger = 0
        sock.hwm = 1
    endpoint = "inproc://%s" % binascii.hexlify(os.urandom(8))
    left.bind(endpoint)
    right.connect(endpoint)
    return left, right
class Client(object):
    """Replication client.

    Owns the network, serialisation and watchdog threads and exposes a
    distributed-hash-table style API (get/set/add/exist/list) backed by a
    local replica dict (``store``).
    """
    ctx = None           # zmq context
    pipe = None          # inproc PAIR socket talking to the net agent
    net_agent = None     # network thread
    store = None         # local replica: key -> message.Message
    active_tasks = None  # pending serialisation jobs (refreshed by is_busy)

    def __init__(self, executor):
        """Spawn the worker threads.

        :param executor: queue of (callable, args) tasks consumed by the
            host application (used to run scene loads on its main thread).
        """
        self.ctx = zmq.Context()
        self.pipe, peer = zpipe(self.ctx)
        self.store = {}
        self.serial_product = queue.Queue()
        self.serial_feed = queue.Queue()
        self.stop_event = threading.Event()
        self.external_tasks = executor
        # Net agent: owns all server-facing sockets.
        self.net_agent = threading.Thread(
            target=net_worker,
            args=(self.ctx, self.store, peer, self.serial_product,
                  self.serial_feed, self.stop_event, self.external_tasks),
            name="net-agent")
        self.net_agent.daemon = True
        self.net_agent.start()
        # Local data translation agents: dump/load scene data off-thread.
        self.serial_agents = []
        for a in range(0, DUMP_AGENTS_NUMBER):
            serial_agent = threading.Thread(
                target=serial_worker,
                args=(self.serial_product, self.serial_feed),
                name="serial-agent")
            serial_agent.daemon = True
            serial_agent.start()
            self.serial_agents.append(serial_agent)
        # Sync agent: detects dirty datablocks and queues dumps.
        self.watchdog_agent = threading.Thread(
            target=watchdog_worker,
            args=(self.serial_feed, WATCH_FREQUENCY, self.stop_event),
            name="watchdog-agent")
        self.watchdog_agent.daemon = True
        self.watchdog_agent.start()
        # Status
        self.active_tasks = 0

    def connect(self, id, address, port):
        """Ask the net agent to connect to a server.

        str arguments are encoded to bytes so the multipart frames are
        always binary.
        """
        self.pipe.send_multipart([b"CONNECT", (id.encode() if isinstance(
            id, str) else id), (address.encode() if isinstance(
                address, str) else address), b'%d' % port])

    def replicate(self, py_object):
        """Entry point for python object replication
        - Create object replication structure
        - Add it to the distributed hash table
        """
        # Not implemented yet.
        pass
        # node = Factory(py_object)
        # self.store

    def init(self):
        """
        Scene initialisation
        """
        self.pipe.send_multipart(
            [b"INIT"])

    def disconnect(self):
        """
        Disconnect
        """
        self.pipe.send_multipart(
            [b"DISCONNECT"])

    def set(self, key, value=None, override=False):
        """Set new value in distributed hash table.

        With a value, sends [SET][key][value][override] to the agent;
        without one, queues a dump of *key* so its current scene state is
        serialised and replicated.
        """
        if value:
            key = umsgpack.packb(key)
            value = umsgpack.packb(value) if value else umsgpack.packb('None')
            override = umsgpack.packb(override)
            self.pipe.send_multipart(
                [b"SET", key, value, override])
        else:
            self.serial_feed.put(('DUMP', key, None))

    def add(self, key, value=None):
        """Queue a dump of *key* so it gets replicated.

        :param value: unused, kept for interface compatibility.

        BUGFIX: the original enqueued the bare key, but serial_worker
        unpacks (command, key, value) triples; feed the same 'DUMP'
        triple that set() uses.
        """
        self.serial_feed.put(('DUMP', key, None))

    def is_busy(self):
        """Return True while serialisation jobs are still queued."""
        self.active_tasks = self.serial_feed.qsize() + self.serial_product.qsize()
        if self.active_tasks == 0:
            return False
        else:
            return True

    def exit(self):
        """Disconnect (if connected) and stop every worker thread."""
        if self.net_agent.is_alive():
            self.disconnect()
        self.stop_event.set()
        # One STOP sentinel per serialisation worker.
        for a in range(0, DUMP_AGENTS_NUMBER):
            self.serial_feed.put(('STOP', None, None))

    # READ-ONLY FUNCTIONS
    def get(self, key):
        """Lookup value in distributed hash table.

        Returns every [stored_key, body] pair whose stored key contains
        *key* (substring match); empty list when nothing matches.
        """
        value = []
        for k in self.store.keys():
            if key in k:
                value.append([k, self.store.get(k).body])
        return value

    def exist(self, key):
        """
        Fast exact-key existence check.
        """
        if key in self.store.keys():
            return True
        else:
            return False

    def list(self):
        """Return [key, owner-id] pairs for every stored entry."""
        dump_list = []
        for k, v in self.store.items():
            if 'Client' in k:
                dump_list.append([k, v.id.decode()])
            else:
                try:
                    dump_list.append([k, v.body['id']])
                except:
                    # Entries without an 'id' field are skipped on purpose.
                    pass
        return dump_list

    def state(self):
        """Return the connection state as an int (see State enum)."""
        if self.net_agent is None or not self.net_agent.is_alive():
            return 1  # State.INITIAL
        elif self.net_agent.is_alive() and self.store.keys():
            return 3  # State.ACTIVE
        else:
            return 2  # State.SYNCING

    # SAVING FUNCTIONS
    def dump(self, filepath):
        """Write every stored body as JSON to *filepath*.

        BUGFIX: the original ignored *filepath* and always wrote to the
        hard-coded 'dump.json'.
        """
        with open(filepath, "w") as fp:
            for key, value in self.store.items():
                line = json.dumps(value.body)
                fp.write(line)
class Server(object):
    """Handle to a remote replication server.

    Holds a DEALER socket for snapshot requests (base port) and a SUB
    socket for the live update feed (base port + 1).
    """
    address = None     # Server address (bytes)
    port = None        # Server base port
    snapshot = None    # Snapshot socket (DEALER)
    subscriber = None  # Incoming updates (SUB)

    def __init__(self, ctx, address, port, id):
        self.address = address
        self.port = port
        # BUGFIX: a duplicated assignment re-created this socket via
        # `self.context`, an attribute this class never defines, which
        # raised AttributeError on connect.  The socket comes from the
        # zmq context passed in.
        self.snapshot = ctx.socket(zmq.DEALER)
        self.snapshot.setsockopt(zmq.IDENTITY, id)
        self.snapshot.connect("tcp://{}:{}".format(address.decode(), port))
        self.subscriber = ctx.socket(zmq.SUB)
        self.subscriber.setsockopt_string(zmq.SUBSCRIBE, '')
        self.subscriber.connect("tcp://{}:{}".format(address.decode(), port+1))
        self.subscriber.linger = 0
        print("connected on tcp://{}:{}".format(address.decode(), port))
class ClientAgent(object):
    """Background agent living in the net thread.

    Executes control commands arriving from the Client API over the
    inproc pipe and publishes local updates to the server collector.
    """
    ctx = None
    pipe = None
    property_map = None  # shared replica store: key -> message.Message
    publisher = None     # PUSH socket towards the server collector
    id = None            # client identity (bytes)
    state = None         # current State enum value
    server = None        # Server handle once connected
    serial = None
    serialisation_agent = None

    def __init__(self, ctx, store, pipe):
        self.ctx = ctx
        self.pipe = pipe
        self.property_map = store
        self.id = b"test"  # placeholder identity until CONNECT arrives
        self.state = State.INITIAL
        self.admin = False
        self.server = None
        self.publisher = self.ctx.socket(zmq.PUSH)  # push update socket
        self.publisher.setsockopt(zmq.IDENTITY, self.id)
        self.publisher.setsockopt(zmq.SNDHWM, 60)
        self.publisher.linger = 0

    def control_message(self):
        """Receive one multipart command from the API pipe and dispatch it."""
        msg = self.pipe.recv_multipart()
        command = msg.pop(0)
        if command == b"CONNECT":
            self.id = msg.pop(0)
            address = msg.pop(0)
            port = int(msg.pop(0))
            if self.server is None:
                # BUGFIX: `address` is a bytes frame; the original compared
                # it against str literals, which is always False, so local
                # sessions never received admin rights.
                if address == b'127.0.0.1' or address == b'localhost':
                    self.admin = True
                self.server = Server(self.ctx, address, port, self.id)
                self.publisher.connect(
                    "tcp://{}:{}".format(address.decode(), port+2))
            else:
                logger.error("E: too many servers (max. %i)", SERVER_MAX)
        elif command == b"DISCONNECT":
            if self.admin is False:
                # Publish a tombstone for our Client entry so peers drop us.
                uid = self.id.decode()
                delete_user = message.Message(
                    key="Client/{}".format(uid), id=self.id, body=None)
                delete_user.send(self.publisher)
                # TODO: Do we need to pass every object rights to the moderator on disconnect?
                # for k,v in self.property_map.items():
                #     if v.body["id"] == uid:
                #         delete_msg = message.Message(
                #             key=k, id=self.id, body=None)
                #         # delete_msg.store(self.property_map)
                #         delete_msg.send(self.publisher)
        elif command == b"SET":
            key = umsgpack.unpackb(msg[0])
            value = umsgpack.unpackb(msg[1])
            override = umsgpack.unpackb(msg[2])
            if key in self.property_map.keys():
                # Only the owner may update a key, unless override is set.
                if self.property_map[key].body['id'] == self.id.decode() or override:
                    if value == 'None':
                        # 'None' is the sentinel for "dump current scene state".
                        value = helpers.dump(key)
                        value['id'] = self.id.decode()
                    if value:
                        key_id = self.id
                        msg = message.Message(
                            key=key, id=key_id, body=value)
                        msg.store(self.property_map)
                        if override:
                            helpers.load(key, self.property_map[key].body)
                        msg.send(self.publisher)
                    else:
                        logger.error("Fail to dump ")
                else:
                    # Not the owner: restore the authoritative value locally.
                    helpers.load(key, self.property_map[key].body)
        elif command == b"ADD":
            key = umsgpack.unpackb(msg[0])
            value = umsgpack.unpackb(msg[1])
            if value == 'None':
                value = helpers.dump(key)
                value['id'] = self.id.decode()
            if value:
                msg = message.Message(
                    key=key, id=self.id, body=value)
                msg.store(self.property_map)
                msg.send(self.publisher)
            else:
                logger.error("Fail to dump ")
        elif command == b"GET":
            # Substring key match, mirroring Client.get().
            value = []
            key = umsgpack.unpackb(msg[0])
            for k in self.property_map.keys():
                if key in k:
                    value.append([k, self.property_map.get(k).body])
            self.pipe.send(umsgpack.packb(value)
                           if value else umsgpack.packb(''))
        elif command == b"LIST":
            dump_list = []
            for k, v in self.property_map.items():
                if 'Client' in k:
                    dump_list.append([k, v.id.decode()])
                else:
                    try:
                        dump_list.append([k, v.body['id']])
                    except:
                        # Entries without an 'id' field are skipped on purpose.
                        pass
            self.pipe.send(umsgpack.packb(dump_list)
                           if dump_list else umsgpack.packb(''))
        elif command == b"STATE":
            self.pipe.send(umsgpack.packb(self.state.value))
def net_worker(ctx, store, pipe, serial_product, serial_feed, stop_event,external_executor):
    """Network thread body: drives the connection state machine.

    Polls the API pipe and the server sockets, stores incoming updates in
    *store*, hands scene loads to *external_executor*, and publishes
    values serialised by the serial workers.
    """
    agent = ClientAgent(ctx, store, pipe)
    server = None
    # From this thread's point of view the serialisation product queue is
    # its input feed, and vice versa.
    net_feed = serial_product
    net_product = serial_feed
    # BUGFIX: removed a redundant `external_executor = external_executor`
    # self-assignment.
    while not stop_event.is_set():
        poller = zmq.Poller()
        poller.register(agent.pipe, zmq.POLLIN)
        server_socket = None
        if agent.state == State.INITIAL:
            server = agent.server
            if agent.server:
                logger.debug("%s: waiting for server at %s:%d...",
                             agent.id.decode(), server.address, server.port)
                server.snapshot.send(b"SNAPSHOT_REQUEST")
                agent.state = State.SYNCING
                server_socket = server.snapshot
        elif agent.state == State.SYNCING:
            server_socket = server.snapshot
        elif agent.state == State.ACTIVE:
            server_socket = server.subscriber
        if server_socket:
            poller.register(server_socket, zmq.POLLIN)
        try:
            items = dict(poller.poll(1))
        except:
            # BUGFIX: an unreachable `break` followed this statement; the
            # bare re-raise is kept so behavior is unchanged.
            raise
        if agent.pipe in items:
            agent.control_message()
        elif server_socket in items:
            msg = message.Message.recv(server_socket)
            if agent.state == State.SYNCING:
                # Client snapshot
                if msg.key == "SNAPSHOT_END":
                    # Snapshot replay finished: announce ourselves.
                    client_key = "Client/{}".format(agent.id.decode())
                    client_dict = helpers.init_client(key=client_key)
                    client_dict['id'] = agent.id.decode()
                    client_store = message.Message(
                        key=client_key, id=agent.id, body=client_dict)
                    client_store.store(agent.property_map)
                    client_store.send(agent.publisher)
                    agent.state = State.ACTIVE
                    logger.debug("snapshot complete")
                else:
                    net_product.put(('LOAD', msg.key, msg.body))
                    msg.store(agent.property_map)
                    logger.debug("snapshot from {} stored".format(msg.id))
            elif agent.state == State.ACTIVE:
                if msg.id != agent.id:
                    # Remote update: store it, then let the host app load
                    # it on its own thread via the external executor.
                    msg.store(agent.property_map)
                    params = []
                    params.append(msg.key)
                    params.append(msg.body)
                    external_executor.put((helpers.load, params))
                else:
                    logger.debug("{} nothing to do".format(agent.id))
        # Serialisation thread => Net thread
        if not net_feed.empty():
            key, value = net_feed.get()
            if value:
                # Stamp with id
                value['id'] = agent.id.decode()
                # Format message
                msg = message.Message(
                    key=key, id=agent.id, body=value)
                msg.store(agent.property_map)
                msg.send(agent.publisher)
            else:
                logger.error("Fail to dump ")
    logger.info("exit thread")
def serial_worker(serial_product, serial_feed):
    """Consume (command, key, value) jobs from *serial_feed*.

    'DUMP' serialises *key* and pushes (key, result) onto
    *serial_product*, 'LOAD' applies *value* to the scene, and 'STOP'
    terminates the worker.
    """
    logger.info("serial thread launched")
    running = True
    while running:
        command, key, value = serial_feed.get()
        if command == 'STOP':
            running = False
        elif command == 'DUMP':
            try:
                dumped = helpers.dump(key)
                if dumped:
                    serial_product.put((key, dumped))
            except Exception as e:
                logger.error("{}".format(e))
        elif command == 'LOAD':
            if value:
                try:
                    helpers.load(key, value)
                except Exception as e:
                    logger.error("{}".format(e))
    logger.info("serial thread stopped")
def watchdog_worker(serial_feed, interval, stop_event):
    """Poll bpy.data every *interval* seconds and queue a 'DUMP' job for
    each datablock whose ``is_dirty`` flag is set, clearing the flag.
    """
    import bpy
    logger.info(
        "watchdog thread launched with {} sec of interval".format(interval))
    while not stop_event.is_set():
        for datatype in environment.rtypes:
            for item in getattr(bpy.data, helpers.BPY_TYPES[datatype]):
                key = "{}/{}".format(datatype, item.name)
                try:
                    if item.is_dirty:
                        logger.debug("{} needs update".format(key))
                        serial_feed.put(('DUMP', key, None))
                        item.is_dirty = False
                except:
                    # NOTE(review): deliberate best-effort - presumably some
                    # datablocks don't expose is_dirty; confirm whether
                    # narrowing this to AttributeError is safe.
                    pass
        time.sleep(interval)
    logger.info("watchdog thread stopped")

View File

@ -114,6 +114,7 @@ def setup(dependencies, python_path):
PYTHON_PATH = Path(python_path)
SUBPROCESS_DIR = PYTHON_PATH.parent
if not module_can_be_imported("pip"):
install_pip()

View File

@ -1,75 +0,0 @@
import logging
try:
from .libs import umsgpack
except:
# Server import
from libs import umsgpack
import zmq
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)
class Message(object):
    """A replicated key/id/body triple.

    On the wire it is 3 frames (the original docstring said 2 and
    numbered them 0/1/3):
      frame 0: key  (0MQ string) - property path
      frame 1: id   (0MQ string) - sender identity
      frame 2: body (blob)       - msgpack-encoded payload
    """
    key = None   # property path (str)
    id = None    # sender identity (bytes)
    body = None  # payload (any msgpack-able value)

    def __init__(self, key=None, id=None, body=None):
        self.key = key
        self.body = body
        self.id = id

    def store(self, dikt):
        """Store me in a dict if I have anything to store.

        A body equal to the string 'None' is a tombstone: the key is
        removed instead of stored.
        """
        # this currently erasing old value
        if self.key is not None:
            if self.body == 'None':
                logger.info("erasing key {}".format(self.key))
                # BUGFIX: `del dikt[self.key]` raised KeyError when the key
                # was already gone (e.g. a duplicated delete); pop with a
                # default is tolerant.
                dikt.pop(self.key, None)
            else:
                dikt[self.key] = self

    def send(self, socket):
        """Send key-value message to socket; any empty frames are sent as such."""
        key = ''.encode() if self.key is None else self.key.encode()
        body = umsgpack.packb('None') if self.body is None else umsgpack.packb(self.body)
        id = ''.encode() if self.id is None else self.id
        try:
            socket.send_multipart([key, id, body])
        except:
            # Best-effort: report the failed frames without killing the thread.
            print("Fail to send {} {} {}".format(key, id, body))

    @classmethod
    def recv(cls, socket):
        """Reads key-value message from socket, returns new kvmsg instance."""
        key, id, body = socket.recv_multipart(zmq.NOBLOCK)
        key = key.decode() if key else None
        id = id if id else None
        body = umsgpack.unpackb(body) if body else None
        return cls(key=key, id=id, body=body)

    def dump(self):
        """Print a human-readable summary of this message to stdout."""
        if self.body is None:
            size = 0
            data = 'NULL'
        else:
            size = len(self.body)
            data = repr(self.body)
        print("[key:{key}][size:{size}] {data}".format(
            key=self.key,
            size=size,
            data=data,
        ))

View File

@ -14,12 +14,13 @@ from bpy_extras.io_utils import ExportHelper
import mathutils
from pathlib import Path
from . import environment, client, draw, helpers, ui
from . import environment, presence, ui
from .libs import umsgpack
from .libs.replication import client
logger = logging.getLogger(__name__)
# client_instance = None
cli = None
server = None
context = None

View File

@ -212,6 +212,7 @@ class DrawFactory(object):
except Exception as e:
print("2D EXCEPTION")
def register():
    # Create the module-level DrawFactory instance used by the drawing code.
    global renderer
    renderer = DrawFactory()

View File

@ -1,99 +0,0 @@
import logging
import time
import environment
from operator import itemgetter
import zmq
import message
logger = logging.getLogger("Server")
logging.basicConfig(level=logging.DEBUG)
SUPPORTED_TYPES = ['Image','Client','Curve','Material','Texture', 'Light', 'Camera', 'Mesh','Armature', 'GreasePencil', 'Object', 'Action', 'Collection', 'Scene']
class ServerAgent():
    """Standalone replication server.

    Serves full-state snapshots to late joiners over a ROUTER socket and
    relays every collected client update to all subscribers over PUB.
    """

    def __init__(self, context=zmq.Context.instance(), id="admin"):
        # NOTE(review): tick() below loops until a bad snapshot request
        # arrives, so __init__ blocks; this class is instantiated as the
        # script's entry point.  The `context` default is evaluated once
        # at class-definition time - confirm that is intended.
        self.context = context
        self.config = environment.load_config()
        self.port = int(self.config['port']) if "port" in self.config.keys() else 5555
        self.pub_sock = None
        self.request_sock = None
        self.collector_sock = None
        self.poller = None
        self.property_map = {}
        self.id = id
        self.bind_ports()
        # Main client loop registration
        self.tick()
        # BUGFIX: the original formatted the builtin `id` function here
        # instead of self.id.
        logger.info("{} client initialized".format(self.id))

    def bind_ports(self):
        """Bind PUB (port+1), ROUTER (port) and PULL (port+2) sockets."""
        # Update all clients
        self.pub_sock = self.context.socket(zmq.PUB)
        self.pub_sock.setsockopt(zmq.SNDHWM, 60)
        self.pub_sock.bind("tcp://*:"+str(self.port+1))
        time.sleep(0.2)
        # Update request
        self.request_sock = self.context.socket(zmq.ROUTER)
        self.request_sock.setsockopt(zmq.IDENTITY, b'SERVER')
        self.request_sock.setsockopt(zmq.RCVHWM, 60)
        self.request_sock.bind("tcp://*:"+str(self.port))
        # Update collector
        self.collector_sock = self.context.socket(zmq.PULL)
        self.collector_sock.setsockopt(zmq.RCVHWM, 60)
        self.collector_sock.bind("tcp://*:"+str(self.port+2))
        # poller for socket aggregation
        self.poller = zmq.Poller()
        self.poller.register(self.request_sock, zmq.POLLIN)
        self.poller.register(self.collector_sock, zmq.POLLIN)

    def tick(self):
        """Main loop: serve snapshots and fan incoming updates back out."""
        # BUGFIX: the original formatted the builtin `id` function here
        # instead of self.id.
        logger.info("{} server launched on {}".format(self.id, self.port))
        while True:
            # Non blocking poller
            socks = dict(self.poller.poll(1000))
            # Snapshot system for late join (Server - Client)
            if self.request_sock in socks:
                msg = self.request_sock.recv_multipart(zmq.DONTWAIT)
                identity = msg[0]
                request = msg[1]
                if request == b"SNAPSHOT_REQUEST":
                    pass
                else:
                    logger.info("Bad snapshot request")
                    break
                # Replay entries in dependency order (SUPPORTED_TYPES order)
                # so e.g. Images exist before the Materials that use them.
                ordered_props = [(SUPPORTED_TYPES.index(k.split('/')[0]),k,v) for k, v in self.property_map.items()]
                ordered_props.sort(key=itemgetter(0))
                for i, k, v in ordered_props:
                    logger.info(
                        "Sending {} snapshot to {}".format(k, identity))
                    self.request_sock.send(identity, zmq.SNDMORE)
                    v.send(self.request_sock)
                msg_end_snapshot = message.Message(key="SNAPSHOT_END", id=identity)
                self.request_sock.send(identity, zmq.SNDMORE)
                msg_end_snapshot.send(self.request_sock)
                logger.info("done")
            # Regular update routing (Clients / Client)
            elif self.collector_sock in socks:
                msg = message.Message.recv(self.collector_sock)
                # Update all clients
                msg.store(self.property_map)
                msg.send(self.pub_sock)
server = ServerAgent()

4
ui.py
View File

@ -1,7 +1,5 @@
import bpy
from . import client
from . import operators
from .libs.replication import client
ICONS = {'Image': 'IMAGE_DATA', 'Curve':'CURVE_DATA', 'Client':'SOLO_ON','Collection': 'FILE_FOLDER', 'Mesh': 'MESH_DATA', 'Object': 'OBJECT_DATA', 'Material': 'MATERIAL_DATA',