From 84023938626c67f16579491792270b8734151d1f Mon Sep 17 00:00:00 2001 From: Swann Date: Mon, 11 Feb 2019 15:48:07 +0100 Subject: [PATCH] client/server chat draft --- client.py | 59 ++++++++++++++++++++ libs/kvsimple.py | 67 +++++++++++++++++++++++ net_components.py | 137 +++++++++++++++++++++++++++++++++++++++++++--- net_operators.py | 55 ++++++++++++++++++- net_ui.py | 2 +- server.py | 43 +++++++++++++++ 6 files changed, 353 insertions(+), 10 deletions(-) create mode 100644 client.py create mode 100644 libs/kvsimple.py create mode 100644 server.py diff --git a/client.py b/client.py new file mode 100644 index 0000000..318fbc9 --- /dev/null +++ b/client.py @@ -0,0 +1,59 @@ + + +""" +Clone Client Model One + +Author: Min RK + +""" + +import random +import time +import msgpack +import zmq + +from libs import kvsimple + +def main(): + # Prepare our context and publisher socket + ctx = zmq.Context() + + # Update socket binding + updates = ctx.socket(zmq.SUB) + updates.linger = 0 + updates.connect("tcp://localhost:5555") + updates.setsockopt_string(zmq.SUBSCRIBE, '') + + state_request = ctx.socket(zmq.DEALER) + state_request.setsockopt(zmq.IDENTITY, b"PEER2") + state_request.linger = 0 + state_request.connect("tcp://localhost:5556") + + # poller for socket aggregation + poller = zmq.Poller() + poller.register(updates, zmq.POLLIN) + + while True: + try: + socks = dict(poller.poll(10)) + except KeyboardInterrupt: + break + + if updates in socks: + message = updates.recv_multipart() + print(message) + + # Send update + + new_state= b"test" + state_request.send(new_state) + print("Sending {}".format(new_state)) + time.sleep(1) + + + + +if __name__ == '__main__': + main() + + diff --git a/libs/kvsimple.py b/libs/kvsimple.py new file mode 100644 index 0000000..361e77b --- /dev/null +++ b/libs/kvsimple.py @@ -0,0 +1,67 @@ + + +""" + +kvsimple - simple key-value message class for example applications + +Author: Min RK + +""" + +import struct # for packing integers +import sys + +import zmq + +class KVMsg(object): + """ + Message is formatted on wire as 3 frames: + frame 0: key (0MQ string) + frame 1: sequence (8 bytes, network order) + frame 2: body (blob) + """ + key = None # key (string) + sequence = 0 # int + body = None # blob + + def __init__(self, sequence, key=None, body=None): + assert isinstance(sequence, int) + self.sequence = sequence + self.key = key + self.body = body + + def store(self, dikt): + """Store me in a dict if I have anything to store""" + # this seems weird to check, but it's what the C example does + if self.key is not None and self.body is not None: + dikt[self.key] = self + + def send(self, socket): + """Send key-value message to socket; any empty frames are sent as such.""" + key = b'' if self.key is None else self.key.encode() + seq_s = struct.pack('!l', self.sequence) + body = b'' if self.body is None else self.body.encode() + socket.send_multipart([ key, seq_s, body ]) + + @classmethod + def recv(cls, socket): + """Reads key-value message from socket, returns new kvmsg instance.""" + key, seq_s, body = socket.recv_multipart(zmq.NOBLOCK) + key = key.decode() if key else None + seq = struct.unpack('!l',seq_s)[0] + body = body.decode() if body else None + return cls(seq, key=key, body=body) + + def dump(self): + if self.body is None: + size = 0 + data='NULL' + else: + size = len(self.body) + data=repr(self.body) + print >> sys.stderr, "[seq:{seq}][key:{key}][size:{size}] {data}".format( + seq=self.sequence, + key=self.key, + size=size, + data=data, + ) diff --git a/net_components.py b/net_components.py index 7d494c2..eb4a77d 100644 --- a/net_components.py +++ b/net_components.py @@ -2,6 +2,10 @@ import zmq import asyncio import logging from .libs import umsgpack +from .libs import kvsimple +import time +import random + logger = logging.getLogger(__name__) logging.basicConfig(level=logging.DEBUG) @@ -14,7 +18,7 @@ class Session(): # init zmq context self.context = zmq.Context() self.socket = None - + self.msg = [] #self.listen.add_done_callback(self.close_success()) @@ -25,9 +29,11 @@ class Session(): def join(self): logger.info("joinning {}:{}".format(self.host, self.port)) try: - self.socket = self.context.socket(zmq.REQ) + self.socket = self.context.socket(zmq.DEALER) self.socket.connect("tcp://localhost:5555") self.listen = asyncio.ensure_future(self.listen()) + + self.send("XXX connected") return True except zmq.ZMQError: @@ -40,7 +46,7 @@ class Session(): def create(self): logger.info("Creating session") try: - self.socket = self.context.socket(zmq.REP) + self.socket = self.context.socket(zmq.ROUTER) self.socket.bind("tcp://*:5555") self.listen = asyncio.ensure_future(self.listen()) @@ -58,17 +64,20 @@ class Session(): # Ungly blender workaround to prevent blocking... await asyncio.sleep(0.016) try: - msg = self.socket.recv(zmq.NOBLOCK) - # self.msg.append(umsgpack.unpackb(message)) - print(msg) - logger.info(msg) + buffer = self.socket.recv(zmq.NOBLOCK) + + message = umsgpack.unpackb(buffer) + if message is not 0: + self.socket.send(umsgpack.packb(0)) + self.msg.append() except zmq.ZMQError: pass def send(self, msg): logger.info("Sending {} to {}:{} ".format(msg, self.host, self.port)) + self.msg.append(msg) bin = umsgpack.packb(msg) - self.socket.send(bin,zmq.NOBLOCK) + self.socket.send(bin) async def close_success(self): self.is_running = False @@ -79,3 +88,115 @@ class Session(): self.listen.cancel() del self.listen self.is_running = False + +class Client_poller(): + def __init__(self, id): + + self.id = id + self.listen = asyncio.ensure_future(self.listen()) + logger.info("client initiated {}".format(self.id)) + + async def listen(self): + context = zmq.Context() + logger.info("...context initiated {}".format(self.id)) + socket = context.socket(zmq.DEALER) + identity = self.id + socket.identity = identity.encode('ascii') + logger.info("...socket initiated {}".format(self.id)) + logger.info("client {} started".format(self.id)) + poll = zmq.Poller() + poll.register(socket, zmq.POLLIN) + + await asyncio.sleep(1) + + while True: + await asyncio.sleep(0.016) + sockets = dict(poll.poll(1)) + + if socket in sockets: + msg = socket.recv(zmq.NOBLOCK) + logger.info("{} received:{}".format(self.id, msg)) + + + def stop(self): + logger.info("Stopping client {}".format(self.id)) + self.listen.cancel() + +class Client(): + def __init__(self, context=None): + + if context is None: + logger.info("client init default context") + self.context = zmq.Context() + else: + self.context = context + + self.task = asyncio.ensure_future(self.run()) + logger.info("client initiated") + + async def run(self): + # Prepare our context and publisher socket + logger.info("configuring:") + updates = self.context.socket(zmq.SUB) + logger.info("..socket") + updates.linger = 0 + logger.info("..linger") + updates.setsockopt(zmq.SUBSCRIBE,b"10001") + logger.info("client launched") + updates.connect("tcp://localhost:5556") + + kvmap = {} + sequence = 0 + + while True: + await asyncio.sleep(0.016) + try: + kvmsg = kvsimple.KVMsg.recv(updates) + except: + break # Interrupted + kvmsg.store(kvmap) + sequence += 1 + + + def stop(self): + logger.info("Stopping client") + self.task.cancel() + + + + kvmap = {} + sequence = 0 + +class Server(): + def __init__(self): + self.context = zmq.Context() + self.task = asyncio.ensure_future(self.run()) + logger.info("server initiated ") + + async def run(self): + publisher = self.context.socket(zmq.PUB) + + publisher.bind("tcp://*:5556") + time.sleep(0.2) + logger.info("server launched") + + sequence = 0 + random.seed(time.time()) + kvmap = {} + + while True: + # Non blocking + await asyncio.sleep(0.016) + + # Distribute as key-value message + sequence += 1 + kvmsg = kvsimple.KVMsg(sequence) + kvmsg.key = "%d" % random.randint(1,10000) + kvmsg.body = "%d" % random.randint(1,1000000) + kvmsg.send(publisher) + kvmsg.store(kvmap) + + + def stop(self): + logger.info("Stopping server") + self.task.cancel() \ No newline at end of file diff --git a/net_operators.py b/net_operators.py index c5ccd44..7dbcf53 100644 --- a/net_operators.py +++ b/net_operators.py @@ -1,8 +1,13 @@ import bpy from . import net_components +import time session = None +client = None +server = None +context = None +# SESSION Operators class join(bpy.types.Operator): bl_idname = "session.join" @@ -81,18 +86,66 @@ class close(bpy.types.Operator): bpy.ops.asyncio.stop() return {"FINISHED"} +# CLIENT-SERVER + +class client_connect(bpy.types.Operator): + bl_idname = "client.connect" + bl_label = "connect" + bl_description = "connect to a net server" + bl_options = {"REGISTER"} + + @classmethod + def poll(cls, context): + return True + + def execute(self, context): + global client + + client = net_components.Client() + + time.sleep(1) + + bpy.ops.asyncio.loop() + + + return {"FINISHED"} + +class server_connect(bpy.types.Operator): + bl_idname = "server.run" + bl_label = "connect" + bl_description = "connect to a net server" + bl_options = {"REGISTER"} + + @classmethod + def poll(cls, context): + return True + + def execute(self, context): + global server + + server = net_components.Server() + + time.sleep(1) + + bpy.ops.asyncio.loop() + + + return {"FINISHED"} classes = ( join, create, close, send, + client_connect, + server_connect, ) def register(): global session - session = net_components.Session() + # session = net_components.Session() + from bpy.utils import register_class for cls in classes: diff --git a/net_ui.py b/net_ui.py index 287a4bc..c032c5b 100644 --- a/net_ui.py +++ b/net_ui.py @@ -48,7 +48,7 @@ class SessionPanel(bpy.types.Panel): classes = ( - SessionPanel, + # SessionPanel, ) diff --git a/server.py b/server.py new file mode 100644 index 0000000..f516fde --- /dev/null +++ b/server.py @@ -0,0 +1,43 @@ +""" +Clone server Model One + +""" +import time +import zmq + + +def main(): + # Prepare our context and publisher socket + ctx = zmq.Context() + + # Update all clients + publisher = ctx.socket(zmq.PUB) + publisher.bind("tcp://*:5555") + time.sleep(0.2) + + # Update receiver + state_request = ctx.socket(zmq.ROUTER) + state_request.bind("tcp://*:5556") + + # poller for socket aggregation + poller = zmq.Poller() + poller.register(state_request, zmq.POLLIN) + + + while True: + try: + socks = dict(poller.poll(1)) + except KeyboardInterrupt: + break + + if state_request in socks: + msg = state_request.recv_multipart() + print(msg[0].decode('ascii')) + print(msg[1].decode()) + publisher.send(b'Server update') + + # publisher.send_string('test') + # print('msg') + # time.sleep(1) +if __name__ == '__main__': + main()