diff --git a/mods/stream.py b/mods/stream.py index 052af0e..aec211a 100644 --- a/mods/stream.py +++ b/mods/stream.py @@ -8,6 +8,8 @@ import time peers = {} +PORT = 16386 + RETRANSMIT_RETRIES = 3 DATAGRAM_MAX_SIZE = 9000 RAW_DATA_MAX_SIZE = 8000 @@ -238,5 +240,56 @@ class UDPRequestHandler(socketserver.DatagramRequestHandler): except EOFError: del peers[peer_addr] +class ThreadingUDPServer(socketserver.ThreadingMixIn, socketserver.UDPServer): + pass + +udpserver = ThreadingUDPServer(('0.0.0.0', PORT), UDPRequestHandler) +udpserver_thread = threading.Thread(target=udpserver.serve_forever) +udpserver_thread.start() + class EncryptedUDPStream: - + MAX_IO_CHUNK = 8000 + def __init__(self, sock, peer_addr): + self.peer_addr = peer_addr + self.sock = sock + @classmethod + def _connect(cls, host, port): + peers[(host, port)] = Peer(udpserver.socket, (host, port)) + peers[(host, port)].hello() + return udpserver.socket + @classmethod + def connect(cls, host, port, **kwargs): + return cls(cls._connect(host, port), (host, port)) + def poll(self, timeout): + timeout = Timeout(timeout) + while timeout.timeleft(): + try: + rl = peers[self.peer_addr].poll() + if rl: break + except: + raise EOFError + return rl + def close(self): + if self.peer_addr in peers: del peers[self.peer_addr] + @property + def closed(self): + return self.peer_addr not in peers + def fileno(self): + try: + return self.sock.fileno() + except: + self.close() + raise EOFError + def read(self, count): + try: + buf = peers[self.peer_addr].get_next_block() + except: + raise EOFError + return buf + def write(self, data): + try: + peers[self.peer_addr].send_packet(data) + except: + raise EOFError + +