Initial implementation of the KRPC protocol
commit
bd5828240a
|
@ -0,0 +1 @@
|
|||
*.pyc
|
|
@ -0,0 +1,21 @@
|
|||
The MIT License (MIT)
|
||||
|
||||
Copyright (c) 2014 Fred Stober
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
|
@ -0,0 +1,26 @@
|
|||
tiny Bittorrent client
|
||||
======================
|
||||
|
||||
The goal is to supply an easy to use and simple to understand implementation
|
||||
of the BitTorrent specifications in python with no external dependencies
|
||||
except for the python standard library.
|
||||
|
||||
The implementation is spread over several files, each implementing
|
||||
a single component.
|
||||
|
||||
krpc.py - implements the basic UDP Kademila-RPC protocol layer
|
||||
|
||||
|
||||
KRPC Implementation
|
||||
-------------------
|
||||
|
||||
The KRPCPeer only exposes three methods:
|
||||
- __init__((host, port), query_handler)
|
||||
That takes the (host, port) tuple where it should listen and the second
|
||||
argument is the function that processes incoming messages.
|
||||
- shutdown()
|
||||
Shutdown of all threads and connections of the KRPC peer.
|
||||
- send_krpc_query((host, port), method, **kwargs)
|
||||
This method sends a query to a remote host specified by a (host, pool) tuple.
|
||||
The name and arguments to call on the remote host is given as well.
|
||||
An async result holder is returned, that allows to wait for a reply.
|
|
@ -0,0 +1,129 @@
|
|||
# The contents of this file are subject to the BitTorrent Open Source License
|
||||
# Version 1.1 (the License). You may not copy or use this file, in either
|
||||
# source code or executable form, except in compliance with the License. You
|
||||
# may obtain a copy of the License at http://www.bittorrent.com/license/.
|
||||
#
|
||||
# Software distributed under the License is distributed on an AS IS basis,
|
||||
# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
|
||||
# for the specific language governing rights and limitations under the
|
||||
# License.
|
||||
|
||||
# Written by Petru Paler
|
||||
|
||||
class BTFailure(Exception):
|
||||
pass
|
||||
|
||||
def decode_int(x, f):
|
||||
f += 1
|
||||
newf = x.index('e', f)
|
||||
n = int(x[f:newf])
|
||||
if x[f] == '-':
|
||||
if x[f + 1] == '0':
|
||||
raise ValueError
|
||||
elif x[f] == '0' and newf != f+1:
|
||||
raise ValueError
|
||||
return (n, newf+1)
|
||||
|
||||
def decode_string(x, f):
|
||||
colon = x.index(':', f)
|
||||
n = int(x[f:colon])
|
||||
if x[f] == '0' and colon != f+1:
|
||||
raise ValueError
|
||||
colon += 1
|
||||
return (x[colon:colon+n], colon+n)
|
||||
|
||||
def decode_list(x, f):
|
||||
r, f = [], f+1
|
||||
while x[f] != 'e':
|
||||
v, f = decode_func[x[f]](x, f)
|
||||
r.append(v)
|
||||
return (r, f + 1)
|
||||
|
||||
def decode_dict(x, f):
|
||||
r, f = {}, f+1
|
||||
while x[f] != 'e':
|
||||
k, f = decode_string(x, f)
|
||||
r[k], f = decode_func[x[f]](x, f)
|
||||
return (r, f + 1)
|
||||
|
||||
decode_func = {}
|
||||
decode_func['l'] = decode_list
|
||||
decode_func['d'] = decode_dict
|
||||
decode_func['i'] = decode_int
|
||||
decode_func['0'] = decode_string
|
||||
decode_func['1'] = decode_string
|
||||
decode_func['2'] = decode_string
|
||||
decode_func['3'] = decode_string
|
||||
decode_func['4'] = decode_string
|
||||
decode_func['5'] = decode_string
|
||||
decode_func['6'] = decode_string
|
||||
decode_func['7'] = decode_string
|
||||
decode_func['8'] = decode_string
|
||||
decode_func['9'] = decode_string
|
||||
|
||||
def bdecode(x):
|
||||
try:
|
||||
r, l = decode_func[x[0]](x, 0)
|
||||
except (IndexError, KeyError, ValueError):
|
||||
raise BTFailure("not a valid bencoded string")
|
||||
if l != len(x):
|
||||
raise BTFailure("invalid bencoded value (data after valid prefix)")
|
||||
return r
|
||||
|
||||
from types import StringType, IntType, LongType, DictType, ListType, TupleType
|
||||
|
||||
|
||||
class Bencached(object):
|
||||
__slots__ = ['bencoded']
|
||||
def __init__(self, s):
|
||||
self.bencoded = s
|
||||
|
||||
def encode_bencached(x,r):
|
||||
r.append(x.bencoded)
|
||||
|
||||
def encode_int(x, r):
|
||||
r.extend(('i', str(x), 'e'))
|
||||
|
||||
def encode_bool(x, r):
|
||||
if x:
|
||||
encode_int(1, r)
|
||||
else:
|
||||
encode_int(0, r)
|
||||
|
||||
def encode_string(x, r):
|
||||
r.extend((str(len(x)), ':', x))
|
||||
|
||||
def encode_list(x, r):
|
||||
r.append('l')
|
||||
for i in x:
|
||||
encode_func[type(i)](i, r)
|
||||
r.append('e')
|
||||
|
||||
def encode_dict(x,r):
|
||||
r.append('d')
|
||||
ilist = x.items()
|
||||
ilist.sort()
|
||||
for k, v in ilist:
|
||||
r.extend((str(len(k)), ':', k))
|
||||
encode_func[type(v)](v, r)
|
||||
r.append('e')
|
||||
|
||||
encode_func = {}
|
||||
encode_func[Bencached] = encode_bencached
|
||||
encode_func[IntType] = encode_int
|
||||
encode_func[LongType] = encode_int
|
||||
encode_func[StringType] = encode_string
|
||||
encode_func[ListType] = encode_list
|
||||
encode_func[TupleType] = encode_list
|
||||
encode_func[DictType] = encode_dict
|
||||
|
||||
try:
|
||||
from types import BooleanType
|
||||
encode_func[BooleanType] = encode_bool
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
def bencode(x):
|
||||
r = []
|
||||
encode_func[type(x)](x, r)
|
||||
return ''.join(r)
|
|
@ -0,0 +1,144 @@
|
|||
"""
|
||||
The MIT License
|
||||
|
||||
Copyright (c) 2014-2015 Fred Stober
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
||||
"""
|
||||
|
||||
import time, socket, select, threading, logging
|
||||
from bencode import bencode, bdecode
|
||||
from utils import client_version, start_thread, AsyncResult, AsyncTimeout, encode_int
|
||||
|
||||
krpc_version = client_version[0] + chr(client_version[1]) + chr(client_version[2])
|
||||
|
||||
class KRPCError(RuntimeError):
|
||||
pass
|
||||
|
||||
class KRPCPeer(object):
|
||||
def __init__(self, connection, handle_query, cleanup_timeout = 60, cleanup_interval = 10):
|
||||
""" Start listening on the connection given by (addr, port)
|
||||
Incoming messages are given to the handle_query function,
|
||||
with arguments (send_krpc_response, rec).
|
||||
send_krpc_response(**kwargs) is a function to send a reply,
|
||||
rec contains the dictionary with the incoming message.
|
||||
"""
|
||||
self._log = logging.getLogger(self.__class__.__name__)
|
||||
self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
self._sock.setblocking(0)
|
||||
self._sock.bind(connection)
|
||||
|
||||
self._transaction = {}
|
||||
self._transaction_id = 0
|
||||
self._transaction_lock = threading.Lock()
|
||||
self._handle_query = handle_query
|
||||
|
||||
self._shutdown_flag = False
|
||||
self._listen_thread = start_thread(self._listen)
|
||||
self._cleanup_thread = start_thread(self._cleanup_transactions,
|
||||
timeout = cleanup_timeout, interval = cleanup_interval)
|
||||
|
||||
def shutdown(self):
|
||||
""" This function allows to cleanly shutdown the KRPCPeer. """
|
||||
self._shutdown_flag = True
|
||||
|
||||
def _cleanup_transactions(self, timeout = 60, interval = 10):
|
||||
while not self._shutdown_flag:
|
||||
# Remove transactions older than 1min
|
||||
with self._transaction_lock:
|
||||
timeout_transactions = list(filter(lambda t: self._transaction[t].get_age() > timeout, self._transaction))
|
||||
if self._log.isEnabledFor(logging.DEBUG):
|
||||
self._log.debug('Transactions: %d id=%d timeout=%d' % (len(self._transaction), self._transaction_id, len(timeout_transactions)))
|
||||
for t in timeout_transactions:
|
||||
self._transaction.pop(t).set_result(AsyncTimeout('Transaction %r: timeout' % t))
|
||||
time.sleep(interval)
|
||||
|
||||
def _listen(self):
|
||||
while not self._shutdown_flag:
|
||||
try:
|
||||
if select.select([self._sock], [], [], 10)[0]:
|
||||
(encoded_rec, source_connection) = self._sock.recvfrom(64*1024)
|
||||
try:
|
||||
rec = bdecode(encoded_rec)
|
||||
except:
|
||||
self._log.exception('Exception while parsing KRPC requests from %r:\n\t%r' % (source_connection, encoded_rec))
|
||||
continue
|
||||
if rec['y'] in ['r', 'e']: # Response / Error message
|
||||
t = rec['t']
|
||||
if rec['y'] == 'e':
|
||||
if self._log.isEnabledFor(logging.DEBUG):
|
||||
self._log.debug('KRPC error message from %r:\n\t%r' % (source_connection, rec))
|
||||
rec = KRPCError('Error while processing transaction %r:\n\t%r' % (t, rec))
|
||||
else:
|
||||
if self._log.isEnabledFor(logging.DEBUG):
|
||||
self._log.debug('KRPC answer from %r:\n\t%r' % (source_connection, rec))
|
||||
with self._transaction_lock:
|
||||
if self._transaction.get(t):
|
||||
self._transaction.pop(t).set_result(rec, source = source_connection)
|
||||
elif self._log.isEnabledFor(logging.INFO):
|
||||
self._log.info('Received response from %r without associated transaction:\n%r' % (source_connection, rec))
|
||||
elif rec['y'] == 'q':
|
||||
if self._log.isEnabledFor(logging.DEBUG):
|
||||
self._log.debug('KRPC request from %r:\n\t%r' % (source_connection, rec))
|
||||
send_krpc_response = lambda message, top_level_message = {}:\
|
||||
self._send_krpc_response(source_connection, rec.pop('t'), message, top_level_message)
|
||||
self._handle_query(send_krpc_response, rec, source_connection)
|
||||
else:
|
||||
if self._log.isEnabledFor(logging.INFO):
|
||||
self._log.info('Unknown type of KRPC message from %r:\n\t%r' % (source_connection, rec))
|
||||
except Exception:
|
||||
self._log.exception('Exception while handling KRPC requests from %r:\n\t%r' % (source_connection, rec))
|
||||
self._sock.close()
|
||||
|
||||
def _send_krpc_response(self, source_connection, remote_transaction, message, top_level_message = {}):
|
||||
with self._transaction_lock:
|
||||
resp = {'y': 'r', 't': remote_transaction, 'v': krpc_version, 'r': message}
|
||||
resp.update(top_level_message)
|
||||
if self._log.isEnabledFor(logging.DEBUG):
|
||||
self._log.debug('KRPC response to %r:\n\t%r' % (source_connection, resp))
|
||||
self._sock.sendto(bencode(resp), source_connection)
|
||||
|
||||
def send_krpc_query(self, target_connection, method, **kwargs):
|
||||
""" Invoke method on the node at target_connection.
|
||||
The arguments for the method are given in kwargs.
|
||||
Returns an AsyncResult (waitable) that will
|
||||
eventually contain the peer response.
|
||||
"""
|
||||
target_connection = (socket.gethostbyname(target_connection[0]), target_connection[1])
|
||||
with self._transaction_lock:
|
||||
while True: # Generate transaction id
|
||||
self._transaction_id += 1
|
||||
local_transaction = encode_int(self._transaction_id).lstrip('\x00')
|
||||
if local_transaction not in self._transaction:
|
||||
break
|
||||
req = {'y': 'q', 't': local_transaction, 'v': krpc_version, 'q': method, 'a': kwargs}
|
||||
self._transaction[local_transaction] = AsyncResult(source = (method, kwargs, target_connection))
|
||||
if self._log.isEnabledFor(logging.DEBUG):
|
||||
self._log.debug('KRPC request to %r:\n\t%r' % (target_connection, req))
|
||||
self._sock.sendto(bencode(req), target_connection)
|
||||
return self._transaction[local_transaction]
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
logging.basicConfig()
|
||||
logging.getLogger().setLevel(logging.INFO)
|
||||
# Implement an echo message
|
||||
peer = KRPCPeer(('0.0.0.0', 1111), handle_query = lambda send_krpc_response, rec, source_connection:
|
||||
send_krpc_response(message = 'Hello %s!' % rec['a']['message']))
|
||||
print peer.send_krpc_query(('localhost', 1111), 'echo', message = 'World').get_result(2)
|
|
@ -0,0 +1,94 @@
|
|||
"""
|
||||
The MIT License
|
||||
|
||||
Copyright (c) 2014 Fred Stober
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
||||
"""
|
||||
|
||||
import socket, struct, threading, time
|
||||
|
||||
client_version = ('XK', 0, 0x01) # eXperimental Klient 0.0.1
|
||||
|
||||
encode_short = lambda value: struct.pack('!H', value)
|
||||
encode_int = lambda value: struct.pack('!I', value)
|
||||
encode_ip = lambda value: socket.inet_aton(value)
|
||||
|
||||
def encode_connection(con):
|
||||
return encode_ip(con[0]) + encode_short(con[1])
|
||||
|
||||
def encode_nodes(nodes):
|
||||
result = ''
|
||||
for node in nodes:
|
||||
result += struct.pack('20s', node.id) + encode_connection(node.connection)
|
||||
return result
|
||||
|
||||
decode_short = lambda value: struct.unpack('!H', value)[0]
|
||||
decode_int = lambda value: struct.unpack('!I', value)[0]
|
||||
decode_ip = lambda value: socket.inet_ntoa(value)
|
||||
|
||||
def decode_connection(con):
|
||||
return (decode_ip(con[0:4]), decode_short(con[4:6]))
|
||||
|
||||
def decode_nodes(nodes):
|
||||
while nodes:
|
||||
node_id = struct.unpack('20s', nodes[:20])[0]
|
||||
node_connection = decode_connection(nodes[20:26])
|
||||
yield (node_id, node_connection)
|
||||
nodes = nodes[26:]
|
||||
|
||||
def start_thread(fun, *args, **kwargs):
|
||||
thread = threading.Thread(target=fun, args=args, kwargs=kwargs)
|
||||
thread.daemon = True
|
||||
thread.start()
|
||||
return thread
|
||||
|
||||
class AsyncTimeout(RuntimeError):
|
||||
pass
|
||||
|
||||
class AsyncResult(object):
|
||||
def __init__(self, source = None):
|
||||
self._event = threading.Event()
|
||||
self._value = None
|
||||
self._source = source
|
||||
self._time = time.time()
|
||||
|
||||
def get_age(self):
|
||||
return time.time() - self._time
|
||||
|
||||
def discard_result(self):
|
||||
self._time = 0
|
||||
|
||||
def set_result(self, result, source = None):
|
||||
self._value = result
|
||||
self._source = source
|
||||
self._event.set()
|
||||
|
||||
def has_result(self):
|
||||
return self._event.is_set()
|
||||
|
||||
def get_source(self):
|
||||
return self._source
|
||||
|
||||
def get_result(self, timeout = None):
|
||||
if not self._event.wait(timeout):
|
||||
raise AsyncTimeout
|
||||
if isinstance(self._value, Exception):
|
||||
raise self._value
|
||||
return self._value
|
Loading…
Reference in New Issue