feat(rcf): apprend basic multithreaded client api

This commit is contained in:
Swann 2019-04-08 15:54:21 +02:00
parent fecc429ef1
commit c137971606
No known key found for this signature in database
GPG Key ID: B02D0B41F8B6D2EE
6 changed files with 163 additions and 221 deletions

View File

@ -1,11 +1,17 @@
import collections
import logging
import threading
from uuid import uuid4
import binascii
import os
from random import randint
import time
from enum import Enum
from .libs import umsgpack, zmq
try:
from .libs import umsgpack, zmq
except:
from libs import umsgpack, zmq
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)
@ -13,6 +19,7 @@ CONNECT_TIMEOUT = 2
WAITING_TIME = 0.001
SERVER_MAX = 1
def zpipe(ctx):
"""build inproc pipe for talking to threads
@ -27,7 +34,7 @@ def zpipe(ctx):
iface = "inproc://%s" % binascii.hexlify(os.urandom(8))
a.bind(iface)
b.connect(iface)
return a,b
return a, b
class State(Enum):
@ -36,52 +43,10 @@ class State(Enum):
ACTIVE = 3
class RCFFactory(object):
"""
Abstract layer used to bridge external and inter
"""
def init(self, data):
"""
set the RCFMessage pointer to local data
"""
print("Default setter")
# Setup data accessor
data.get = self.load_getter(data)
data.set = self.load_setter(data)
# TODO: Setup local pointer
def load_getter(self, data):
"""
local program > rcf
"""
print("Default getter")
return None
def load_setter(self, data):
"""
rcf > local program
"""
print("Default setter")
return None
def apply(self, data):
pass
def diff(self, data):
"""
Verify data integrity
"""
pass
class RCFStore(collections.MutableMapping, dict):
def __init__(self, custom_factory=RCFFactory()):
def __init__(self):
super().__init__()
self.factory = custom_factory
def __getitem__(self, key):
return dict.__getitem__(self, key)
@ -117,22 +82,20 @@ class RCFMessage(object):
body = None # data blob
uuid = None
def __init__(self, key=None,uuid= None, id=None, mtype=None, body=None):
def __init__(self, key=None, uuid=None, id=None, mtype=None, body=None):
if uuid is None:
uuid = uuid4()
uuid = uuid4().bytes
self.key = key
self.uuid = uuid
self.mtype = mtype
self.body = body
self.id = id
def store(self, dikt):
"""Store me in a dict if I have anything to store"""
# this currently erasing old value
if self.key is not None :
if self.key is not None:
dikt[self.key] = self
# elif self.key in dikt:
# del dikt[self.key]
@ -140,26 +103,25 @@ class RCFMessage(object):
def send(self, socket):
"""Send key-value message to socket; any empty frames are sent as such."""
key = ''.encode() if self.key is None else self.key.encode()
print(self.mtype)
mtype = ''.encode() if self.mtype is None else self.mtype.encode()
body = ''.encode() if self.body is None else umsgpack.packb(self.body)
id = ''.encode() if self.id is None else self.id
try:
socket.send_multipart([key,self.uuid, id, mtype, body])
socket.send_multipart([key, id, mtype, body])
except:
logger.info("Fail to send {}".format(key))
logger.info("Fail to send {} {}".format(key,id))
@classmethod
def recv(cls, socket):
"""Reads key-value message from socket, returns new kvmsg instance."""
key,uuid, id, mtype, body = socket.recv_multipart(zmq.DONTWAIT)
key, id, mtype, body = socket.recv_multipart(zmq.DONTWAIT)
key = key.decode() if key else None
id = id if id else None
mtype = mtype.decode() if body else None
body = umsgpack.unpackb(body) if body else None
return cls(key=key,uuid=uuid, id=id, mtype=mtype, body=body)
return cls(key=key, id=id, mtype=mtype, body=body)
def dump(self):
if self.body is None:
@ -176,84 +138,29 @@ class RCFMessage(object):
))
class RCFClient(object):
ctx = None
pipe = None
agent = None
class RCFClient():
def __init__(
self,
context=zmq.Context(),
id="default",
on_recv=None,
on_post_init=None,
is_admin=False,
factory=None,
address="localhost"):
def __init__(self):
self.ctx = zmq.Context()
self.pipe, peer = zpipe(self.ctx)
self.agent = threading.Thread(
target=rcf_client_agent, args=(self.ctx, peer))
self.agent.daemon = True
self.agent.start()
# 0MQ vars
self.context = context
self.pull_sock = None
self.req_sock = None
self.poller = None
def connect(self, address, port):
self.pipe.send_multipart([b"CONNECT", (address.encode() if isinstance(
address, str) else address), b'%d' % port])
# Client configuration
self.id = id.encode()
self.on_recv = on_recv
self.on_post_init = on_post_init
self.status = RCFStatus.IDLE
self.is_admin = is_admin
self.address = address
def set(self, key, value):
"""Set new value in distributed hash table
Sends [SET][key][value][ttl] to the agent
"""
self.pipe.send_multipart([b"SET", umsgpack.packb(key), umsgpack.packb(value)])
self.bind_ports()
# client routine registration
self.load_task = asyncio.ensure_future(self.load())
self.tick_task = None
logger.info("{} client initialized".format(id))
def bind_ports(self):
# pull socket: get update FROM server
self.pull_sock = self.context.socket(zmq.SUB)
self.pull_sock.linger = 0
self.pull_sock.connect("tcp://{}:5555".format(self.address))
self.pull_sock.setsockopt_string(zmq.SUBSCRIBE, '')
# request socket: send request/message over all peers throught the server
self.req_sock = self.context.socket(zmq.DEALER)
self.req_sock.setsockopt(zmq.IDENTITY, self.id)
# self.req_sock.setsockopt(zmq.SNDHWM, 60)
self.req_sock.linger = 0
self.req_sock.connect("tcp://{}:5556".format(self.address))
# push update socket
self.push_sock = self.context.socket(zmq.PUSH)
self.push_sock.setsockopt(zmq.IDENTITY, self.id)
self.push_sock.linger = 0
self.push_sock.connect("tcp://{}:5557".format(self.address))
self.push_sock.setsockopt(zmq.SNDHWM, 60)
# Sockets aggregator, not really used for now
self.poller = zmq.Poller()
self.poller.register(self.pull_sock, zmq.POLLIN)
time.sleep(0.1)
def push_update(self, key, mtype, body):
rcfmsg = RCFMessage(key=key, id=self.id,mtype=mtype, body=body)
rcfmsg.send(self.push_sock)
def stop(self):
logger.debug("Stopping client")
self.poller.unregister(self.pull_sock)
self.req_sock.close()
self.push_sock.close()
self.pull_sock.close()
self.load_task.cancel()
if self.tick_task:
self.tick_task.cancel()
class RCFServer(object):
address = None # Server address
@ -261,19 +168,22 @@ class RCFServer(object):
snapshot = None # Snapshot socket
subscriber = None # Incoming updates
def __init__(self, ctx, address, port):
def __init__(self, ctx, address, port,id):
self.address = address
self.port = port
self.snapshot = ctx.socket(zmq.DEALER)
self.snapshot.linger = 0
self.snapshot.connect("%s:%i".format(address.decode(),port))
self.snapshot.connect("tcp://{}:{}".format(address.decode(), port))
self.snapshot.setsockopt(zmq.IDENTITY, id)
self.subscriber = ctx.socket(zmq.SUB)
self.subscriber.setsockopt_string(zmq.SUBSCRIBE, '')
self.subscriber.connect("%s:%i".format(address.decode(),port+1))
self.subscriber.connect("tcp://{}:{}".format(address.decode(), port+1))
self.subscriber.linger = 0
print("connected on tcp://{}:{}".format(address.decode(), port))
class RCFClientAgent(object):
ctx = None
ctx = None
pipe = None
property_map = None
publisher = None
@ -281,21 +191,19 @@ class RCFClientAgent(object):
state = State.INITIAL
server = None
def __init__(self, ctx, pipe, id):
self.ctx = None
self.pipe = None
self.property_map = None
self.publisher = None
self.id = None
def __init__(self, ctx, pipe):
self.ctx = ctx
self.pipe = pipe
self.property_map = RCFStore()
self.id = b"test"
self.state = State.INITIAL
self.server = None
self.publisher = self.context.socket(zmq.PUSH) # push update socket
self.publisher = self.ctx.socket(zmq.PUSH) # push update socket
self.publisher.setsockopt(zmq.IDENTITY, self.id)
self.publisher.setsockopt(zmq.SNDHWM, 60)
self.publisher.linger = 0
def control_message (self):
def control_message(self):
msg = self.pipe.recv_multipart()
command = msg.pop(0)
@ -303,32 +211,43 @@ class RCFClientAgent(object):
address = msg.pop(0)
port = int(msg.pop(0))
if len(self.servers) < SERVER_MAX:
self.server = RCFServer(self.ctx, address, port)
self.publisher.connect("tcp://{}:5557".format(address.decode()))
if self.server is None:
self.server = RCFServer(self.ctx, address, port, self.id)
self.publisher.connect("tcp://{}:{}".format(address.decode(), port+2))
else:
logger.error("E: too many servers (max. %i)", SERVER_MAX)
elif command == b"SET":
key,value = msg
# Send key-value pair on to server
rcfmsg = RCFMessage(key=umsgpack.unpackb(key),id=self.id ,mtype="",body=umsgpack.unpackb(value))
rcfmsg.store(self.property_map)
rcfmsg.send(self.publisher)
def rcf_client_agent(ctx,pipe,id):
agent = RCFClientAgent(ctx,pipe,id)
def rcf_client_agent(ctx, pipe):
agent = RCFClientAgent(ctx, pipe)
server = None
while True:
# logger.info("asdasd")
poller = zmq.Poller()
poller.register(agent.pipe, zmq.POLLIN)
server_socket = None
if agent.state == State.INITIAL:
server = agent.server
if agent.servers:
logger.info ("I: waiting for server at %s:%d...",
server.address, server.port)
if agent.server:
logger.info("I: waiting for server at %s:%d...",
server.address, server.port)
server.snapshot.send(b"SNAPSHOT_REQUEST")
agent.state = State.SYNCING
server_socket = server.snapshot
elif agent.state == State.SYNCING:
sever_socket = server.snapshot
server_socket = server.snapshot
elif agent.state == State.ACTIVE:
server_socket = server.subscriber
@ -338,14 +257,12 @@ def rcf_client_agent(ctx,pipe,id):
try:
items = dict(poller.poll())
except:
raise
break
pass
if agent.pipe in items:
agent.control_message()
elif server_socket in items:
rcfmsg = RCFMessage.recv(server_socket)
if agent.state == State.SYNCING:
# Store snapshot
if rcfmsg.key == "SNAPSHOT_END":
@ -356,16 +273,15 @@ def rcf_client_agent(ctx,pipe,id):
elif agent.state == State.ACTIVE:
if rcfmsg.id != agent.id:
rcfmsg.store(agent.property_map)
action = "update" if kvmsg.body else "delete"
logging.info ("I: received from %s:%d %s",
server.address, server.port, action)
else:
agent.state = State.INITIAL
action = "update" if rcfmsg.body else "delete"
logging.info("I: received from {}:{},{} {}".format(server.address,rcfmsg.body.id, server.port, action))
else:
logger.info("IDLE")
# else: else
# agent.state = State.INITIAL
class RCFServerAgent():
def __init__(self, context=zmq.Context(), id="admin"):
def __init__(self, context=zmq.Context.instance(), id="admin"):
self.context = context
self.pub_sock = None
@ -373,11 +289,11 @@ class RCFServerAgent():
self.collector_sock = None
self.poller = None
self.property_map = RCFStore()
self.property_map = {}
self.id = id
self.bind_ports()
# Main client loop registration
tick()
self.tick()
logger.info("{} client initialized".format(id))
@ -385,14 +301,14 @@ class RCFServerAgent():
# Update all clients
self.pub_sock = self.context.socket(zmq.PUB)
self.pub_sock.setsockopt(zmq.SNDHWM, 60)
self.pub_sock.bind("tcp://*:5555")
self.pub_sock.bind("tcp://*:5556")
time.sleep(0.2)
# Update request
self.request_sock = self.context.socket(zmq.ROUTER)
self.request_sock.setsockopt(zmq.IDENTITY, b'SERVER')
self.request_sock.setsockopt(zmq.RCVHWM, 60)
self.request_sock.bind("tcp://*:5556")
self.request_sock.bind("tcp://*:5555")
# Update collector
self.collector_sock = self.context.socket(zmq.PULL)
@ -409,7 +325,7 @@ class RCFServerAgent():
while True:
# Non blocking poller
socks = dict(self.poller.poll())
socks = dict(self.poller.poll(1000))
# Snapshot system for late join (Server - Client)
if self.request_sock in socks:
@ -417,7 +333,7 @@ class RCFServerAgent():
identity = msg[0]
request = msg[1]
print("asdasd")
if request == b"SNAPSHOT_REQUEST":
pass
else:
@ -437,16 +353,8 @@ class RCFServerAgent():
# Regular update routing (Clients / Client)
elif self.collector_sock in socks:
msg = RCFMessage.recv(self.collector_sock)
msg = RCFMessage.recv(self.collector_sock)
# Update all clients
msg.store(self.property_map)
msg.send(self.pub_sock)
def stop(self):
logger.debug("Stopping server")
self.poller.unregister(self.request_sock)
self.pub_sock.close()
self.request_sock.close()
self.collector_sock.close()
self.status = RCFStatus.IDLE

View File

@ -672,8 +672,8 @@ class session_join(bpy.types.Operator):
net_settings = context.scene.session_settings
# Scene setup
if net_settings.session_mode == "CONNECT" and net_settings.clear_scene:
clean_scene()
# if net_settings.session_mode == "CONNECT" and net_settings.clear_scene:
# clean_scene()
# Session setup
if net_settings.username == "DefaultUser":
@ -682,23 +682,21 @@ class session_join(bpy.types.Operator):
username = str(context.scene.session_settings.username)
client = net_components.RCFClient(
id=username,
on_recv=recv_callbacks,
on_post_init=post_init_callbacks,
address=net_settings.ip,
is_admin=net_settings.session_mode == "HOST")
bpy.ops.asyncio.loop()
client = net_components.RCFClient()
client.connect("127.0.0.1",5555)
client.set('key', 1)
net_settings.is_running = True
drawer = net_draw.HUD(client_instance=client)
# net_settings.is_running = True
register_ticks()
# drawer = net_draw.HUD(client_instance=client)
# register_ticks()
return {"FINISHED"}
class session_add_property(bpy.types.Operator):
bl_idname = "session.add_prop"
bl_label = "add"
@ -715,21 +713,22 @@ class session_add_property(bpy.types.Operator):
def execute(self, context):
global client
item = resolve_bpy_path(self.property_path)
client.set('key', 1)
# item = resolve_bpy_path(self.property_path)
print(item)
# print(item)
if item:
key = self.property_path
# if item:
# key = self.property_path
dumper = dump_anything.Dumper()
dumper.type_subset = dumper.match_subset_all
dumper.depth = self.depth
# dumper = dump_anything.Dumper()
# dumper.type_subset = dumper.match_subset_all
# dumper.depth = self.depth
data = dumper.dump(item)
data_type = item.__class__.__name__
# data = dumper.dump(item)
# data_type = item.__class__.__name__
client.push_update(key, data_type, data)
# client.push_update(key, data_type, data)
return {"FINISHED"}
@ -771,7 +770,7 @@ class session_create(bpy.types.Operator):
global server
global client
server = net_components.RCFServer()
server = net_components.RCFServerAgent()
time.sleep(0.1)
bpy.ops.session.join()
@ -983,11 +982,11 @@ def unregister():
pass
if server:
server.stop()
# server.stop()
del server
server = None
if client:
client.stop()
# client.stop()
del client
client = None

View File

@ -55,17 +55,17 @@ class SessionSettingsPanel(bpy.types.Panel):
row = layout.row()
row.operator("session.join", text="CONNECT")
else:
# else:
if net_operators.client.status is net_components.RCFStatus.CONNECTED:
row.label(text="Net frequency:")
row.prop(net_settings, "update_frequency", text="")
row = layout.row()
row.operator("session.stop", icon='QUIT', text="Exit")
elif net_operators.client.status is net_components.RCFStatus.CONNECTING:
row.label(text="connecting...")
row = layout.row()
row.operator("session.stop", icon='QUIT', text="CANCEL")
# if net_operators.client.status is net_components.RCFStatus.CONNECTED:
# row.label(text="Net frequency:")
# row.prop(net_settings, "update_frequency", text="")
# row = layout.row()
# row.operator("session.stop", icon='QUIT', text="Exit")
# elif net_operators.client.status is net_components.RCFStatus.CONNECTING:
# row.label(text="connecting...")
# row = layout.row()
# row.operator("session.stop", icon='QUIT', text="CANCEL")
row = layout.row()
@ -198,9 +198,9 @@ class SessionTaskPanel(bpy.types.Panel):
classes = (
SessionSettingsPanel,
SessionUsersPanel,
SessionPropertiesPanel,
SessionTaskPanel,
# SessionUsersPanel,
# SessionPropertiesPanel,
# SessionTaskPanel,
)

20
rcf_server.py Normal file
View File

@ -0,0 +1,20 @@
import collections
import logging
import threading
from uuid import uuid4
import binascii
import os
from random import randint
import time
from enum import Enum
from libs import umsgpack, zmq
from net_components import RCFMessage
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)
CONNECT_TIMEOUT = 2
WAITING_TIME = 0.001
SERVER_MAX = 1

14
test_client.py Normal file
View File

@ -0,0 +1,14 @@
from net_components import RCFClient
import time
client = RCFClient()
client.connect("127.0.0.1",5555)
try:
while True:
client.set('key', 1)
# Distribute as key-value message
time.sleep(1)
except KeyboardInterrupt:
pass

View File

@ -1,3 +1,4 @@
from net_components import RCFServerAgent
server = RCFServerAgent()
server = RCFServerAgent()