Часть транспорта

utcp_objects
Роман Бородин 2019-04-04 20:38:48 +03:00
parent be77eeb6c1
commit 4eebb0cb84
1 changed files with 181 additions and 0 deletions

181
mods/stream.py 100644
View File

@ -0,0 +1,181 @@
import socketserver
import pickle
import simplecrypto
from uuid import uuid4
from datetime import datetime, timedelta
import threading
import time
from rpyc.lib import safe_import, Timeout
peers = {}
RETRANSMIT_RETRIES = 3
DATAGRAM_MAX_SIZE = 9000
PACKET_NUM_SEQ_TTL = 300
SOCK_SEND_TIMEOUT = 60
PACKET_TYPE_HELLO = 0x00
PACKET_TYPE_PEER_PUB_KEY_REQUEST = 0x01
PACKET_TYPE_PEER_PUB_KEY_REPLY = 0x02
PACKET_TYPE_PEER_NEW_PUB_KEY = 0x03
PACKET_TYPE_PACKET = 0xa0
PACKET_TYPE_CONFIRM_RECV = 0xa1
PACKET_TYPE_GOODBUY = 0xff
class InvalidPacket(Exception): pass
def pickle_data(data):
return pickle.dumps(data, protocol=4)
# From rpyc.lib
class Timeout:
def __init__(self, timeout):
if isinstance(timeout, Timeout):
self.finite = timeout.finite
self.tmax = timeout.tmax
else:
self.finite = timeout is not None and timeout >= 0
self.tmax = time.time()+timeout if self.finite else None
def expired(self):
return self.finite and time.time() >= self.tmax
def timeleft(self):
return max((0, self.tmax - time.time())) if self.finite else None
def sleep(self, interval):
time.sleep(min(interval, self.timeleft()) if self.finite else interval)
class Packet:
def __init__(self, packet_payload):
try:
d = pickle.loads(packet_payload)
self.sid = d['sid']
self.type = d['type']
self.reset_timestamp = d['reset_timestamp']
self.num = d['num']
self.data = d['data']
except:
raise InvalidPacket
class Peer:
def __init__(self, sock, endpoint):
self.sid = None
self.sock = sock
self.endpoint = endpoint
self.my_key = None
self.peer_pub_key = None
self.buf = []
self.confirm_wait_packet = None
self.last_packet = None
self.request_lock = threading.Lock()
self.num_seq_ttl = timedelta(seconds=PACKET_NUM_SEQ_TTL)
self.last_sent_packet_num = -1
self.last_sent_packet_num_reset_time = datetime.utcnow()
self.last_received_packet_num = -1
self.last_received_packet_num_reset_time = None
self.retransmit_count = 0
def next_packet_num(self):
new_time = datetime.utcnow()
if (new_time - self.last_sent_packet_num_reset_time) >= self.num_seq_ttl:
self.last_sent_packet_num = -1
self.last_sent_packet_num += 1
return self.last_sent_packet_num
def poll(self):
return bool(len(self.buf))
def get_next_block(self):
if not len(self.buf):
return None
return self.buf.pop()
def put_block(self, data):
self.buf.insert(0, data)
def send(self, d, encrypted=False, confirm=False):
if 'sid' not in d: d['sid'] = self.sid
if 'num' not in d: d['num'] = None
if 'reset_timestamp' not in d: d['reset_timestamp'] = None
if 'data' not in d: d['data'] = b''
if encrypted: d['data'] = self.peer_pub_key.encrypt_raw(d['data'])
data = pickle_data(d)
if confirm:
self.last_packet = data
self.confirm_wait_packet = (d['reset_timestamp'], d['num'])
self.sock.sendto(pickle_data(d), self.endpoint)
def mark_packet(self, d):
d['num'] = self.next_packet_num()
d['reset_timestamp'] = self.last_sent_packet_num_reset_time
return d
def retransmit(self):
pass
def reply_my_pub_key(self, packet):
try:
self.peer_pub_key = simplecrypto.RsaPublicKey(packet.data)
except:
raise EOFError
self.my_key = simplecrypto.RsaKeypair()
d = {
'type': PACKET_TYPE_PEER_PUB_KEY_REPLY,
'data': self.my_key.publickey.serialize()
}
self.send(d, encrypted=True)
def request_peer_bub_key(self, packet):
self.sid = packet.sid
self.my_key = simplecrypto.RsaKeypair()
d = {
'type': PACKET_TYPE_PEER_PUB_KEY_REQUEST,
'data': self.my_key.publickey.serialize()
}
self.send(d)
def confirm_packet_recv(self, packet):
self.confirm_wait_packet = None
self.last_packet = None
d = {
'type': PACKET_TYPE_CONFIRM_RECV,
'num': packet.num,
'reset_timestamp': packet.reset_timestamp
}
self.send(d)
def hello(self):
self.sid = uuid4().hex
d = {
'type': PACKET_TYPE_HELLO,
'reset_timestamp': None,
'num': None
}
self.sock.sendto(pickle_data(d))
def recv_packet(self, packet_payload):
with self.request_lock:
try:
packet = Packet(packet_payload)
if packet.type == PACKET_TYPE_GOODBUY:
raise EOFError
except:
raise EOFError
if packet.type != PACKET_TYPE_HELLO and (not self.sid or self.sid != packet.sid):
self.hello()
return
############################################
if not self.peer_pub_key:
if packet.type == PACKET_TYPE_PEER_PUB_KEY_REPLY:
try:
self.peer_pub_key = simplecrypto.RsaPublicKey(self.my_key.decrypt_raw(packet.data))
return
except:
raise EOFError
elif packet.type == PACKET_TYPE_PEER_PUB_KEY_REQUEST:
self.reply_my_pub_key(packet)
return
elif packet.type == PACKET_TYPE_HELLO:
self.request_peer_bub_key(packet)
return
############################################
if self.confirm_wait_packet:
if (packet.reset_timestamp, packet.num) == self.confirm_wait_packet:
self.confirm_packet_recv(packet)
else:
pass
class UDPRequestHandler(socketserver.DatagramRequestHandler):
def handle(self):
datagram = self.rfile.read(BUFSIZE)
if self.client_address not in peers: