black-mamba/mods/rpyc_utcp.py

74 lines
3.0 KiB
Python

import rpyc
from rpyc.utils.server import ThreadedServer
from rpyc.core import SocketStream, Channel
from rpyc.core.stream import retry_errnos
from rpyc.utils.factory import connect_channel
from rpyc.lib import Timeout
from rpyc.lib.compat import select_error
from rpyc.lib.compat import get_exc_errno, BYTES_LITERAL
from . import utcp
import sys
import errno
import socket
class UTCPSocketStream(SocketStream):
MAX_IO_CHUNK = utcp.DATA_LENGTH
@classmethod
def utcp_connect(cls, host, port, *a, **kw):
sock = utcp.TCP(encrypted=True)
sock.connect((host, port))
return cls(sock)
def poll(self, timeout):
timeout = Timeout(timeout)
try:
while True:
try:
rl = self.sock.poll(timeout.timeleft())
except select_error:
ex = sys.exc_info()[1]
if ex.args[0] == errno.EINTR:
continue
else:
raise
else:
break
except ValueError:
ex = sys.exc_info()[1]
raise select_error(str(ex))
return rl
def connect_stream(stream, service=rpyc.VoidService, config={}):
return connect_channel(Channel(stream), service=service, config=config)
def utcp_connect(host, port, service=rpyc.VoidService, config={}, **kw):
s = UTCPSocketStream.utcp_connect(host, port, **kw)
return connect_stream(s, service, config)
class UTCPThreadedServer(ThreadedServer):
def __init__(self, service, hostname = '', ipv6 = False, port = 0,
backlog = 1, reuse_addr = True, authenticator = None, registrar = None,
auto_register = None, protocol_config = {}, logger = None, listener_timeout = 0.5,
socket_path = None):
backlog = 1
ThreadedServer.__init__(self, service, hostname=hostname, ipv6=ipv6, port=port,
backlog=backlog, reuse_addr=reuse_addr, authenticator=authenticator, registrar=registrar,
auto_register=auto_register, protocol_config=protocol_config, logger=logger, listener_timeout=listener_timeout,
socket_path=socket_path)
self.listener.close()
self.listener = None
##########
self.listener = utcp.TCP(encrypted=True)
self.listener.bind((hostname, port))
sockname = self.listener.getsockname()
self.host, self.port = sockname[0], sockname[1]
def _serve_client(self, sock, credentials):
addrinfo = sock.getpeername()
if credentials:
self.logger.info("welcome %s (%r)", addrinfo, credentials)
else:
self.logger.info("welcome %s", addrinfo)
try:
config = dict(self.protocol_config, credentials = credentials,
endpoints = (sock.getsockname(), addrinfo), logger = self.logger)
conn = self.service._connect(Channel(UTCPSocketStream(sock)), config)
self._handle_connection(conn)
finally:
self.logger.info("goodbye %s", addrinfo)