siagnal peer init
parent
ed57cbcb57
commit
9cdc27c1f3
|
@ -0,0 +1,5 @@
|
||||||
|
import os
|
||||||
|
app_dir = os.path.join(os.environ['HOME'], '.black-mamba')
|
||||||
|
signalpeers_db = os.path.join(app_dir, 'signalpeers.sqlite')
|
||||||
|
downloads_db = os.path.join(app_dir, 'downloads.sqlite')
|
||||||
|
sharedfiles_db = os.path.join(app_dir, 'sharedfiles.sqlite')
|
|
@ -0,0 +1,15 @@
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
def store_signalpeer(conn, signalpeerkvstore):
|
||||||
|
peer_addr = (conn._channel.stream.sock.getpeername()[0], conn.root.peer_port)
|
||||||
|
signalpeer_key = f'{peer_addr[0]:{peer_addr[1]}}'
|
||||||
|
if signalpeer_key not in signalpeerkvstore:
|
||||||
|
signalpeerkvstore[signalpeer_key] = {'addr': peer_addr, 'seen': datetime.utcnow(), 'cname': conn.root.peer_cname}
|
||||||
|
else:
|
||||||
|
signalpeerkvstore[signalpeer_key]['seen'] = datetime.utcnow()
|
||||||
|
if signalpeerkvstore[signalpeer_key]['cname'] != conn.root.peer_cname:
|
||||||
|
signalpeerkvstore[signalpeer_key]['cname'] = conn.root.peer_cname
|
||||||
|
def merge_signalpeers(signalpeers, signalpeerkvstore):
|
||||||
|
new_keys = list(set(signalpeers.keys()) - set(signalpeerkvstore.keys()))
|
||||||
|
for k in new_keys:
|
||||||
|
signalpeerkvstore[k] = signalpeers[k]
|
|
@ -0,0 +1 @@
|
||||||
|
import rpyc
|
|
@ -1,67 +0,0 @@
|
||||||
import rpyc
|
|
||||||
from rpyc.utils.server import ThreadedServer
|
|
||||||
from rpyc.core import SocketStream, Channel
|
|
||||||
from rpyc.utils.factory import connect_stream
|
|
||||||
from rpyc.lib import Timeout
|
|
||||||
from . import utcp
|
|
||||||
import sys
|
|
||||||
|
|
||||||
class UTCPSocketStream(SocketStream):
|
|
||||||
MAX_IO_CHUNK = utcp.DATA_LENGTH
|
|
||||||
@classmethod
|
|
||||||
def utcp_connect(cls, host, port, *a, **kw):
|
|
||||||
sock = utcp.UTCP(encrypted=True)
|
|
||||||
sock.connect((host, port))
|
|
||||||
return cls(sock)
|
|
||||||
def poll(self, timeout):
|
|
||||||
timeout = Timeout(timeout)
|
|
||||||
return self.sock.poll(timeout.timeleft())
|
|
||||||
|
|
||||||
def read(self, count):
|
|
||||||
try:
|
|
||||||
return self.sock.recv(count)
|
|
||||||
except EOFError:
|
|
||||||
self.close()
|
|
||||||
raise EOFError
|
|
||||||
def write(self, data):
|
|
||||||
try:
|
|
||||||
self.sock.send(data)
|
|
||||||
except EOFError:
|
|
||||||
|
|
||||||
ex = sys.exc_info()[1]
|
|
||||||
self.close()
|
|
||||||
raise EOFError(ex)
|
|
||||||
def utcp_connect(host, port, service=rpyc.VoidService, config={}, **kw):
|
|
||||||
s = UTCPSocketStream.utcp_connect(host, port, **kw)
|
|
||||||
return connect_stream(s, service, config)
|
|
||||||
class UTCPThreadedServer(ThreadedServer):
|
|
||||||
def __init__(self, service, hostname = '', ipv6 = False, port = 0,
|
|
||||||
backlog = 1, reuse_addr = True, authenticator = None, registrar = None,
|
|
||||||
auto_register = None, protocol_config = {}, logger = None, listener_timeout = 0.5,
|
|
||||||
socket_path = None):
|
|
||||||
backlog = 1
|
|
||||||
ThreadedServer.__init__(self, service, hostname=hostname, ipv6=ipv6, port=port,
|
|
||||||
backlog=backlog, reuse_addr=reuse_addr, authenticator=authenticator, registrar=registrar,
|
|
||||||
auto_register=auto_register, protocol_config=protocol_config, logger=logger, listener_timeout=listener_timeout,
|
|
||||||
socket_path=socket_path)
|
|
||||||
self.listener.close()
|
|
||||||
self.listener = None
|
|
||||||
##########
|
|
||||||
self.listener = utcp.UTCP(encrypted=True)
|
|
||||||
self.listener.bind((hostname, port))
|
|
||||||
sockname = self.listener.getsockname()
|
|
||||||
self.host, self.port = sockname[0], sockname[1]
|
|
||||||
def _serve_client(self, sock, credentials):
|
|
||||||
addrinfo = sock.getpeername()
|
|
||||||
if credentials:
|
|
||||||
self.logger.info("welcome %s (%r)", addrinfo, credentials)
|
|
||||||
else:
|
|
||||||
self.logger.info("welcome %s", addrinfo)
|
|
||||||
try:
|
|
||||||
config = dict(self.protocol_config, credentials = credentials,
|
|
||||||
endpoints = (sock.getsockname(), addrinfo), logger = self.logger)
|
|
||||||
conn = self.service._connect(Channel(UTCPSocketStream(sock)), config)
|
|
||||||
self._handle_connection(conn)
|
|
||||||
finally:
|
|
||||||
self.logger.info("goodbye %s", addrinfo)
|
|
||||||
|
|
|
@ -0,0 +1,8 @@
|
||||||
|
|
||||||
|
class Settings:
|
||||||
|
def __init__(self):
|
||||||
|
self.listen_port = 8765
|
||||||
|
self.listen_host = ''
|
||||||
|
self.cname = 'Very cool peer'
|
||||||
|
|
||||||
|
settings = Settings()
|
|
@ -0,0 +1,41 @@
|
||||||
|
import rpyc
|
||||||
|
from . import const
|
||||||
|
from . import typedefs
|
||||||
|
from .settings import settings
|
||||||
|
from uuid import uuid4
|
||||||
|
from sqlitedict import SqliteDict
|
||||||
|
from . import helpers
|
||||||
|
|
||||||
|
peer = {}
|
||||||
|
my_id = uuid4().hex
|
||||||
|
|
||||||
|
class SignalpeerService(rpyc.Service):
|
||||||
|
signalpeerkvstore = None
|
||||||
|
peer_id = None
|
||||||
|
exposed_peer_type = typedefs.SIGNALPEER
|
||||||
|
def on_connect(self, conn):
|
||||||
|
self.peer_id = uuid4().hex
|
||||||
|
if conn.root.pid == my_id:
|
||||||
|
conn.close()
|
||||||
|
return
|
||||||
|
self.signalpeerkvstore = SqliteDict(const.signalpeers_db, autocommit=True)
|
||||||
|
peer[self.peer_id] = conn
|
||||||
|
peer_type = conn.root.peer_type
|
||||||
|
if peer_type == typedefs.SIGNALPEER:
|
||||||
|
helpers.store_signalpeer(conn, self.signalpeerkvstore)
|
||||||
|
helpers.merge_signalpeers(conn.root.signal_peers, self.signalpeerkvstore)
|
||||||
|
def on_disconnect(self, conn):
|
||||||
|
if self.peer_id in peer:
|
||||||
|
peer.pop(self.peer_id)
|
||||||
|
@property
|
||||||
|
def exposed_peer_port(self):
|
||||||
|
return settings.listen_port
|
||||||
|
@property
|
||||||
|
def exposed_peer_cname(self):
|
||||||
|
return settings.cname
|
||||||
|
@property
|
||||||
|
def exposed_pid(self):
|
||||||
|
return my_id
|
||||||
|
@property
|
||||||
|
def exposed_signal_peers(self):
|
||||||
|
return dict(self.signalpeerkvstore.items())
|
|
@ -0,0 +1,2 @@
|
||||||
|
SIGNALPEER = 1
|
||||||
|
PEER = 2
|
593
mods/utcp.py
593
mods/utcp.py
|
@ -1,593 +0,0 @@
|
||||||
# based on https://github.com/ethay012/UTCP-over-UDP
|
|
||||||
import random
|
|
||||||
import socket
|
|
||||||
import pickle
|
|
||||||
import threading
|
|
||||||
import io
|
|
||||||
import hashlib
|
|
||||||
import simplecrypto
|
|
||||||
from struct import Struct
|
|
||||||
import uuid
|
|
||||||
import bisect
|
|
||||||
|
|
||||||
from datetime import datetime
|
|
||||||
|
|
||||||
DATA_DIVIDE_LENGTH = 8000
|
|
||||||
PACKET_HEADER_SIZE = 512 # Pickle service info
|
|
||||||
DATA_LENGTH = DATA_DIVIDE_LENGTH
|
|
||||||
SENT_SIZE = PACKET_HEADER_SIZE + DATA_LENGTH + 272 # Encrypted data always 272 bytes bigger
|
|
||||||
|
|
||||||
class Connection:
|
|
||||||
SMALLEST_STARTING_SEQ = 0
|
|
||||||
HIGHEST_STARTING_SEQ = 4294967295
|
|
||||||
def __init__(self, remote, encrypted=False):
|
|
||||||
self.fileno = 0
|
|
||||||
self.peer_addr = remote
|
|
||||||
self.seq = Connection.gen_starting_seq_num()
|
|
||||||
self.recv_seq = -1
|
|
||||||
self.my_key = None
|
|
||||||
if encrypted:
|
|
||||||
self.my_key = simplecrypto.RsaKeypair()
|
|
||||||
self.pubkey = self.my_key.publickey.serialize()
|
|
||||||
self.peer_pub = None
|
|
||||||
self.recv_lock = threading.Lock()
|
|
||||||
self.send_lock = threading.Lock()
|
|
||||||
self.packet_buffer = {
|
|
||||||
'ACK': [],
|
|
||||||
'SYN-ACK': [],
|
|
||||||
'DATA': [],
|
|
||||||
'FIN-ACK': []
|
|
||||||
}
|
|
||||||
@staticmethod
|
|
||||||
def gen_starting_seq_num():
|
|
||||||
return random.randint(Connection.SMALLEST_STARTING_SEQ, Connection.HIGHEST_STARTING_SEQ)
|
|
||||||
def seq_inc(self, inc=1):
|
|
||||||
self.seq += inc
|
|
||||||
return self.seq
|
|
||||||
def set_ack(self, ack):
|
|
||||||
self.ack = ack
|
|
||||||
return ack
|
|
||||||
|
|
||||||
class UTCPPacket:
|
|
||||||
def __cmp__(self, other):
|
|
||||||
return (self.seq > other.seq) - (self.seq < other.seq)
|
|
||||||
def __lt__(self, other):
|
|
||||||
return self.seq < other.seq
|
|
||||||
def __gt__(self, other):
|
|
||||||
return self.seq > other.seq
|
|
||||||
def __eq__(self, other):
|
|
||||||
return self.seq == other.seq
|
|
||||||
|
|
||||||
class Ack(UTCPPacket):
|
|
||||||
type = 'ACK'
|
|
||||||
def __init__(self, id_):
|
|
||||||
self.id = id_
|
|
||||||
self.seq = 0
|
|
||||||
class Fin(UTCPPacket):
|
|
||||||
type = 'FIN'
|
|
||||||
def __init__(self):
|
|
||||||
self.id = uuid.uuid4().bytes
|
|
||||||
self.seq = 0
|
|
||||||
class FinAck(UTCPPacket):
|
|
||||||
type = 'FIN-ACK'
|
|
||||||
def __init__(self, id_):
|
|
||||||
self.id = id_
|
|
||||||
self.seq = 0
|
|
||||||
class Syn(UTCPPacket):
|
|
||||||
type = 'SYN'
|
|
||||||
checksum = None
|
|
||||||
def __init__(self):
|
|
||||||
self.id = uuid.uuid4().bytes
|
|
||||||
self.seq = 0
|
|
||||||
def set_pub(self, pubkey):
|
|
||||||
self.checksum = UTCP.checksum(pubkey)
|
|
||||||
self.pubkey = pubkey
|
|
||||||
class SynAck(UTCPPacket):
|
|
||||||
type = 'SYN-ACK'
|
|
||||||
checksum = None
|
|
||||||
def __init__(self, id_):
|
|
||||||
self.id = id_
|
|
||||||
self.seq = 0
|
|
||||||
def set_pub(self, pubkey):
|
|
||||||
self.checksum = UTCP.checksum(pubkey)
|
|
||||||
self.pubkey = pubkey
|
|
||||||
|
|
||||||
class Packet(UTCPPacket):
|
|
||||||
type = 'DATA'
|
|
||||||
def __init__(self):
|
|
||||||
self.id = uuid.uuid4().bytes
|
|
||||||
self.checksum = 0
|
|
||||||
self.data = b''
|
|
||||||
self.seq = 0
|
|
||||||
def __repr__(self):
|
|
||||||
return f'Packet(seq={self.seq})'
|
|
||||||
|
|
||||||
def __str__(self):
|
|
||||||
return 'UUID: %d, DATA:%s' % (uuid.UUID(bytes=self.id), self.data)
|
|
||||||
|
|
||||||
def set_data(self, data):
|
|
||||||
self.checksum = UTCP.checksum(data)
|
|
||||||
self.data = data
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class RestrictedUnpickler(pickle.Unpickler):
|
|
||||||
def find_class(self, module, name):
|
|
||||||
if module != 'builtins':
|
|
||||||
if name == 'Packet': return Packet
|
|
||||||
if name == 'Ack': return Ack
|
|
||||||
if name == 'Fin': return Fin
|
|
||||||
if name == 'FinAck': return FinAck
|
|
||||||
if name == 'Syn': return Syn
|
|
||||||
if name == 'SynAck': return SynAck
|
|
||||||
raise pickle.UnpicklingError("global '%s.%s' is forbidden" %
|
|
||||||
(module, name))
|
|
||||||
def restricted_pickle_loads(s):
|
|
||||||
return RestrictedUnpickler(io.BytesIO(s)).load()
|
|
||||||
|
|
||||||
class UTCPChannel:
|
|
||||||
HEADER = Struct('!I')
|
|
||||||
POLL_TIMEOUT = 0.1
|
|
||||||
def __init__(self, sock):
|
|
||||||
self.sock = sock
|
|
||||||
def poll(self):
|
|
||||||
return self.sock.poll(self.POLL_TIMEOUT)
|
|
||||||
def recv(self):
|
|
||||||
header = self.sock.recv(self.HEADER.size)
|
|
||||||
data_len = self.HEADER.unpack(header)[0]
|
|
||||||
return self.sock.recv(data_len)
|
|
||||||
def send(self, data):
|
|
||||||
header = self.HEADER.pack(len(data))
|
|
||||||
self.sock.send(header + data)
|
|
||||||
|
|
||||||
class ConnectedSOCK(object):
|
|
||||||
def __init__(self, low_sock, client_addr):
|
|
||||||
self.client_addr = client_addr
|
|
||||||
self.low_sock = low_sock
|
|
||||||
self.channel = UTCPChannel(self)
|
|
||||||
def __getattribute__(self, att):
|
|
||||||
try:
|
|
||||||
return object.__getattribute__(self, att)
|
|
||||||
except AttributeError:
|
|
||||||
return getattr(self.low_sock, att)
|
|
||||||
def getpeername(self):
|
|
||||||
return self.client_addr
|
|
||||||
def send(self, data):
|
|
||||||
if self.closed:
|
|
||||||
raise EOFError
|
|
||||||
return self.low_sock.send(data, self.client_addr)
|
|
||||||
def sendall(self, data):
|
|
||||||
if self.closed:
|
|
||||||
raise EOFError
|
|
||||||
self.low_sock.sendall(data, self.client_addr)
|
|
||||||
def recv(self, size):
|
|
||||||
if self.closed:
|
|
||||||
raise EOFError
|
|
||||||
return self.low_sock.recv(size, self.client_addr)
|
|
||||||
@property
|
|
||||||
def closed(self):
|
|
||||||
return self.low_sock.own_socket._closed or self.client_addr not in self.low_sock.connections
|
|
||||||
def close(self):
|
|
||||||
if self.client_addr in self.low_sock.connections:
|
|
||||||
self.low_sock.close(self.client_addr)
|
|
||||||
def shutdown(self, *a, **kw):
|
|
||||||
self.close()
|
|
||||||
def poll(self, timeout):
|
|
||||||
return self.low_sock.poll(timeout, self.client_addr)
|
|
||||||
def packets_arrived(self, packet_type):
|
|
||||||
return self.low_sock.packets_arrived(packet_type, self.client_addr)
|
|
||||||
def fileno(self):
|
|
||||||
if self.closed:
|
|
||||||
raise EOFError
|
|
||||||
return self.low_sock.fileno(self.client_addr)
|
|
||||||
|
|
||||||
class UTCP(object):
|
|
||||||
host = None
|
|
||||||
port = None
|
|
||||||
client = False
|
|
||||||
def __init__(self,encrypted=False, **kw):
|
|
||||||
self.encrypted = encrypted
|
|
||||||
self.incoming_packet_event = threading.Event()
|
|
||||||
self.new_conn_event = threading.Event()
|
|
||||||
self.own_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
||||||
self.own_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
||||||
self.settimeout()
|
|
||||||
self.connection_lock = threading.Lock()
|
|
||||||
self.queue_lock = threading.Lock()
|
|
||||||
self.channel = None
|
|
||||||
self.connections = {}
|
|
||||||
self.connection_queue = []
|
|
||||||
self.syn_received = {}
|
|
||||||
self.fileno_seq = 40000000
|
|
||||||
def next_fileno(self):
|
|
||||||
self.fileno_seq += 1
|
|
||||||
return self.fileno_seq
|
|
||||||
def packets_arrived(self, packet_type, connection=None):
|
|
||||||
try:
|
|
||||||
conn = self.connections[connection]
|
|
||||||
except:
|
|
||||||
raise EOFError
|
|
||||||
with conn.recv_lock:
|
|
||||||
return bool(len(conn.packet_buffer[packet_type]))
|
|
||||||
def poll(self, timeout, connection=None):
|
|
||||||
if connection not in list(self.connections.keys()):
|
|
||||||
if connection is None:
|
|
||||||
connection = list(self.connections.keys())[0]
|
|
||||||
else:
|
|
||||||
raise EOFError('Connection not in connected devices')
|
|
||||||
if not self.closed:
|
|
||||||
has_data = self.packets_arrived('DATA', connection)
|
|
||||||
if has_data:
|
|
||||||
return True
|
|
||||||
else:
|
|
||||||
if not timeout:
|
|
||||||
timeout = 0.5
|
|
||||||
while True and not self.closed:
|
|
||||||
self.incoming_packet_event.wait(timeout)
|
|
||||||
has_data = self.packets_arrived('DATA', connection)
|
|
||||||
if not has_data:
|
|
||||||
continue
|
|
||||||
else:
|
|
||||||
return has_data
|
|
||||||
else:
|
|
||||||
self.incoming_packet_event.wait(timeout)
|
|
||||||
return self.packets_arrived('DATA', connection)
|
|
||||||
return False
|
|
||||||
def get_free_port(self):
|
|
||||||
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
||||||
s.bind(('', 0))
|
|
||||||
port = s.getsockname()[1]
|
|
||||||
s.close()
|
|
||||||
return port
|
|
||||||
def setsockopt(self, *a, **kw):
|
|
||||||
pass
|
|
||||||
def settimeout(self, timeout=5):
|
|
||||||
self.own_socket.settimeout(timeout)
|
|
||||||
def setblocking(self, mode):
|
|
||||||
self.own_socket.setblocking(mode)
|
|
||||||
def __repr__(self):
|
|
||||||
return 'UTCP()'
|
|
||||||
|
|
||||||
def __str__(self):
|
|
||||||
return 'Connections: %s' \
|
|
||||||
% str(self.connections)
|
|
||||||
def getsockname(self):
|
|
||||||
return self.own_socket.getsockname()
|
|
||||||
def getpeername(self):
|
|
||||||
if len(self.connections):
|
|
||||||
return list(self.connections.keys())[0]
|
|
||||||
else:
|
|
||||||
raise EOFError('Not connected')
|
|
||||||
def bind(self, addr):
|
|
||||||
self.host = addr[0]
|
|
||||||
self.port = addr[1]
|
|
||||||
self.own_socket.bind(addr)
|
|
||||||
def send(self, data, connection=None):
|
|
||||||
if self.closed:
|
|
||||||
raise EOFError
|
|
||||||
try:
|
|
||||||
if connection not in list(self.connections.keys()):
|
|
||||||
if connection is None:
|
|
||||||
connection = list(self.connections.keys())[0]
|
|
||||||
else:
|
|
||||||
raise EOFError('Connection not in connected devices')
|
|
||||||
conn = self.connections[connection]
|
|
||||||
data_parts = UTCP.data_divider(data)
|
|
||||||
for data_part in data_parts:
|
|
||||||
data_chunk = data_part if not self.encrypted else conn.peer_pub.encrypt_raw(data_part)
|
|
||||||
packet = Packet()
|
|
||||||
packet.set_data(data_chunk)
|
|
||||||
self.__send_packet(connection, packet, retransmit=True)
|
|
||||||
return len(data)
|
|
||||||
except socket.error as error:
|
|
||||||
raise EOFError('Socket was closed before executing command. Error is: %s.' % error)
|
|
||||||
def sendall(self, data, connection=None):
|
|
||||||
_ = self.send(data, connection)
|
|
||||||
def recv(self, size, connection=None):
|
|
||||||
if self.closed:
|
|
||||||
raise EOFError
|
|
||||||
try:
|
|
||||||
if connection not in list(self.connections.keys()):
|
|
||||||
if connection is None:
|
|
||||||
connection = list(self.connections.keys())[0]
|
|
||||||
else:
|
|
||||||
raise EOFError('Connection not in connected devices')
|
|
||||||
data = self.find_correct_packet('DATA', connection, size)
|
|
||||||
if not self.status:
|
|
||||||
raise EOFError('Disconnecting')
|
|
||||||
return data
|
|
||||||
except socket.error as error:
|
|
||||||
raise EOFError('Socket was closed before executing command. Error is: %s.' % error)
|
|
||||||
def __send_packet(self, peer_addr, packet, retransmit=False, wait_cond='ACK'):
|
|
||||||
conn = self.connections[peer_addr]
|
|
||||||
packet.seq = conn.seq_inc()
|
|
||||||
packet_to_send = pickle.dumps(packet)
|
|
||||||
if not retransmit:
|
|
||||||
self.own_socket.sendto(packet_to_send, peer_addr)
|
|
||||||
else:
|
|
||||||
data_not_received = True
|
|
||||||
retransmit_count = 0
|
|
||||||
while data_not_received and retransmit_count < 3:
|
|
||||||
data_not_received = False
|
|
||||||
try:
|
|
||||||
self.own_socket.sendto(packet_to_send, peer_addr)
|
|
||||||
answer = self.find_correct_packet(wait_cond, peer_addr, want_id=packet.id)
|
|
||||||
if not answer:
|
|
||||||
data_not_received = True
|
|
||||||
retransmit_count += 1
|
|
||||||
except socket.timeout:
|
|
||||||
data_not_received = True
|
|
||||||
if not answer:
|
|
||||||
self.drop_connection(peer_addr)
|
|
||||||
raise EOFError('Connection lost')
|
|
||||||
return answer
|
|
||||||
def listen_handler(self, max_connections):
|
|
||||||
try:
|
|
||||||
while True and self.status:
|
|
||||||
try:
|
|
||||||
answer, address = self.find_correct_packet('SYN')
|
|
||||||
with self.queue_lock:
|
|
||||||
if len(self.connection_queue) < max_connections:
|
|
||||||
conn = Connection(address, self.encrypted)
|
|
||||||
conn.fileno = self.next_fileno()
|
|
||||||
if self.encrypted:
|
|
||||||
try:
|
|
||||||
conn.peer_pub = simplecrypto.RsaPublicKey(answer.pubkey)
|
|
||||||
except:
|
|
||||||
raise socket.error('Init peer public key error')
|
|
||||||
self.connection_queue.append((answer, conn))
|
|
||||||
self.blink_new_conn_event()
|
|
||||||
else:
|
|
||||||
if answer.id in map(lambda x: x[0].id, self.connection_queue):
|
|
||||||
continue
|
|
||||||
self.own_socket.sendto(b'Connections full', address)
|
|
||||||
except KeyError:
|
|
||||||
continue
|
|
||||||
except TypeError:
|
|
||||||
continue
|
|
||||||
except socket.error as error:
|
|
||||||
raise EOFError('Something went wrong in listen_handler func! Error is: %s.' + str(error))
|
|
||||||
|
|
||||||
def listen(self, max_connections=1):
|
|
||||||
self.status = 1
|
|
||||||
self.central_receive()
|
|
||||||
try:
|
|
||||||
t = threading.Thread(target=self.listen_handler, args=(max_connections,))
|
|
||||||
t.daemon = True
|
|
||||||
t.start()
|
|
||||||
except Exception as error:
|
|
||||||
raise EOFError('Something went wrong in listen func! Error is: %s.' % str(error))
|
|
||||||
def stop(self):
|
|
||||||
self.own_socket.close()
|
|
||||||
self.status = 0
|
|
||||||
def shutdown(self, *a, **kw):
|
|
||||||
self.close()
|
|
||||||
self.status = 0
|
|
||||||
self.connections = {}
|
|
||||||
self.stop()
|
|
||||||
def accept(self):
|
|
||||||
while self.status:
|
|
||||||
try:
|
|
||||||
self.new_conn_event.wait(0.1)
|
|
||||||
if self.connection_queue:
|
|
||||||
with self.queue_lock:
|
|
||||||
answer, conn = self.connection_queue.pop()
|
|
||||||
conn.recv_seq = answer.seq
|
|
||||||
self.connections[conn.peer_addr] = conn
|
|
||||||
syn_ack = SynAck(answer.id)
|
|
||||||
if self.encrypted:
|
|
||||||
syn_ack.set_pub(conn.peer_pub.encrypt_raw(conn.pubkey))
|
|
||||||
answer = self.__send_packet(conn.peer_addr, syn_ack, retransmit=True)
|
|
||||||
return ConnectedSOCK(self, conn.peer_addr), conn.peer_addr
|
|
||||||
except EOFError:
|
|
||||||
if conn.peer_addr in self.connections:
|
|
||||||
self.close(conn.peer_addr)
|
|
||||||
continue
|
|
||||||
except Exception as error:
|
|
||||||
if conn.peer_addr in self.connections:
|
|
||||||
self.close(conn.peer_addr)
|
|
||||||
raise EOFError('Something went wrong in accept func: ' + str(error))
|
|
||||||
|
|
||||||
def connect(self, server_address=('127.0.0.1', 10000)):
|
|
||||||
if server_address in self.connections:
|
|
||||||
raise EOFError('Already connected to peer')
|
|
||||||
try:
|
|
||||||
self.bind(('', self.get_free_port()))
|
|
||||||
self.status = 1
|
|
||||||
self.client = True
|
|
||||||
self.central_receive()
|
|
||||||
conn = Connection(server_address, self.encrypted)
|
|
||||||
self.connections[server_address] = conn
|
|
||||||
syn = Syn()
|
|
||||||
if self.encrypted:
|
|
||||||
syn.set_pub(conn.pubkey)
|
|
||||||
try:
|
|
||||||
answer = self.__send_packet(server_address, syn, retransmit=True, wait_cond='SYN-ACK')
|
|
||||||
except EOFError:
|
|
||||||
raise EOFError('Remote peer unreachable')
|
|
||||||
if type(answer) == str: # == 'Connections full':
|
|
||||||
raise socket.error('Server cant receive any connections right now.')
|
|
||||||
if self.encrypted:
|
|
||||||
try:
|
|
||||||
peer_pub = conn.my_key.decrypt_raw(answer.pubkey)
|
|
||||||
conn.peer_pub = simplecrypto.RsaPublicKey(peer_pub)
|
|
||||||
except:
|
|
||||||
raise socket.error('Decrypt peer public key error')
|
|
||||||
ack = Ack(answer.id)
|
|
||||||
self.__send_packet(server_address, ack)
|
|
||||||
self.channel = UTCPChannel(self)
|
|
||||||
conn.fileno = self.next_fileno()
|
|
||||||
except socket.error as error:
|
|
||||||
self.own_socket.close()
|
|
||||||
self.connections = {}
|
|
||||||
self.status = 0
|
|
||||||
raise EOFError('The socket was closed. Error:' + str(error))
|
|
||||||
def fileno(self, connection=None):
|
|
||||||
if connection not in list(self.connections.keys()):
|
|
||||||
if connection is None:
|
|
||||||
connection = list(self.connections.keys())[0]
|
|
||||||
else:
|
|
||||||
raise EOFError('Connection not in connected devices')
|
|
||||||
return self.connections[connection].fileno
|
|
||||||
@property
|
|
||||||
def closed(self):
|
|
||||||
return not bool(len(self.connections))
|
|
||||||
def drop_connection(self, connection):
|
|
||||||
with self.connection_lock:
|
|
||||||
if len(self.connections):
|
|
||||||
self.connections.pop(connection)
|
|
||||||
def close(self, connection=None):
|
|
||||||
try:
|
|
||||||
if connection not in list(self.connections.keys()):
|
|
||||||
if connection is None:
|
|
||||||
if len(self.connections):
|
|
||||||
connection = list(self.connections.keys())[0]
|
|
||||||
else:
|
|
||||||
return
|
|
||||||
else:
|
|
||||||
raise EOFError('Connection not in connected devices')
|
|
||||||
fin = Fin()
|
|
||||||
answer = self.__send_packet(connection, fin, retransmit=True)
|
|
||||||
answer = self.find_correct_packet('FIN-ACK', connection, want_id=fin.id)
|
|
||||||
if not answer:
|
|
||||||
raise Exception('The receiver didn\'t send the fin packet')
|
|
||||||
else:
|
|
||||||
self.drop_connection(connection)
|
|
||||||
if not len(self.connections) and self.client:
|
|
||||||
self.stop()
|
|
||||||
except Exception as error:
|
|
||||||
raise EOFError('Something went wrong in the close func! Error is: %s.' % error)
|
|
||||||
|
|
||||||
def disconnect(self, connection, fin_id):
|
|
||||||
try:
|
|
||||||
ack = Ack(fin_id)
|
|
||||||
self.__send_packet(connection, ack)
|
|
||||||
fin_ack = FinAck(fin_id)
|
|
||||||
try:
|
|
||||||
self.__send_packet(connection, fin_ack)
|
|
||||||
except:
|
|
||||||
pass
|
|
||||||
self.drop_connection(connection)
|
|
||||||
except Exception as error:
|
|
||||||
raise EOFError('Something went wrong in disconnect func:%s ' % error)
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def data_divider(data):
|
|
||||||
'''Divides the data into a list where each element's length is 1024'''
|
|
||||||
data = [data[i:i + DATA_DIVIDE_LENGTH] for i in range(0, len(data), DATA_DIVIDE_LENGTH)]
|
|
||||||
return data
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def checksum(source_bytes):
|
|
||||||
return hashlib.sha1(source_bytes).digest()
|
|
||||||
|
|
||||||
def find_correct_packet(self, condition, address=('Any',), size=DATA_LENGTH, want_id=None):
|
|
||||||
not_found = True
|
|
||||||
tries = 0
|
|
||||||
while not_found and tries < 2 and self.status:
|
|
||||||
try:
|
|
||||||
not_found = False
|
|
||||||
if address[0] == 'Any':
|
|
||||||
order = self.syn_received.popitem() # to reverse the tuple received
|
|
||||||
return order[1], order[0]
|
|
||||||
try:
|
|
||||||
conn = self.connections[address]
|
|
||||||
except:
|
|
||||||
break
|
|
||||||
if condition in ['ACK', 'SYN-ACK', 'FIN-ACK']:
|
|
||||||
tries += 1
|
|
||||||
if condition == 'DATA':
|
|
||||||
if self.poll(0.1, address):
|
|
||||||
data = b''
|
|
||||||
while size:
|
|
||||||
if not self.poll(0.1, address):
|
|
||||||
continue
|
|
||||||
with conn.recv_lock:
|
|
||||||
packet = conn.packet_buffer[condition][0]
|
|
||||||
chunk = packet.data[:size]
|
|
||||||
chunk_len = len(chunk)
|
|
||||||
data += chunk
|
|
||||||
packet.data = packet.data[size:]
|
|
||||||
size -= chunk_len
|
|
||||||
if not len(packet.data):
|
|
||||||
try:
|
|
||||||
conn.packet_buffer[condition].pop(0)
|
|
||||||
except IndexError:
|
|
||||||
size = 0
|
|
||||||
return data
|
|
||||||
else:
|
|
||||||
raise KeyError
|
|
||||||
else:
|
|
||||||
if self.packets_arrived(condition, address):
|
|
||||||
packet = conn.packet_buffer[condition].pop()
|
|
||||||
else:
|
|
||||||
raise KeyError
|
|
||||||
if want_id and packet.id != want_id:
|
|
||||||
raise KeyError
|
|
||||||
return packet
|
|
||||||
except KeyError:
|
|
||||||
not_found = True
|
|
||||||
self.incoming_packet_event.wait(0.5)
|
|
||||||
def blink_incoming_packet_event(self):
|
|
||||||
self.incoming_packet_event.set()
|
|
||||||
self.incoming_packet_event.clear()
|
|
||||||
def blink_new_conn_event(self):
|
|
||||||
self.new_conn_event.set()
|
|
||||||
self.new_conn_event.clear()
|
|
||||||
def multiplex(self, packet, address):
|
|
||||||
if address not in self.connections and not isinstance(packet, Syn):
|
|
||||||
return
|
|
||||||
if not isinstance(packet, Syn):
|
|
||||||
conn = self.connections[address]
|
|
||||||
if isinstance(packet, SynAck):
|
|
||||||
conn.recv_seq = packet.seq
|
|
||||||
elif conn.recv_seq == packet.seq: # Repeat ACK
|
|
||||||
ack = Ack(packet.id)
|
|
||||||
self.__send_packet(address, ack)
|
|
||||||
return
|
|
||||||
elif packet.seq < conn.recv_seq: # Possibly DUP
|
|
||||||
return
|
|
||||||
elif packet.seq > (conn.recv_seq + 1): # Intermediate packet lost
|
|
||||||
return
|
|
||||||
else:
|
|
||||||
conn.recv_seq = packet.seq
|
|
||||||
if isinstance(packet, Packet):
|
|
||||||
if packet.checksum == UTCP.checksum(packet.data):
|
|
||||||
if self.encrypted:
|
|
||||||
packet.data = conn.my_key.decrypt_raw(packet.data)
|
|
||||||
else:
|
|
||||||
return
|
|
||||||
if isinstance(packet, Fin):
|
|
||||||
self.disconnect(address, packet.id)
|
|
||||||
elif isinstance(packet, Syn):
|
|
||||||
if address in self.connections:
|
|
||||||
return
|
|
||||||
if packet.id not in map(lambda x: x.id, self.syn_received.values()):
|
|
||||||
self.syn_received[address] = packet
|
|
||||||
else:
|
|
||||||
with conn.recv_lock:
|
|
||||||
if packet.id not in map(lambda x: x.id, conn.packet_buffer[packet.type]):
|
|
||||||
bisect.insort(conn.packet_buffer[packet.type], packet)
|
|
||||||
if isinstance(packet, Packet):
|
|
||||||
ack = Ack(packet.id)
|
|
||||||
self.__send_packet(address, ack)
|
|
||||||
self.blink_incoming_packet_event()
|
|
||||||
|
|
||||||
def central_receive_handler(self):
|
|
||||||
while True and self.status:
|
|
||||||
try:
|
|
||||||
packet, address = self.own_socket.recvfrom(SENT_SIZE)
|
|
||||||
packet = restricted_pickle_loads(packet)
|
|
||||||
self.multiplex(packet, address)
|
|
||||||
except pickle.UnpicklingError:
|
|
||||||
continue
|
|
||||||
except socket.timeout:
|
|
||||||
continue
|
|
||||||
except socket.error:
|
|
||||||
self.own_socket.close()
|
|
||||||
self.status = 0
|
|
||||||
|
|
||||||
def central_receive(self):
|
|
||||||
t = threading.Thread(target=self.central_receive_handler)
|
|
||||||
t.daemon = True
|
|
||||||
t.start()
|
|
Loading…
Reference in New Issue