Merge branch 'dev'

dev
7sDream 2018-04-29 00:09:44 +08:00
commit cb06fe00d8
No known key found for this signature in database
GPG Key ID: 72A6D9FCEDDAB75D
8 changed files with 217 additions and 22 deletions

View File

@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
## [Unreleased]
### Added
- `TorrentFileCreator` class and `create_torrent_file` shortcut function for writing data back to a torrent file
## [0.1.4] - 2018-04-06
### Added

View File

@ -1,5 +1,9 @@
# Torrent file parser for Python
A simple parser for `.torrent` file.
It can also edit the parsed data and write it back to torrent format after version 0.1.5.
## Install
```
@ -32,6 +36,8 @@ $ cat test.torrent | pytp
>>> data = tp.parse_torrent_file('test.torrent')
>>> print(data['announce'])
http://tracker.trackerfix.com:80/announce
>>> data['announce'] = 'http://127.0.0.1:12345'
>>> tp.create_torrent_file('new.torrent', data)
```
## Test

View File

@ -1 +1,2 @@
from .test_all import *
from .test_create import *
from .test_parse import *

View File

@ -0,0 +1,31 @@
from __future__ import unicode_literals
import collections
import hashlib
import io
import os.path
import unittest
from torrent_parser import TorrentFileParser, TorrentFileCreator
class TestCreate(unittest.TestCase):
    """Tests for encoding data back to torrent format with TorrentFileCreator."""

    TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), 'test_files')
    REAL_FILE = os.path.join(TEST_FILES_DIR, 'real.torrent')

    def test_simple_create(self):
        """A two-key ordered dict encodes to the expected byte string."""
        # An OrderedDict keeps the keys in insertion order, so the expected
        # output is deterministic.
        data = collections.OrderedDict([('a', 1), ('b', 2)])
        encoded = TorrentFileCreator(data).encode()
        self.assertEqual(encoded, b'd1:ai1e1:bi2ee')

    def test_same_output_if_no_edit(self):
        """Parsing a real torrent and re-encoding it reproduces the input."""
        with open(self.REAL_FILE, 'rb') as fp:
            original = fp.read()

        # The second positional argument asks the parser for ordered dicts,
        # so the key order of the source file is kept for re-encoding.
        parsed = TorrentFileParser(io.BytesIO(original), True).parse()
        recreated = TorrentFileCreator(parsed).encode()

        self.assertEqual(
            hashlib.md5(original).digest(),
            hashlib.md5(recreated).digest(),
        )

View File

@ -1,14 +1,16 @@
from __future__ import unicode_literals
import unittest
import collections
import os.path
import unittest
from torrent_parser import TorrentFileParser, parse_torrent_file
class Test(unittest.TestCase):
REAL_FILE = 'tests/testfiles/real.torrent'
NEG_FILE = 'tests/testfiles/neg.torrent'
class TestParse(unittest.TestCase):
TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), 'test_files')
REAL_FILE = os.path.join(TEST_FILES_DIR, 'real.torrent')
NEG_FILE = os.path.join(TEST_FILES_DIR, 'neg.torrent')
def test_parse_torrent_file_use_shortcut(self):
parse_torrent_file(self.REAL_FILE)
@ -53,4 +55,4 @@ class Test(unittest.TestCase):
if __name__ == '__main__':
unittest.main()
unittest.main()

View File

@ -12,11 +12,26 @@ Usage:
with open(filename, 'rb') as f: # the binary mode 'b' is necessary
data = TorrentFileParser(f).parse()
# then you can edit the data
data['announce-list'].append(['http://127.0.0.1:8080'])
# and create a new torrent file from data
create_torrent_file('new.torrent', data)
# or
with open('new.torrent', 'wb') as f:
f.write(TorrentFileCreator(data).encode())
"""
from __future__ import print_function, unicode_literals
import argparse
import binascii
import collections
import io
import json
@ -38,9 +53,19 @@ except ImportError:
warnings.warn("No chardet module installed, encoding will be utf-8")
return {'encoding': 'utf-8', 'confidence': 1}
# Select the text string type for the running interpreter. Code in this
# module uses `str_type` for isinstance checks on text values and dict keys.
try:
    # noinspection PyUnresolvedReferences
    # For Python 2
    str_type = unicode
except NameError:
    # For Python 3
    # (the name `unicode` does not exist there, hence the NameError)
    str_type = str
__all__ = [
'InvalidTorrentDataException',
'create_torrent_file',
'parse_torrent_file',
'InvalidTorrentDataException',
'TorrentFileCreator',
'TorrentFileParser',
]
@ -78,6 +103,14 @@ class TorrentFileParser(object):
INT_INDICATOR = b'i'
END_INDICATOR = b'e'
STRING_INDICATOR = b''
STRING_DELIMITER = b':'
RAW_FIELD_PARAMS = {
# field length need_list
'pieces': (20, True),
'ed2k': (16, False),
'filehash': (20, False),
}
TYPES = [
(TYPE_LIST, LIST_INDICATOR),
@ -153,12 +186,9 @@ class TorrentFileParser(object):
k = self._next_element()
if k is _END:
return
if k == 'pieces':
v = self._next_hash()
elif k == 'ed2k':
v = self._next_hash(16, False)
elif k == 'filehash':
v = self._next_hash(20, False)
if k in self.RAW_FIELD_PARAMS:
length, need_list = self.RAW_FIELD_PARAMS[k]
v = self._next_hash(length, need_list)
else:
v = self._next_element()
if k == 'encoding':
@ -196,7 +226,7 @@ class TorrentFileParser(object):
return -value if neg else value
def _next_string(self, decode=True):
length = self._next_int(b':')
length = self._next_int(self.STRING_DELIMITER)
raw = self._read_byte(length)
if decode:
encoding = self._encoding
@ -212,19 +242,15 @@ class TorrentFileParser(object):
return string
return raw
@staticmethod
def __to_hex(v):
return hex(ord(v) if isinstance(v, str) else v)[2:].rjust(2, str(0))
def _next_hash(self, p_len=20, need_list=True):
def _next_hash(self, p_len, need_list):
raw = self._next_string(decode=False)
if len(raw) % p_len != 0:
raise InvalidTorrentDataException(
self._pos - len(raw), "Hash bit length not match at pos {pos}"
)
res = [
''.join([self.__to_hex(c) for c in h])
for h in (raw[x:x+p_len] for x in range(0, len(raw), p_len))
binascii.hexlify(chunk).decode('ascii')
for chunk in (raw[x:x+p_len] for x in range(0, len(raw), p_len))
]
if len(res) == 0 and not need_list:
return ''
@ -254,9 +280,122 @@ class TorrentFileParser(object):
return element
class TorrentFileCreator(object):
    """
    Encode torrent data (a dict) into bytes in torrent file format.

    Typical use::

        raw = TorrentFileCreator(data).encode()

    Values may be dicts, lists, ints and text strings. Fields whose key is
    listed in `TorrentFileParser.RAW_FIELD_PARAMS` are expected to hold hex
    strings (or a list of hex strings) and are written back as raw bytes.
    """

    # Maps a tuple of accepted Python types to a TorrentFileParser type name.
    # `_type_to_func` resolves that name to the matching `_output_<name>`
    # method (presumably 'dict', 'list', 'int' and 'string' -- the constants
    # are defined on TorrentFileParser).
    TYPES = {
        (dict,): TorrentFileParser.TYPE_DICT,
        (list,): TorrentFileParser.TYPE_LIST,
        (int,): TorrentFileParser.TYPE_INT,
        (str_type,): TorrentFileParser.TYPE_STRING,
    }

    def __init__(self, data, encoding='utf-8'):
        """
        :param data: torrent data, must be a dict or OrderedDict
        :param encoding: string field output encoding
        :raises InvalidTorrentDataException: if `data` is not a dict
        """
        if not isinstance(data, dict):
            raise InvalidTorrentDataException(
                None,
                "Top level structure should be a dict"
            )
        self._data = data
        self._encoding = encoding

    def encode(self):
        """
        Encode data to bytes that conform to torrent file format

        :raises InvalidTorrentDataException: if the data contains a value of
            an unsupported type, a non-text dict key or an invalid hash
        """
        return b''.join(self._output_element(self._data))

    def encode_to_readable(self):
        """
        Encode data to a file-like(BytesIO) object which contains the result of
        `TorrentFileCreator.encode()`
        """
        return io.BytesIO(self.encode())

    def _output_string(self, data):
        """
        Yield the pieces of a length-prefixed string field.

        :param data: text (encoded with the configured encoding first) or
            already-encoded bytes
        """
        if isinstance(data, str_type):
            data = data.encode(self._encoding)
        # The length prefix counts encoded bytes, not characters.
        yield str(len(data)).encode('ascii')
        yield TorrentFileParser.STRING_DELIMITER
        yield data

    @staticmethod
    def _output_int(data):
        """
        Yield the pieces of an integer field.

        :param data: an int (or int subclass such as bool)
        """
        yield TorrentFileParser.INT_INDICATOR
        # Normalise through int() first: bool is an int subclass, and
        # str(True) would otherwise emit the text "True" instead of digits.
        yield str(int(data)).encode('ascii')
        yield TorrentFileParser.END_INDICATOR

    def _output_decode_hash(self, data):
        """
        Yield the pieces of a raw hash field.

        Every hex string is converted back to raw bytes; the results are
        concatenated and written as one string field.

        :param data: a single hex string or an iterable of hex strings
        :raises InvalidTorrentDataException: if an item is not a text string,
            has an odd length, or is not valid hexadecimal
        """
        if isinstance(data, str_type):
            data = [data]
        result = []
        for hash_line in data:
            if not isinstance(hash_line, str_type):
                raise InvalidTorrentDataException(
                    None,
                    "Hash must be " + str_type.__name__ + " not " +
                    type(hash_line).__name__,
                )
            if len(hash_line) % 2 != 0:
                raise InvalidTorrentDataException(
                    None,
                    "Hash(" + hash_line + ") length(" + str(len(hash_line)) +
                    ") is a not even number",
                )
            try:
                raw = binascii.unhexlify(hash_line)
            except (binascii.Error, TypeError, ValueError) as e:
                # binascii.Error alone is not enough: Python 3 raises a plain
                # ValueError for non-ASCII text, and Python 2 raises TypeError
                # for non-hexadecimal digits.
                raise InvalidTorrentDataException(
                    None, str(e),
                )
            result.append(raw)
        for x in self._output_string(b''.join(result)):
            yield x

    def _output_dict(self, data):
        """
        Yield the pieces of a dict field, in the dict's own iteration order.

        :raises InvalidTorrentDataException: if a key is not a text string
        """
        yield TorrentFileParser.DICT_INDICATOR
        for k, v in data.items():
            if not isinstance(k, str_type):
                raise InvalidTorrentDataException(
                    None, "Dict key must be " + str_type.__name__,
                )
            for x in self._output_element(k):
                yield x
            # Raw hash fields hold hex strings and must be turned back into
            # raw bytes rather than being encoded as text.
            if k in TorrentFileParser.RAW_FIELD_PARAMS:
                for x in self._output_decode_hash(v):
                    yield x
            else:
                for x in self._output_element(v):
                    yield x
        yield TorrentFileParser.END_INDICATOR

    def _output_list(self, data):
        """Yield the pieces of a list field, element by element."""
        yield TorrentFileParser.LIST_INDICATOR
        for v in data:
            for x in self._output_element(v):
                yield x
        yield TorrentFileParser.END_INDICATOR

    def _type_to_func(self, t):
        """Return the bound `_output_<t>` method for type name `t`."""
        return getattr(self, '_output_' + t)

    def _output_element(self, data):
        """
        Dispatch `data` to the output method that matches its Python type.

        :return: an iterable of byte pieces
        :raises InvalidTorrentDataException: if the type is not supported
        """
        for types, t in self.TYPES.items():
            if isinstance(data, types):
                # noinspection PyCallingNonCallable
                return self._type_to_func(t)(data)
        raise InvalidTorrentDataException(
            None,
            "Invalid type for torrent file: " + type(data).__name__,
        )
def parse_torrent_file(filename, use_ordered_dict=False):
"""
Shortcut function for parse torrent object use TorrentFileParser
Shortcut function for parse torrent object using TorrentFileParser
:param string filename: torrent filename
:param bool use_ordered_dict: see :any:`TorrentFileParser.__init__`
@ -267,6 +406,18 @@ def parse_torrent_file(filename, use_ordered_dict=False):
return TorrentFileParser(f, use_ordered_dict).parse()
def create_torrent_file(filename, data, encoding='utf-8'):
    """
    Shortcut function for creating a torrent file using TorrentFileCreator

    :param filename: output torrent filename
    :param data: torrent data, must be a dict or OrderedDict
    :param encoding: string field output encoding
    :raises InvalidTorrentDataException: if `data` cannot be encoded; in that
        case the output file is neither created nor modified
    """
    # Encode before opening the file: opening in 'wb' mode creates or
    # truncates it, so invalid data would otherwise leave an empty file
    # behind (or destroy an existing one).
    encoded = TorrentFileCreator(data, encoding).encode()
    with open(filename, 'wb') as f:
        f.write(encoded)
def __main():
parser = argparse.ArgumentParser()
parser.add_argument('file', nargs='?', default='',