multi-user/client.py

439 lines
13 KiB
Python
Raw Normal View History

2019-04-10 17:01:21 +02:00
import binascii
2019-03-25 14:56:09 +01:00
import collections
import logging
import os
2019-04-10 11:21:10 +02:00
import sys
2019-04-10 17:01:21 +02:00
import threading
2019-02-11 15:48:07 +01:00
import time
2019-03-15 17:37:02 +01:00
from enum import Enum
2019-04-10 17:01:21 +02:00
from random import randint
2019-04-11 18:07:59 +02:00
import copy
import queue
lock = threading.Lock()
try:
2019-04-10 11:21:10 +02:00
from .libs import umsgpack
from .libs import zmq
2019-04-10 17:01:21 +02:00
from .libs import dump_anything
from . import helpers
from . import message
except:
2019-04-10 11:21:10 +02:00
# Server import
from libs import umsgpack
from libs import zmq
2019-04-10 17:01:21 +02:00
from libs import dump_anything
import helpers
import message
2019-04-10 11:21:10 +02:00
logger = logging.getLogger(__name__)
2019-04-17 14:22:56 +02:00
logging.basicConfig(level=logging.INFO)
2019-03-25 14:56:09 +01:00
CONNECT_TIMEOUT = 2
2019-03-15 16:50:59 +01:00
WAITING_TIME = 0.001
2019-04-08 12:53:14 +02:00
SERVER_MAX = 1
2019-02-22 16:46:35 +01:00
2019-04-08 17:01:02 +02:00
stop = False
2019-04-10 17:01:21 +02:00
2019-04-08 12:53:14 +02:00
def zpipe(ctx):
"""build inproc pipe for talking to threads
2019-03-25 14:56:09 +01:00
2019-04-08 12:53:14 +02:00
mimic pipe used in czmq zthread_fork.
Returns a pair of PAIRs connected via inproc
"""
a = ctx.socket(zmq.PAIR)
b = ctx.socket(zmq.PAIR)
a.linger = b.linger = 0
a.hwm = b.hwm = 1
iface = "inproc://%s" % binascii.hexlify(os.urandom(8))
a.bind(iface)
b.connect(iface)
return a, b
2019-04-08 12:53:14 +02:00
class State(Enum):
INITIAL = 1
SYNCING = 2
ACTIVE = 3
2019-03-15 17:37:02 +01:00
class RCFClient(object):
ctx = None
pipe = None
agent = None
def __init__(self):
self.ctx = zmq.Context()
self.pipe, peer = zpipe(self.ctx)
2019-04-11 18:07:59 +02:00
self.queue = queue.Queue()
self.agent = threading.Thread(
2019-04-18 15:05:48 +02:00
target=rcf_client_agent, args=(self.ctx, peer, self.queue), name="net-agent")
self.agent.daemon = True
self.agent.start()
2019-04-18 15:05:48 +02:00
2019-04-10 18:26:39 +02:00
def connect(self, id, address, port):
self.pipe.send_multipart([b"CONNECT", (id.encode() if isinstance(
id, str) else id), (address.encode() if isinstance(
2019-04-18 15:05:48 +02:00
address, str) else address), b'%d' % port])
def set(self, key, value=None):
"""Set new value in distributed hash table
2019-04-08 18:21:48 +02:00
Sends [SET][key][value] to the agent
"""
2019-04-10 17:01:21 +02:00
self.pipe.send_multipart(
2019-04-18 15:05:48 +02:00
[b"SET", umsgpack.packb(key), (umsgpack.packb(value) if value else umsgpack.packb('None'))])
2019-04-08 12:53:14 +02:00
def add(self, key, value=None):
"""Set new value in distributed hash table
Sends [SET][key][value] to the agent
"""
self.pipe.send_multipart(
[b"ADD", umsgpack.packb(key), (umsgpack.packb(value) if value else umsgpack.packb('None'))])
2019-04-08 18:21:48 +02:00
def get(self, key):
"""Lookup value in distributed hash table
Sends [GET][key] to the agent and waits for a value response
If there is no clone available, will eventually return None.
"""
self.pipe.send_multipart([b"GET", umsgpack.packb(key)])
try:
reply = self.pipe.recv_multipart()
except KeyboardInterrupt:
return
else:
return umsgpack.unpackb(reply[0])
2019-04-08 17:01:02 +02:00
def exit(self):
if self.agent.is_alive():
global stop
stop = True
2019-03-25 14:56:09 +01:00
2019-04-11 14:39:31 +02:00
def list(self):
self.pipe.send_multipart([b"LIST"])
try:
reply = self.pipe.recv_multipart()
except KeyboardInterrupt:
return
else:
return umsgpack.unpackb(reply[0])
2019-04-10 17:01:21 +02:00
def state(self):
self.pipe.send_multipart([b"STATE"])
try:
reply = self.pipe.recv_multipart()
except KeyboardInterrupt:
return
else:
return umsgpack.unpackb(reply[0])
2019-04-08 12:53:14 +02:00
class RCFServer(object):
address = None # Server address
port = None # Server port
snapshot = None # Snapshot socket
subscriber = None # Incoming updates
2019-04-10 17:01:21 +02:00
def __init__(self, ctx, address, port, id):
2019-04-08 12:53:14 +02:00
self.address = address
self.port = port
self.snapshot = ctx.socket(zmq.DEALER)
self.snapshot.linger = 0
self.snapshot.setsockopt(zmq.IDENTITY, id)
self.snapshot.connect("tcp://{}:{}".format(address.decode(), port))
2019-04-08 12:53:14 +02:00
self.subscriber = ctx.socket(zmq.SUB)
self.subscriber.setsockopt_string(zmq.SUBSCRIBE, '')
self.subscriber.connect("tcp://{}:{}".format(address.decode(), port+1))
2019-04-08 12:53:14 +02:00
self.subscriber.linger = 0
print("connected on tcp://{}:{}".format(address.decode(), port))
2019-04-10 17:01:21 +02:00
2019-04-08 12:53:14 +02:00
class RCFClientAgent(object):
ctx = None
2019-04-08 12:53:14 +02:00
pipe = None
property_map = None
publisher = None
id = None
state = State.INITIAL
server = None
serial = None
serialisation_agent = None
2019-04-08 12:53:14 +02:00
def __init__(self, ctx, pipe):
self.ctx = ctx
self.pipe = pipe
2019-04-08 18:21:48 +02:00
self.property_map = {}
self.id = b"test"
2019-04-08 12:53:14 +02:00
self.state = State.INITIAL
self.admin = False
2019-04-08 12:53:14 +02:00
self.server = None
self.publisher = self.ctx.socket(zmq.PUSH) # push update socket
2019-04-08 12:53:14 +02:00
self.publisher.setsockopt(zmq.IDENTITY, self.id)
self.publisher.setsockopt(zmq.SNDHWM, 60)
self.publisher.linger = 0
self.serial, peer = zpipe(self.ctx)
2019-04-22 15:58:32 +02:00
# self.serial_agent = threading.Thread(
# target=serialization_agent, args=(self.ctx, peer), name="serial-agent")
# self.serial_agent.daemon = True
# self.serial_agent.start()
2019-04-08 12:53:14 +02:00
def control_message(self):
2019-04-08 12:53:14 +02:00
msg = self.pipe.recv_multipart()
command = msg.pop(0)
if command == b"CONNECT":
2019-04-10 18:26:39 +02:00
self.id = msg.pop(0)
2019-04-08 12:53:14 +02:00
address = msg.pop(0)
port = int(msg.pop(0))
if self.server is None:
if address == '127.0.0.1':
self.admin = True
self.server = RCFServer(self.ctx, address, port, self.id)
2019-04-10 17:01:21 +02:00
self.publisher.connect(
"tcp://{}:{}".format(address.decode(), port+2))
2019-04-08 12:53:14 +02:00
else:
logger.error("E: too many servers (max. %i)", SERVER_MAX)
2019-04-10 17:01:21 +02:00
elif command == b"SET":
2019-04-10 18:01:55 +02:00
key = umsgpack.unpackb(msg[0])
value = umsgpack.unpackb(msg[1])
if key in self.property_map.keys():
if self.property_map[key].id == self.id:
if value == 'None':
value = helpers.dump(key)
if value:
rcfmsg = message.RCFMessage(
2019-04-22 15:58:32 +02:00
key=key, id=self.id, mtype="", body=value)
rcfmsg.store(self.property_map)
rcfmsg.send(self.publisher)
else:
logger.error("Fail to dump ")
else:
helpers.load(key,self.property_map[key].body)
elif command == b"ADD":
key = umsgpack.unpackb(msg[0])
value = umsgpack.unpackb(msg[1])
2019-04-18 15:05:48 +02:00
if value == 'None':
# try to dump from bpy
value = helpers.dump(key)
2019-04-10 18:01:55 +02:00
if value:
2019-04-18 15:05:48 +02:00
rcfmsg = message.RCFMessage(
2019-04-22 15:58:32 +02:00
key=key, id=self.id, mtype="", body=value)
2019-04-18 15:05:48 +02:00
2019-04-10 18:01:55 +02:00
rcfmsg.store(self.property_map)
rcfmsg.send(self.publisher)
else:
logger.error("Fail to dump ")
2019-04-10 17:01:21 +02:00
2019-04-08 18:21:48 +02:00
elif command == b"GET":
value = []
2019-04-08 18:21:48 +02:00
key = umsgpack.unpackb(msg[0])
for k in self.property_map.keys():
if key in k:
2019-04-18 15:05:48 +02:00
value.append([k, self.property_map.get(k).body])
# value = [self.property_map.get(key) for key in keys]
# value = self.property_map.get(key)
2019-04-18 15:05:48 +02:00
self.pipe.send(umsgpack.packb(value)
if value else umsgpack.packb(''))
2019-04-11 14:39:31 +02:00
elif command == b"LIST":
2019-04-22 12:14:39 +02:00
dump_list = []
for k,v in self.property_map.items():
dump_list.append([k,v.id])
self.pipe.send(umsgpack.packb(dump_list)
if dump_list else umsgpack.packb(''))
2019-04-10 17:01:21 +02:00
elif command == b"STATE":
self.pipe.send(umsgpack.packb(self.state.value))
2019-04-18 15:05:48 +02:00
def rcf_client_agent(ctx, pipe, queue):
agent = RCFClientAgent(ctx, pipe)
2019-04-08 12:53:14 +02:00
server = None
2019-04-11 18:07:59 +02:00
update_queue = queue
2019-04-08 17:01:02 +02:00
global stop
2019-04-10 17:01:21 +02:00
while True:
2019-04-08 17:01:02 +02:00
if stop:
break
# logger.info("asdasd")
2019-04-08 12:53:14 +02:00
poller = zmq.Poller()
poller.register(agent.pipe, zmq.POLLIN)
server_socket = None
if agent.state == State.INITIAL:
server = agent.server
if agent.server:
2019-04-10 18:26:39 +02:00
logger.info("%s: waiting for server at %s:%d...",
2019-04-18 15:05:48 +02:00
agent.id.decode(), server.address, server.port)
2019-04-08 12:53:14 +02:00
server.snapshot.send(b"SNAPSHOT_REQUEST")
agent.state = State.SYNCING
server_socket = server.snapshot
2019-04-08 12:53:14 +02:00
elif agent.state == State.SYNCING:
server_socket = server.snapshot
2019-04-08 12:53:14 +02:00
elif agent.state == State.ACTIVE:
server_socket = server.subscriber
if server_socket:
poller.register(server_socket, zmq.POLLIN)
try:
2019-04-08 17:01:02 +02:00
items = dict(poller.poll(1))
2019-04-08 12:53:14 +02:00
except:
2019-04-10 17:01:21 +02:00
raise
break
2019-04-08 12:53:14 +02:00
if agent.pipe in items:
agent.control_message()
elif server_socket in items:
2019-04-10 17:01:21 +02:00
rcfmsg = message.RCFMessage.recv(server_socket)
2019-04-18 15:05:48 +02:00
2019-04-08 12:53:14 +02:00
if agent.state == State.SYNCING:
# Store snapshot
if rcfmsg.key == "SNAPSHOT_END":
client_key = "Client/{}".format(agent.id.decode())
client_dict = {}
2019-04-17 15:48:20 +02:00
with lock:
client_dict = helpers.init_client(key=client_key)
2019-04-18 15:05:48 +02:00
client_store = message.RCFMessage(
key=client_key, id=agent.id, body=client_dict)
logger.info(client_store)
client_store.store(agent.property_map, True)
client_store.send(agent.publisher)
logger.info("snapshot complete")
2019-04-08 12:53:14 +02:00
agent.state = State.ACTIVE
else:
2019-04-18 15:05:48 +02:00
helpers.load(rcfmsg.key, rcfmsg.body)
2019-04-08 12:53:14 +02:00
rcfmsg.store(agent.property_map)
logger.info("snapshot from {} stored".format(rcfmsg.id))
2019-04-08 12:53:14 +02:00
elif agent.state == State.ACTIVE:
if rcfmsg.id != agent.id:
2019-04-11 18:27:17 +02:00
# update_queue.put((rcfmsg.key,rcfmsg.body))
with lock:
2019-04-18 15:05:48 +02:00
helpers.load(rcfmsg.key, rcfmsg.body)
2019-04-08 12:53:14 +02:00
rcfmsg.store(agent.property_map)
# elif rcfmsg.id == agent.property_map[rcfmsg.key].id:
# with lock:
# helpers.load(rcfmsg.key, rcfmsg.body)
# logger.info("load")
# agent.serial.send_multipart([b"LOAD", umsgpack.packb(rcfmsg.key), umsgpack.packb(rcfmsg.body)])
# reply = agent.serial.recv_multipart()
# if reply == b"DONE":
# rcfmsg.store(agent.property_map)
# action = "update" if rcfmsg.body else "delete"
# logging.info("{}: received from {}:{},{} {}".format(rcfmsg.key,
# server.address, rcfmsg.id, server.port, action))
else:
2019-04-17 14:22:56 +02:00
logger.debug("{} nothing to do".format(agent.id))
2019-04-08 17:01:02 +02:00
logger.info("exit thread")
stop = False
2019-04-10 17:01:21 +02:00
# else: else
# agent.state = State.INITIAL
2019-04-08 12:53:14 +02:00
2019-04-10 17:01:21 +02:00
class SerializationAgent(object):
ctx = None
pipe = None
def __init__(self, ctx, pipe):
2019-04-10 17:01:21 +02:00
self.ctx = ctx
self.pipe = pipe
logger.info("serialisation service launched")
2019-04-10 17:01:21 +02:00
def control_message(self):
msg = self.pipe.recv_multipart()
2019-04-10 17:01:21 +02:00
command = msg.pop(0)
if command == b"DUMP":
key = umsgpack.unpackb(msg[0])
2019-04-11 18:07:59 +02:00
value = helpers.dump(key)
self.pipe.send_multipart(umsgpack.packb(value))
2019-04-10 17:01:21 +02:00
elif command == b"LOAD":
2019-04-18 15:05:48 +02:00
2019-04-11 18:07:59 +02:00
key = umsgpack.unpackb(msg[0])
value = umsgpack.unpackb(msg[1])
2019-04-18 15:05:48 +02:00
helpers.load(key, value)
2019-04-11 18:07:59 +02:00
self.pipe.send_multipart([b"DONE"])
2019-04-10 17:01:21 +02:00
def serialization_agent(ctx, pipe):
agent = SerializationAgent(ctx, pipe)
2019-04-10 17:01:21 +02:00
global stop
while True:
if stop:
break
poller = zmq.Poller()
poller.register(agent.pipe, zmq.POLLIN)
try:
items = dict(poller.poll(1))
except:
raise
break
if agent.pipe in items:
agent.control_message()
2019-04-19 15:46:54 +02:00
class SyncAgent(object):
ctx = None
pipe = None
def __init__(self, ctx, pipe):
self.ctx = ctx
self.pipe = pipe
logger.info("serialisation service launched")
def control_message(self):
msg = self.pipe.recv_multipart()
command = msg.pop(0)
pass
2019-04-22 12:14:39 +02:00
2019-04-19 15:46:54 +02:00
def sync_agent(ctx, pipe):
agent = SyncAgent(ctx, pipe)
global stop
while True:
if stop:
break
poller = zmq.Poller()
poller.register(agent.pipe, zmq.POLLIN)
try:
items = dict(poller.poll(1))
except:
raise
break
if agent.pipe in items:
agent.control_message()
# Synchronisation
2019-04-22 12:14:39 +02:00