From 4eebb0cb845bf172da45f163e7bdbb029a344f38 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=A0=D0=BE=D0=BC=D0=B0=D0=BD=20=D0=91=D0=BE=D1=80=D0=BE?= =?UTF-8?q?=D0=B4=D0=B8=D0=BD?= Date: Thu, 4 Apr 2019 20:38:48 +0300 Subject: [PATCH] =?UTF-8?q?=D0=A7=D0=B0=D1=81=D1=82=D1=8C=20=D1=82=D1=80?= =?UTF-8?q?=D0=B0=D0=BD=D1=81=D0=BF=D0=BE=D1=80=D1=82=D0=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mods/stream.py | 181 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 181 insertions(+) create mode 100644 mods/stream.py diff --git a/mods/stream.py b/mods/stream.py new file mode 100644 index 0000000..5818bb8 --- /dev/null +++ b/mods/stream.py @@ -0,0 +1,181 @@ +import socketserver +import pickle +import simplecrypto +from uuid import uuid4 +from datetime import datetime, timedelta +import threading +import time + +from rpyc.lib import safe_import, Timeout + +peers = {} + +RETRANSMIT_RETRIES = 3 +DATAGRAM_MAX_SIZE = 9000 +PACKET_NUM_SEQ_TTL = 300 + +SOCK_SEND_TIMEOUT = 60 + +PACKET_TYPE_HELLO = 0x00 +PACKET_TYPE_PEER_PUB_KEY_REQUEST = 0x01 +PACKET_TYPE_PEER_PUB_KEY_REPLY = 0x02 +PACKET_TYPE_PEER_NEW_PUB_KEY = 0x03 +PACKET_TYPE_PACKET = 0xa0 +PACKET_TYPE_CONFIRM_RECV = 0xa1 +PACKET_TYPE_GOODBUY = 0xff + +class InvalidPacket(Exception): pass + +def pickle_data(data): + return pickle.dumps(data, protocol=4) + +# From rpyc.lib +class Timeout: + def __init__(self, timeout): + if isinstance(timeout, Timeout): + self.finite = timeout.finite + self.tmax = timeout.tmax + else: + self.finite = timeout is not None and timeout >= 0 + self.tmax = time.time()+timeout if self.finite else None + def expired(self): + return self.finite and time.time() >= self.tmax + def timeleft(self): + return max((0, self.tmax - time.time())) if self.finite else None + def sleep(self, interval): + time.sleep(min(interval, self.timeleft()) if self.finite else interval) + +class Packet: + def __init__(self, packet_payload): + try: + d = pickle.loads(packet_payload) + self.sid = d['sid'] + self.type = d['type'] + self.reset_timestamp = d['reset_timestamp'] + self.num = d['num'] + self.data = d['data'] + except: + raise InvalidPacket + +class Peer: + def __init__(self, sock, endpoint): + self.sid = None + self.sock = sock + self.endpoint = endpoint + self.my_key = None + self.peer_pub_key = None + self.buf = [] + self.confirm_wait_packet = None + self.last_packet = None + self.request_lock = threading.Lock() + self.num_seq_ttl = timedelta(seconds=PACKET_NUM_SEQ_TTL) + self.last_sent_packet_num = -1 + self.last_sent_packet_num_reset_time = datetime.utcnow() + self.last_received_packet_num = -1 + self.last_received_packet_num_reset_time = None + self.retransmit_count = 0 + def next_packet_num(self): + new_time = datetime.utcnow() + if (new_time - self.last_sent_packet_num_reset_time) >= self.num_seq_ttl: + self.last_sent_packet_num = -1 + self.last_sent_packet_num += 1 + return self.last_sent_packet_num + def poll(self): + return bool(len(self.buf)) + def get_next_block(self): + if not len(self.buf): + return None + return self.buf.pop() + def put_block(self, data): + self.buf.insert(0, data) + def send(self, d, encrypted=False, confirm=False): + if 'sid' not in d: d['sid'] = self.sid + if 'num' not in d: d['num'] = None + if 'reset_timestamp' not in d: d['reset_timestamp'] = None + if 'data' not in d: d['data'] = b'' + if encrypted: d['data'] = self.peer_pub_key.encrypt_raw(d['data']) + data = pickle_data(d) + if confirm: + self.last_packet = data + self.confirm_wait_packet = (d['reset_timestamp'], d['num']) + self.sock.sendto(pickle_data(d), self.endpoint) + def mark_packet(self, d): + d['num'] = self.next_packet_num() + d['reset_timestamp'] = self.last_sent_packet_num_reset_time + return d + def retransmit(self): + pass + def reply_my_pub_key(self, packet): + try: + self.peer_pub_key = simplecrypto.RsaPublicKey(packet.data) + except: + raise EOFError + self.my_key = simplecrypto.RsaKeypair() + d = { + 'type': PACKET_TYPE_PEER_PUB_KEY_REPLY, + 'data': self.my_key.publickey.serialize() + } + self.send(d, encrypted=True) + def request_peer_bub_key(self, packet): + self.sid = packet.sid + self.my_key = simplecrypto.RsaKeypair() + d = { + 'type': PACKET_TYPE_PEER_PUB_KEY_REQUEST, + 'data': self.my_key.publickey.serialize() + } + self.send(d) + def confirm_packet_recv(self, packet): + self.confirm_wait_packet = None + self.last_packet = None + d = { + 'type': PACKET_TYPE_CONFIRM_RECV, + 'num': packet.num, + 'reset_timestamp': packet.reset_timestamp + } + self.send(d) + def hello(self): + self.sid = uuid4().hex + d = { + 'type': PACKET_TYPE_HELLO, + 'reset_timestamp': None, + 'num': None + } + self.sock.sendto(pickle_data(d)) + def recv_packet(self, packet_payload): + with self.request_lock: + try: + packet = Packet(packet_payload) + if packet.type == PACKET_TYPE_GOODBUY: + raise EOFError + except: + raise EOFError + if packet.type != PACKET_TYPE_HELLO and (not self.sid or self.sid != packet.sid): + self.hello() + return + ############################################ + if not self.peer_pub_key: + if packet.type == PACKET_TYPE_PEER_PUB_KEY_REPLY: + try: + self.peer_pub_key = simplecrypto.RsaPublicKey(self.my_key.decrypt_raw(packet.data)) + return + except: + raise EOFError + elif packet.type == PACKET_TYPE_PEER_PUB_KEY_REQUEST: + self.reply_my_pub_key(packet) + return + elif packet.type == PACKET_TYPE_HELLO: + self.request_peer_bub_key(packet) + return + ############################################ + if self.confirm_wait_packet: + if (packet.reset_timestamp, packet.num) == self.confirm_wait_packet: + self.confirm_packet_recv(packet) + else: + pass + + + +class UDPRequestHandler(socketserver.DatagramRequestHandler): + def handle(self): + datagram = self.rfile.read(BUFSIZE) + if self.client_address not in peers: