diff --git a/eudp/rpyc.py b/eudp/rpyc.py new file mode 100644 index 0000000..2ead874 --- /dev/null +++ b/eudp/rpyc.py @@ -0,0 +1,65 @@ +import rpyc +from rpyc.utils.server import ThreadedServer +from rpyc.core import SocketStream, Channel +from rpyc.utils.factory import connect_stream +from rpyc.lib import Timeout +import eudp +import sys + +class EUDPSocketStream(SocketStream): + MAX_IO_CHUNK = eudp.DATA_LENGTH + @classmethod + def eudp_connect(cls, host, port, *a, **kw): + sock = eudp.EUDP(encrypted=True) + sock.connect((host, port)) + return cls(sock) + def poll(self, timeout): + timeout = Timeout(timeout) + return self.sock.poll(timeout.timeleft()) + + def read(self, count): + try: + return self.sock.recv(count) + except EOFError: + self.close() + raise EOFError + def write(self, data): + try: + self.sock.send(data) + except EOFError: + ex = sys.exc_info()[1] + self.close() + raise EOFError(ex) +def eudp_connect(host, port, service=rpyc.VoidService, config={}, **kw): + s = EUDPSocketStream.eudp_connect(host, port, **kw) + return connect_stream(s, service, config) +class EUDPThreadedServer(ThreadedServer): + def __init__(self, service, hostname = '', ipv6 = False, port = 0, + backlog = 1, reuse_addr = True, authenticator = None, registrar = None, + auto_register = None, protocol_config = {}, logger = None, listener_timeout = 0.5, + socket_path = None): + ThreadedServer.__init__(self, service, hostname=hostname, ipv6=ipv6, port=port, + backlog=backlog, reuse_addr=reuse_addr, authenticator=authenticator, registrar=registrar, + auto_register=auto_register, protocol_config=protocol_config, logger=logger, listener_timeout=listener_timeout, + socket_path=socket_path) + self.listener.close() + self.listener = None + ########## + self.listener = eudp.EUDP(encrypted=True) + self.listener.bind((hostname, port)) + sockname = self.listener.getsockname() + self.host, self.port = sockname[0], sockname[1] + def _serve_client(self, sock, credentials): + addrinfo = sock.getpeername() + if credentials: + self.logger.info("welcome %s (%r)", addrinfo, credentials) + else: + self.logger.info("welcome %s", addrinfo) + try: + config = dict(self.protocol_config, credentials = credentials, + endpoints = (sock.getsockname(), addrinfo), logger = self.logger) + conn = self.service._connect(Channel(EUDPSocketStream(sock)), config) + self._handle_connection(conn) + finally: + self.logger.info("goodbye %s", addrinfo) +