From bd5828240ae02aacc28e1f83adc5f1b3cd7e7704 Mon Sep 17 00:00:00 2001 From: Fred Stober Date: Wed, 10 Sep 2014 01:13:14 +0200 Subject: [PATCH] Initial implementation of the KRPC protocol --- .gitignore | 1 + LICENSE | 21 ++++++++ README.md | 26 ++++++++++ bencode.py | 129 +++++++++++++++++++++++++++++++++++++++++++++++ krpc.py | 144 +++++++++++++++++++++++++++++++++++++++++++++++++++++ utils.py | 94 ++++++++++++++++++++++++++++++++++ 6 files changed, 415 insertions(+) create mode 100644 .gitignore create mode 100644 LICENSE create mode 100644 README.md create mode 100644 bencode.py create mode 100644 krpc.py create mode 100644 utils.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0d20b64 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +*.pyc diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..ef8aae0 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2014 Fred Stober + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. 
diff --git a/README.md b/README.md new file mode 100644 index 0000000..d751a6f --- /dev/null +++ b/README.md @@ -0,0 +1,26 @@ +tiny BitTorrent client +====================== + +The goal is to supply an easy to use and simple to understand implementation +of the BitTorrent specifications in Python with no external dependencies +except for the Python standard library. + +The implementation is spread over several files, each implementing +a single component. + + krpc.py - implements the basic UDP Kademlia-RPC protocol layer + + +KRPC Implementation +------------------- + +The KRPCPeer only exposes three methods: + - __init__((host, port), query_handler) + Takes the (host, port) tuple on which to listen; the second + argument is the function that processes incoming messages. + - shutdown() + Shuts down all threads and connections of the KRPC peer. + - send_krpc_query((host, port), method, **kwargs) + This method sends a query to a remote host specified by a (host, port) tuple. + The name of the method to call on the remote host and its arguments are given as well. + An async result holder is returned, which allows waiting for a reply. diff --git a/bencode.py b/bencode.py new file mode 100644 index 0000000..af24758 --- /dev/null +++ b/bencode.py @@ -0,0 +1,129 @@ +# The contents of this file are subject to the BitTorrent Open Source License +# Version 1.1 (the License). You may not copy or use this file, in either +# source code or executable form, except in compliance with the License. You +# may obtain a copy of the License at http://www.bittorrent.com/license/. +# +# Software distributed under the License is distributed on an AS IS basis, +# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License +# for the specific language governing rights and limitations under the +# License. 
+ +# Written by Petru Paler + +class BTFailure(Exception): + pass + +def decode_int(x, f): + f += 1 + newf = x.index('e', f) + n = int(x[f:newf]) + if x[f] == '-': + if x[f + 1] == '0': + raise ValueError + elif x[f] == '0' and newf != f+1: + raise ValueError + return (n, newf+1) + +def decode_string(x, f): + colon = x.index(':', f) + n = int(x[f:colon]) + if x[f] == '0' and colon != f+1: + raise ValueError + colon += 1 + return (x[colon:colon+n], colon+n) + +def decode_list(x, f): + r, f = [], f+1 + while x[f] != 'e': + v, f = decode_func[x[f]](x, f) + r.append(v) + return (r, f + 1) + +def decode_dict(x, f): + r, f = {}, f+1 + while x[f] != 'e': + k, f = decode_string(x, f) + r[k], f = decode_func[x[f]](x, f) + return (r, f + 1) + +decode_func = {} +decode_func['l'] = decode_list +decode_func['d'] = decode_dict +decode_func['i'] = decode_int +decode_func['0'] = decode_string +decode_func['1'] = decode_string +decode_func['2'] = decode_string +decode_func['3'] = decode_string +decode_func['4'] = decode_string +decode_func['5'] = decode_string +decode_func['6'] = decode_string +decode_func['7'] = decode_string +decode_func['8'] = decode_string +decode_func['9'] = decode_string + +def bdecode(x): + try: + r, l = decode_func[x[0]](x, 0) + except (IndexError, KeyError, ValueError): + raise BTFailure("not a valid bencoded string") + if l != len(x): + raise BTFailure("invalid bencoded value (data after valid prefix)") + return r + +from types import StringType, IntType, LongType, DictType, ListType, TupleType + + +class Bencached(object): + __slots__ = ['bencoded'] + def __init__(self, s): + self.bencoded = s + +def encode_bencached(x,r): + r.append(x.bencoded) + +def encode_int(x, r): + r.extend(('i', str(x), 'e')) + +def encode_bool(x, r): + if x: + encode_int(1, r) + else: + encode_int(0, r) + +def encode_string(x, r): + r.extend((str(len(x)), ':', x)) + +def encode_list(x, r): + r.append('l') + for i in x: + encode_func[type(i)](i, r) + r.append('e') + +def 
encode_dict(x,r): + r.append('d') + ilist = x.items() + ilist.sort() + for k, v in ilist: + r.extend((str(len(k)), ':', k)) + encode_func[type(v)](v, r) + r.append('e') + +encode_func = {} +encode_func[Bencached] = encode_bencached +encode_func[IntType] = encode_int +encode_func[LongType] = encode_int +encode_func[StringType] = encode_string +encode_func[ListType] = encode_list +encode_func[TupleType] = encode_list +encode_func[DictType] = encode_dict + +try: + from types import BooleanType + encode_func[BooleanType] = encode_bool +except ImportError: + pass + +def bencode(x): + r = [] + encode_func[type(x)](x, r) + return ''.join(r) diff --git a/krpc.py b/krpc.py new file mode 100644 index 0000000..d9c2614 --- /dev/null +++ b/krpc.py @@ -0,0 +1,144 @@ +""" +The MIT License + +Copyright (c) 2014-2015 Fred Stober + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. 
+""" + +import time, socket, select, threading, logging +from bencode import bencode, bdecode +from utils import client_version, start_thread, AsyncResult, AsyncTimeout, encode_int + +krpc_version = client_version[0] + chr(client_version[1]) + chr(client_version[2]) + +class KRPCError(RuntimeError): + pass + +class KRPCPeer(object): + def __init__(self, connection, handle_query, cleanup_timeout = 60, cleanup_interval = 10): + """ Start listening on the connection given by (addr, port) + Incoming messages are given to the handle_query function, + with arguments (send_krpc_response, rec). + send_krpc_response(**kwargs) is a function to send a reply, + rec contains the dictionary with the incoming message. + """ + self._log = logging.getLogger(self.__class__.__name__) + self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self._sock.setblocking(0) + self._sock.bind(connection) + + self._transaction = {} + self._transaction_id = 0 + self._transaction_lock = threading.Lock() + self._handle_query = handle_query + + self._shutdown_flag = False + self._listen_thread = start_thread(self._listen) + self._cleanup_thread = start_thread(self._cleanup_transactions, + timeout = cleanup_timeout, interval = cleanup_interval) + + def shutdown(self): + """ This function allows to cleanly shutdown the KRPCPeer. 
""" + self._shutdown_flag = True + + def _cleanup_transactions(self, timeout = 60, interval = 10): + while not self._shutdown_flag: + # Remove transactions older than 1min + with self._transaction_lock: + timeout_transactions = list(filter(lambda t: self._transaction[t].get_age() > timeout, self._transaction)) + if self._log.isEnabledFor(logging.DEBUG): + self._log.debug('Transactions: %d id=%d timeout=%d' % (len(self._transaction), self._transaction_id, len(timeout_transactions))) + for t in timeout_transactions: + self._transaction.pop(t).set_result(AsyncTimeout('Transaction %r: timeout' % t)) + time.sleep(interval) + + def _listen(self): + while not self._shutdown_flag: + try: + if select.select([self._sock], [], [], 10)[0]: + (encoded_rec, source_connection) = self._sock.recvfrom(64*1024) + try: + rec = bdecode(encoded_rec) + except: + self._log.exception('Exception while parsing KRPC requests from %r:\n\t%r' % (source_connection, encoded_rec)) + continue + if rec['y'] in ['r', 'e']: # Response / Error message + t = rec['t'] + if rec['y'] == 'e': + if self._log.isEnabledFor(logging.DEBUG): + self._log.debug('KRPC error message from %r:\n\t%r' % (source_connection, rec)) + rec = KRPCError('Error while processing transaction %r:\n\t%r' % (t, rec)) + else: + if self._log.isEnabledFor(logging.DEBUG): + self._log.debug('KRPC answer from %r:\n\t%r' % (source_connection, rec)) + with self._transaction_lock: + if self._transaction.get(t): + self._transaction.pop(t).set_result(rec, source = source_connection) + elif self._log.isEnabledFor(logging.INFO): + self._log.info('Received response from %r without associated transaction:\n%r' % (source_connection, rec)) + elif rec['y'] == 'q': + if self._log.isEnabledFor(logging.DEBUG): + self._log.debug('KRPC request from %r:\n\t%r' % (source_connection, rec)) + send_krpc_response = lambda message, top_level_message = {}:\ + self._send_krpc_response(source_connection, rec.pop('t'), message, top_level_message) + 
self._handle_query(send_krpc_response, rec, source_connection) + else: + if self._log.isEnabledFor(logging.INFO): + self._log.info('Unknown type of KRPC message from %r:\n\t%r' % (source_connection, rec)) + except Exception: + self._log.exception('Exception while handling KRPC requests from %r:\n\t%r' % (source_connection, rec)) + self._sock.close() + + def _send_krpc_response(self, source_connection, remote_transaction, message, top_level_message = {}): + with self._transaction_lock: + resp = {'y': 'r', 't': remote_transaction, 'v': krpc_version, 'r': message} + resp.update(top_level_message) + if self._log.isEnabledFor(logging.DEBUG): + self._log.debug('KRPC response to %r:\n\t%r' % (source_connection, resp)) + self._sock.sendto(bencode(resp), source_connection) + + def send_krpc_query(self, target_connection, method, **kwargs): + """ Invoke method on the node at target_connection. + The arguments for the method are given in kwargs. + Returns an AsyncResult (waitable) that will + eventually contain the peer response. 
+ """ + target_connection = (socket.gethostbyname(target_connection[0]), target_connection[1]) + with self._transaction_lock: + while True: # Generate transaction id + self._transaction_id += 1 + local_transaction = encode_int(self._transaction_id).lstrip('\x00') + if local_transaction not in self._transaction: + break + req = {'y': 'q', 't': local_transaction, 'v': krpc_version, 'q': method, 'a': kwargs} + self._transaction[local_transaction] = AsyncResult(source = (method, kwargs, target_connection)) + if self._log.isEnabledFor(logging.DEBUG): + self._log.debug('KRPC request to %r:\n\t%r' % (target_connection, req)) + self._sock.sendto(bencode(req), target_connection) + return self._transaction[local_transaction] + + +if __name__ == '__main__': + logging.basicConfig() + logging.getLogger().setLevel(logging.INFO) + # Implement an echo message + peer = KRPCPeer(('0.0.0.0', 1111), handle_query = lambda send_krpc_response, rec, source_connection: + send_krpc_response(message = 'Hello %s!' % rec['a']['message'])) + print peer.send_krpc_query(('localhost', 1111), 'echo', message = 'World').get_result(2) diff --git a/utils.py b/utils.py new file mode 100644 index 0000000..ef0bc15 --- /dev/null +++ b/utils.py @@ -0,0 +1,94 @@ +""" +The MIT License + +Copyright (c) 2014 Fred Stober + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. 
+ +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +""" + +import socket, struct, threading, time + +client_version = ('XK', 0, 0x01) # eXperimental Klient 0.0.1 + +encode_short = lambda value: struct.pack('!H', value) +encode_int = lambda value: struct.pack('!I', value) +encode_ip = lambda value: socket.inet_aton(value) + +def encode_connection(con): + return encode_ip(con[0]) + encode_short(con[1]) + +def encode_nodes(nodes): + result = '' + for node in nodes: + result += struct.pack('20s', node.id) + encode_connection(node.connection) + return result + +decode_short = lambda value: struct.unpack('!H', value)[0] +decode_int = lambda value: struct.unpack('!I', value)[0] +decode_ip = lambda value: socket.inet_ntoa(value) + +def decode_connection(con): + return (decode_ip(con[0:4]), decode_short(con[4:6])) + +def decode_nodes(nodes): + while nodes: + node_id = struct.unpack('20s', nodes[:20])[0] + node_connection = decode_connection(nodes[20:26]) + yield (node_id, node_connection) + nodes = nodes[26:] + +def start_thread(fun, *args, **kwargs): + thread = threading.Thread(target=fun, args=args, kwargs=kwargs) + thread.daemon = True + thread.start() + return thread + +class AsyncTimeout(RuntimeError): + pass + +class AsyncResult(object): + def __init__(self, source = None): + self._event = threading.Event() + self._value = None + self._source = source + self._time = time.time() + + def get_age(self): + return time.time() - self._time + + def discard_result(self): + self._time = 0 + + def set_result(self, result, source = None): + self._value = result + 
self._source = source + self._event.set() + + def has_result(self): + return self._event.is_set() + + def get_source(self): + return self._source + + def get_result(self, timeout = None): + if not self._event.wait(timeout): + raise AsyncTimeout + if isinstance(self._value, Exception): + raise self._value + return self._value