diff --git a/mods/const.py b/mods/const.py new file mode 100644 index 0000000..9039eb2 --- /dev/null +++ b/mods/const.py @@ -0,0 +1,5 @@ +import os +app_dir = os.path.join(os.environ['HOME'], '.black-mamba') +signalpeers_db = os.path.join(app_dir, 'signalpeers.sqlite') +downloads_db = os.path.join(app_dir, 'downloads.sqlite') +sharedfiles_db = os.path.join(app_dir, 'sharedfiles.sqlite') diff --git a/mods/helpers.py b/mods/helpers.py new file mode 100644 index 0000000..9baf5e6 --- /dev/null +++ b/mods/helpers.py @@ -0,0 +1,15 @@ +from datetime import datetime + +def store_signalpeer(conn, signalpeerkvstore): + peer_addr = (conn._channel.stream.sock.getpeername()[0], conn.root.peer_port) + signalpeer_key = f'{peer_addr[0]:{peer_addr[1]}}' + if signalpeer_key not in signalpeerkvstore: + signalpeerkvstore[signalpeer_key] = {'addr': peer_addr, 'seen': datetime.utcnow(), 'cname': conn.root.peer_cname} + else: + signalpeerkvstore[signalpeer_key]['seen'] = datetime.utcnow() + if signalpeerkvstore[signalpeer_key]['cname'] != conn.root.peer_cname: + signalpeerkvstore[signalpeer_key]['cname'] = conn.root.peer_cname +def merge_signalpeers(signalpeers, signalpeerkvstore): + new_keys = list(set(signalpeers.keys()) - set(signalpeerkvstore.keys())) + for k in new_keys: + signalpeerkvstore[k] = signalpeers[k] diff --git a/mods/peer.py b/mods/peer.py new file mode 100644 index 0000000..4fa1ba3 --- /dev/null +++ b/mods/peer.py @@ -0,0 +1 @@ +import rpyc diff --git a/mods/rpyc_utcp.py b/mods/rpyc_utcp.py deleted file mode 100644 index 6f2a524..0000000 --- a/mods/rpyc_utcp.py +++ /dev/null @@ -1,67 +0,0 @@ -import rpyc -from rpyc.utils.server import ThreadedServer -from rpyc.core import SocketStream, Channel -from rpyc.utils.factory import connect_stream -from rpyc.lib import Timeout -from . import utcp -import sys - -class UTCPSocketStream(SocketStream): - MAX_IO_CHUNK = utcp.DATA_LENGTH - @classmethod - def utcp_connect(cls, host, port, *a, **kw): - sock = utcp.UTCP(encrypted=True) - sock.connect((host, port)) - return cls(sock) - def poll(self, timeout): - timeout = Timeout(timeout) - return self.sock.poll(timeout.timeleft()) - - def read(self, count): - try: - return self.sock.recv(count) - except EOFError: - self.close() - raise EOFError - def write(self, data): - try: - self.sock.send(data) - except EOFError: - - ex = sys.exc_info()[1] - self.close() - raise EOFError(ex) -def utcp_connect(host, port, service=rpyc.VoidService, config={}, **kw): - s = UTCPSocketStream.utcp_connect(host, port, **kw) - return connect_stream(s, service, config) -class UTCPThreadedServer(ThreadedServer): - def __init__(self, service, hostname = '', ipv6 = False, port = 0, - backlog = 1, reuse_addr = True, authenticator = None, registrar = None, - auto_register = None, protocol_config = {}, logger = None, listener_timeout = 0.5, - socket_path = None): - backlog = 1 - ThreadedServer.__init__(self, service, hostname=hostname, ipv6=ipv6, port=port, - backlog=backlog, reuse_addr=reuse_addr, authenticator=authenticator, registrar=registrar, - auto_register=auto_register, protocol_config=protocol_config, logger=logger, listener_timeout=listener_timeout, - socket_path=socket_path) - self.listener.close() - self.listener = None - ########## - self.listener = utcp.UTCP(encrypted=True) - self.listener.bind((hostname, port)) - sockname = self.listener.getsockname() - self.host, self.port = sockname[0], sockname[1] - def _serve_client(self, sock, credentials): - addrinfo = sock.getpeername() - if credentials: - self.logger.info("welcome %s (%r)", addrinfo, credentials) - else: - self.logger.info("welcome %s", addrinfo) - try: - config = dict(self.protocol_config, credentials = credentials, - endpoints = (sock.getsockname(), addrinfo), logger = self.logger) - conn = self.service._connect(Channel(UTCPSocketStream(sock)), config) - self._handle_connection(conn) - finally: - self.logger.info("goodbye %s", addrinfo) - diff --git a/mods/settings.py b/mods/settings.py new file mode 100644 index 0000000..40978c5 --- /dev/null +++ b/mods/settings.py @@ -0,0 +1,8 @@ + +class Settings: + def __init__(self): + self.listen_port = 8765 + self.listen_host = '' + self.cname = 'Very cool peer' + +settings = Settings() diff --git a/mods/signalpeer.py b/mods/signalpeer.py new file mode 100644 index 0000000..f6ba0bb --- /dev/null +++ b/mods/signalpeer.py @@ -0,0 +1,41 @@ +import rpyc +from . import const +from . import typedefs +from .settings import settings +from uuid import uuid4 +from sqlitedict import SqliteDict +from . import helpers + +peer = {} +my_id = uuid4().hex + +class SignalpeerService(rpyc.Service): + signalpeerkvstore = None + peer_id = None + exposed_peer_type = typedefs.SIGNALPEER + def on_connect(self, conn): + self.peer_id = uuid4().hex + if conn.root.pid == my_id: + conn.close() + return + self.signalpeerkvstore = SqliteDict(const.signalpeers_db, autocommit=True) + peer[self.peer_id] = conn + peer_type = conn.root.peer_type + if peer_type == typedefs.SIGNALPEER: + helpers.store_signalpeer(conn, self.signalpeerkvstore) + helpers.merge_signalpeers(conn.root.signal_peers, self.signalpeerkvstore) + def on_disconnect(self, conn): + if self.peer_id in peer: + peer.pop(self.peer_id) + @property + def exposed_peer_port(self): + return settings.listen_port + @property + def exposed_peer_cname(self): + return settings.cname + @property + def exposed_pid(self): + return my_id + @property + def exposed_signal_peers(self): + return dict(self.signalpeerkvstore.items()) diff --git a/mods/typedefs.py b/mods/typedefs.py new file mode 100644 index 0000000..b64df68 --- /dev/null +++ b/mods/typedefs.py @@ -0,0 +1,2 @@ +SIGNALPEER = 1 +PEER = 2 diff --git a/mods/utcp.py b/mods/utcp.py deleted file mode 100644 index 5e82da9..0000000 --- a/mods/utcp.py +++ /dev/null @@ -1,593 +0,0 @@ -# based on https://github.com/ethay012/UTCP-over-UDP -import random -import socket -import pickle -import threading -import io -import hashlib -import simplecrypto -from struct import Struct -import uuid -import bisect - -from datetime import datetime - -DATA_DIVIDE_LENGTH = 8000 -PACKET_HEADER_SIZE = 512 # Pickle service info -DATA_LENGTH = DATA_DIVIDE_LENGTH -SENT_SIZE = PACKET_HEADER_SIZE + DATA_LENGTH + 272 # Encrypted data always 272 bytes bigger - -class Connection: - SMALLEST_STARTING_SEQ = 0 - HIGHEST_STARTING_SEQ = 4294967295 - def __init__(self, remote, encrypted=False): - self.fileno = 0 - self.peer_addr = remote - self.seq = Connection.gen_starting_seq_num() - self.recv_seq = -1 - self.my_key = None - if encrypted: - self.my_key = simplecrypto.RsaKeypair() - self.pubkey = self.my_key.publickey.serialize() - self.peer_pub = None - self.recv_lock = threading.Lock() - self.send_lock = threading.Lock() - self.packet_buffer = { - 'ACK': [], - 'SYN-ACK': [], - 'DATA': [], - 'FIN-ACK': [] - } - @staticmethod - def gen_starting_seq_num(): - return random.randint(Connection.SMALLEST_STARTING_SEQ, Connection.HIGHEST_STARTING_SEQ) - def seq_inc(self, inc=1): - self.seq += inc - return self.seq - def set_ack(self, ack): - self.ack = ack - return ack - -class UTCPPacket: - def __cmp__(self, other): - return (self.seq > other.seq) - (self.seq < other.seq) - def __lt__(self, other): - return self.seq < other.seq - def __gt__(self, other): - return self.seq > other.seq - def __eq__(self, other): - return self.seq == other.seq - -class Ack(UTCPPacket): - type = 'ACK' - def __init__(self, id_): - self.id = id_ - self.seq = 0 -class Fin(UTCPPacket): - type = 'FIN' - def __init__(self): - self.id = uuid.uuid4().bytes - self.seq = 0 -class FinAck(UTCPPacket): - type = 'FIN-ACK' - def __init__(self, id_): - self.id = id_ - self.seq = 0 -class Syn(UTCPPacket): - type = 'SYN' - checksum = None - def __init__(self): - self.id = uuid.uuid4().bytes - self.seq = 0 - def set_pub(self, pubkey): - self.checksum = UTCP.checksum(pubkey) - self.pubkey = pubkey -class SynAck(UTCPPacket): - type = 'SYN-ACK' - checksum = None - def __init__(self, id_): - self.id = id_ - self.seq = 0 - def set_pub(self, pubkey): - self.checksum = UTCP.checksum(pubkey) - self.pubkey = pubkey - -class Packet(UTCPPacket): - type = 'DATA' - def __init__(self): - self.id = uuid.uuid4().bytes - self.checksum = 0 - self.data = b'' - self.seq = 0 - def __repr__(self): - return f'Packet(seq={self.seq})' - - def __str__(self): - return 'UUID: %d, DATA:%s' % (uuid.UUID(bytes=self.id), self.data) - - def set_data(self, data): - self.checksum = UTCP.checksum(data) - self.data = data - - - -class RestrictedUnpickler(pickle.Unpickler): - def find_class(self, module, name): - if module != 'builtins': - if name == 'Packet': return Packet - if name == 'Ack': return Ack - if name == 'Fin': return Fin - if name == 'FinAck': return FinAck - if name == 'Syn': return Syn - if name == 'SynAck': return SynAck - raise pickle.UnpicklingError("global '%s.%s' is forbidden" % - (module, name)) -def restricted_pickle_loads(s): - return RestrictedUnpickler(io.BytesIO(s)).load() - -class UTCPChannel: - HEADER = Struct('!I') - POLL_TIMEOUT = 0.1 - def __init__(self, sock): - self.sock = sock - def poll(self): - return self.sock.poll(self.POLL_TIMEOUT) - def recv(self): - header = self.sock.recv(self.HEADER.size) - data_len = self.HEADER.unpack(header)[0] - return self.sock.recv(data_len) - def send(self, data): - header = self.HEADER.pack(len(data)) - self.sock.send(header + data) - -class ConnectedSOCK(object): - def __init__(self, low_sock, client_addr): - self.client_addr = client_addr - self.low_sock = low_sock - self.channel = UTCPChannel(self) - def __getattribute__(self, att): - try: - return object.__getattribute__(self, att) - except AttributeError: - return getattr(self.low_sock, att) - def getpeername(self): - return self.client_addr - def send(self, data): - if self.closed: - raise EOFError - return self.low_sock.send(data, self.client_addr) - def sendall(self, data): - if self.closed: - raise EOFError - self.low_sock.sendall(data, self.client_addr) - def recv(self, size): - if self.closed: - raise EOFError - return self.low_sock.recv(size, self.client_addr) - @property - def closed(self): - return self.low_sock.own_socket._closed or self.client_addr not in self.low_sock.connections - def close(self): - if self.client_addr in self.low_sock.connections: - self.low_sock.close(self.client_addr) - def shutdown(self, *a, **kw): - self.close() - def poll(self, timeout): - return self.low_sock.poll(timeout, self.client_addr) - def packets_arrived(self, packet_type): - return self.low_sock.packets_arrived(packet_type, self.client_addr) - def fileno(self): - if self.closed: - raise EOFError - return self.low_sock.fileno(self.client_addr) - -class UTCP(object): - host = None - port = None - client = False - def __init__(self,encrypted=False, **kw): - self.encrypted = encrypted - self.incoming_packet_event = threading.Event() - self.new_conn_event = threading.Event() - self.own_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - self.own_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self.settimeout() - self.connection_lock = threading.Lock() - self.queue_lock = threading.Lock() - self.channel = None - self.connections = {} - self.connection_queue = [] - self.syn_received = {} - self.fileno_seq = 40000000 - def next_fileno(self): - self.fileno_seq += 1 - return self.fileno_seq - def packets_arrived(self, packet_type, connection=None): - try: - conn = self.connections[connection] - except: - raise EOFError - with conn.recv_lock: - return bool(len(conn.packet_buffer[packet_type])) - def poll(self, timeout, connection=None): - if connection not in list(self.connections.keys()): - if connection is None: - connection = list(self.connections.keys())[0] - else: - raise EOFError('Connection not in connected devices') - if not self.closed: - has_data = self.packets_arrived('DATA', connection) - if has_data: - return True - else: - if not timeout: - timeout = 0.5 - while True and not self.closed: - self.incoming_packet_event.wait(timeout) - has_data = self.packets_arrived('DATA', connection) - if not has_data: - continue - else: - return has_data - else: - self.incoming_packet_event.wait(timeout) - return self.packets_arrived('DATA', connection) - return False - def get_free_port(self): - s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - s.bind(('', 0)) - port = s.getsockname()[1] - s.close() - return port - def setsockopt(self, *a, **kw): - pass - def settimeout(self, timeout=5): - self.own_socket.settimeout(timeout) - def setblocking(self, mode): - self.own_socket.setblocking(mode) - def __repr__(self): - return 'UTCP()' - - def __str__(self): - return 'Connections: %s' \ - % str(self.connections) - def getsockname(self): - return self.own_socket.getsockname() - def getpeername(self): - if len(self.connections): - return list(self.connections.keys())[0] - else: - raise EOFError('Not connected') - def bind(self, addr): - self.host = addr[0] - self.port = addr[1] - self.own_socket.bind(addr) - def send(self, data, connection=None): - if self.closed: - raise EOFError - try: - if connection not in list(self.connections.keys()): - if connection is None: - connection = list(self.connections.keys())[0] - else: - raise EOFError('Connection not in connected devices') - conn = self.connections[connection] - data_parts = UTCP.data_divider(data) - for data_part in data_parts: - data_chunk = data_part if not self.encrypted else conn.peer_pub.encrypt_raw(data_part) - packet = Packet() - packet.set_data(data_chunk) - self.__send_packet(connection, packet, retransmit=True) - return len(data) - except socket.error as error: - raise EOFError('Socket was closed before executing command. Error is: %s.' % error) - def sendall(self, data, connection=None): - _ = self.send(data, connection) - def recv(self, size, connection=None): - if self.closed: - raise EOFError - try: - if connection not in list(self.connections.keys()): - if connection is None: - connection = list(self.connections.keys())[0] - else: - raise EOFError('Connection not in connected devices') - data = self.find_correct_packet('DATA', connection, size) - if not self.status: - raise EOFError('Disconnecting') - return data - except socket.error as error: - raise EOFError('Socket was closed before executing command. Error is: %s.' % error) - def __send_packet(self, peer_addr, packet, retransmit=False, wait_cond='ACK'): - conn = self.connections[peer_addr] - packet.seq = conn.seq_inc() - packet_to_send = pickle.dumps(packet) - if not retransmit: - self.own_socket.sendto(packet_to_send, peer_addr) - else: - data_not_received = True - retransmit_count = 0 - while data_not_received and retransmit_count < 3: - data_not_received = False - try: - self.own_socket.sendto(packet_to_send, peer_addr) - answer = self.find_correct_packet(wait_cond, peer_addr, want_id=packet.id) - if not answer: - data_not_received = True - retransmit_count += 1 - except socket.timeout: - data_not_received = True - if not answer: - self.drop_connection(peer_addr) - raise EOFError('Connection lost') - return answer - def listen_handler(self, max_connections): - try: - while True and self.status: - try: - answer, address = self.find_correct_packet('SYN') - with self.queue_lock: - if len(self.connection_queue) < max_connections: - conn = Connection(address, self.encrypted) - conn.fileno = self.next_fileno() - if self.encrypted: - try: - conn.peer_pub = simplecrypto.RsaPublicKey(answer.pubkey) - except: - raise socket.error('Init peer public key error') - self.connection_queue.append((answer, conn)) - self.blink_new_conn_event() - else: - if answer.id in map(lambda x: x[0].id, self.connection_queue): - continue - self.own_socket.sendto(b'Connections full', address) - except KeyError: - continue - except TypeError: - continue - except socket.error as error: - raise EOFError('Something went wrong in listen_handler func! Error is: %s.' + str(error)) - - def listen(self, max_connections=1): - self.status = 1 - self.central_receive() - try: - t = threading.Thread(target=self.listen_handler, args=(max_connections,)) - t.daemon = True - t.start() - except Exception as error: - raise EOFError('Something went wrong in listen func! Error is: %s.' % str(error)) - def stop(self): - self.own_socket.close() - self.status = 0 - def shutdown(self, *a, **kw): - self.close() - self.status = 0 - self.connections = {} - self.stop() - def accept(self): - while self.status: - try: - self.new_conn_event.wait(0.1) - if self.connection_queue: - with self.queue_lock: - answer, conn = self.connection_queue.pop() - conn.recv_seq = answer.seq - self.connections[conn.peer_addr] = conn - syn_ack = SynAck(answer.id) - if self.encrypted: - syn_ack.set_pub(conn.peer_pub.encrypt_raw(conn.pubkey)) - answer = self.__send_packet(conn.peer_addr, syn_ack, retransmit=True) - return ConnectedSOCK(self, conn.peer_addr), conn.peer_addr - except EOFError: - if conn.peer_addr in self.connections: - self.close(conn.peer_addr) - continue - except Exception as error: - if conn.peer_addr in self.connections: - self.close(conn.peer_addr) - raise EOFError('Something went wrong in accept func: ' + str(error)) - - def connect(self, server_address=('127.0.0.1', 10000)): - if server_address in self.connections: - raise EOFError('Already connected to peer') - try: - self.bind(('', self.get_free_port())) - self.status = 1 - self.client = True - self.central_receive() - conn = Connection(server_address, self.encrypted) - self.connections[server_address] = conn - syn = Syn() - if self.encrypted: - syn.set_pub(conn.pubkey) - try: - answer = self.__send_packet(server_address, syn, retransmit=True, wait_cond='SYN-ACK') - except EOFError: - raise EOFError('Remote peer unreachable') - if type(answer) == str: # == 'Connections full': - raise socket.error('Server cant receive any connections right now.') - if self.encrypted: - try: - peer_pub = conn.my_key.decrypt_raw(answer.pubkey) - conn.peer_pub = simplecrypto.RsaPublicKey(peer_pub) - except: - raise socket.error('Decrypt peer public key error') - ack = Ack(answer.id) - self.__send_packet(server_address, ack) - self.channel = UTCPChannel(self) - conn.fileno = self.next_fileno() - except socket.error as error: - self.own_socket.close() - self.connections = {} - self.status = 0 - raise EOFError('The socket was closed. Error:' + str(error)) - def fileno(self, connection=None): - if connection not in list(self.connections.keys()): - if connection is None: - connection = list(self.connections.keys())[0] - else: - raise EOFError('Connection not in connected devices') - return self.connections[connection].fileno - @property - def closed(self): - return not bool(len(self.connections)) - def drop_connection(self, connection): - with self.connection_lock: - if len(self.connections): - self.connections.pop(connection) - def close(self, connection=None): - try: - if connection not in list(self.connections.keys()): - if connection is None: - if len(self.connections): - connection = list(self.connections.keys())[0] - else: - return - else: - raise EOFError('Connection not in connected devices') - fin = Fin() - answer = self.__send_packet(connection, fin, retransmit=True) - answer = self.find_correct_packet('FIN-ACK', connection, want_id=fin.id) - if not answer: - raise Exception('The receiver didn\'t send the fin packet') - else: - self.drop_connection(connection) - if not len(self.connections) and self.client: - self.stop() - except Exception as error: - raise EOFError('Something went wrong in the close func! Error is: %s.' % error) - - def disconnect(self, connection, fin_id): - try: - ack = Ack(fin_id) - self.__send_packet(connection, ack) - fin_ack = FinAck(fin_id) - try: - self.__send_packet(connection, fin_ack) - except: - pass - self.drop_connection(connection) - except Exception as error: - raise EOFError('Something went wrong in disconnect func:%s ' % error) - - @staticmethod - def data_divider(data): - '''Divides the data into a list where each element's length is 1024''' - data = [data[i:i + DATA_DIVIDE_LENGTH] for i in range(0, len(data), DATA_DIVIDE_LENGTH)] - return data - - @staticmethod - def checksum(source_bytes): - return hashlib.sha1(source_bytes).digest() - - def find_correct_packet(self, condition, address=('Any',), size=DATA_LENGTH, want_id=None): - not_found = True - tries = 0 - while not_found and tries < 2 and self.status: - try: - not_found = False - if address[0] == 'Any': - order = self.syn_received.popitem() # to reverse the tuple received - return order[1], order[0] - try: - conn = self.connections[address] - except: - break - if condition in ['ACK', 'SYN-ACK', 'FIN-ACK']: - tries += 1 - if condition == 'DATA': - if self.poll(0.1, address): - data = b'' - while size: - if not self.poll(0.1, address): - continue - with conn.recv_lock: - packet = conn.packet_buffer[condition][0] - chunk = packet.data[:size] - chunk_len = len(chunk) - data += chunk - packet.data = packet.data[size:] - size -= chunk_len - if not len(packet.data): - try: - conn.packet_buffer[condition].pop(0) - except IndexError: - size = 0 - return data - else: - raise KeyError - else: - if self.packets_arrived(condition, address): - packet = conn.packet_buffer[condition].pop() - else: - raise KeyError - if want_id and packet.id != want_id: - raise KeyError - return packet - except KeyError: - not_found = True - self.incoming_packet_event.wait(0.5) - def blink_incoming_packet_event(self): - self.incoming_packet_event.set() - self.incoming_packet_event.clear() - def blink_new_conn_event(self): - self.new_conn_event.set() - self.new_conn_event.clear() - def multiplex(self, packet, address): - if address not in self.connections and not isinstance(packet, Syn): - return - if not isinstance(packet, Syn): - conn = self.connections[address] - if isinstance(packet, SynAck): - conn.recv_seq = packet.seq - elif conn.recv_seq == packet.seq: # Repeat ACK - ack = Ack(packet.id) - self.__send_packet(address, ack) - return - elif packet.seq < conn.recv_seq: # Possibly DUP - return - elif packet.seq > (conn.recv_seq + 1): # Intermediate packet lost - return - else: - conn.recv_seq = packet.seq - if isinstance(packet, Packet): - if packet.checksum == UTCP.checksum(packet.data): - if self.encrypted: - packet.data = conn.my_key.decrypt_raw(packet.data) - else: - return - if isinstance(packet, Fin): - self.disconnect(address, packet.id) - elif isinstance(packet, Syn): - if address in self.connections: - return - if packet.id not in map(lambda x: x.id, self.syn_received.values()): - self.syn_received[address] = packet - else: - with conn.recv_lock: - if packet.id not in map(lambda x: x.id, conn.packet_buffer[packet.type]): - bisect.insort(conn.packet_buffer[packet.type], packet) - if isinstance(packet, Packet): - ack = Ack(packet.id) - self.__send_packet(address, ack) - self.blink_incoming_packet_event() - - def central_receive_handler(self): - while True and self.status: - try: - packet, address = self.own_socket.recvfrom(SENT_SIZE) - packet = restricted_pickle_loads(packet) - self.multiplex(packet, address) - except pickle.UnpicklingError: - continue - except socket.timeout: - continue - except socket.error: - self.own_socket.close() - self.status = 0 - - def central_receive(self): - t = threading.Thread(target=self.central_receive_handler) - t.daemon = True - t.start() diff --git a/runbm.py b/runbm.py new file mode 100644 index 0000000..e5a0d9b --- /dev/null +++ b/runbm.py @@ -0,0 +1 @@ +#!/usr/bin/env python3