land dev
commit
1bd845af2d
18
CHANGELOG.md
18
CHANGELOG.md
|
@ -7,6 +7,22 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
|
||||||
|
|
||||||
## [Unreleased]
|
## [Unreleased]
|
||||||
|
|
||||||
|
### Added
|
||||||
|
|
||||||
|
- Add `errors` option in `TorrentFileParser` and `parse_torrent_file` to let user set the encoding error handler. (Thanks [@yasuotakei](https://github.com/yasuotakei))
|
||||||
|
- Add `-e`/`--errors` CLI option to set the `errors` option of `parse_torrent_file`.
|
||||||
|
- `BDecoder` class and `decode` shortcut function to directly decode bytes.
|
||||||
|
- `encode` shortcut function to directly encode data to bytes.
|
||||||
|
- Added `hash_fields` parameter and `hash_field` method to customize the hash field list.
|
||||||
|
- Added `hash_raw` parameter to let all hash fields be parsed as raw bytes.
|
||||||
|
|
||||||
|
### Changed
|
||||||
|
|
||||||
|
- **Breaking change**: `TorrentFileCreator` is renamed to `BEncoder`, as the original name doesn't describe its function.
|
||||||
|
- `TorrentFileParser` no longer needs the outermost level of parsed data to be a `dict`.
|
||||||
|
- `BEncoder` no longer needs the outermost level of encoded data to be a `dict`.
|
||||||
|
- `BEncoder` now supports encoding raw bytes.
|
||||||
|
|
||||||
## [0.2.0] - 2018.5.25
|
## [0.2.0] - 2018.5.25
|
||||||
|
|
||||||
### Change
|
### Change
|
||||||
|
@ -17,7 +33,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
|
|
||||||
- `TorrentFileCreator` class and `create_torrent_file` shortcut function for write back data to a torrent file
|
- `TorrentFileCreator` class and `create_torrent_file` shortcut function for write back data to a torrent file.
|
||||||
|
|
||||||
## [0.1.4] - 2018-04-06
|
## [0.1.4] - 2018-04-06
|
||||||
|
|
||||||
|
|
2
LICENSE
2
LICENSE
|
@ -1,6 +1,6 @@
|
||||||
The MIT License (MIT)
|
The MIT License (MIT)
|
||||||
|
|
||||||
Copyright (c) 2017 7sDream
|
Copyright (c) 2017 - 2018 7sDream
|
||||||
|
|
||||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||||
of this software and associated documentation files (the "Software"), to deal
|
of this software and associated documentation files (the "Software"), to deal
|
||||||
|
|
|
@ -1,3 +1 @@
|
||||||
include README.md LICENSE CHANGELOG.md
|
include README.md LICENSE CHANGELOG.md
|
||||||
include test.py
|
|
||||||
include test.torrent
|
|
||||||
|
|
23
README.md
23
README.md
|
@ -4,6 +4,14 @@ A simple parser for `.torrent` file.
|
||||||
|
|
||||||
Can also edit and write back to torrent format after version 0.2.0.
|
Can also edit and write back to torrent format after version 0.2.0.
|
||||||
|
|
||||||
|
## Features
|
||||||
|
|
||||||
|
- Decoder and encoder for torrent files
|
||||||
|
- Auto decode bytes fields to strings with a user-specified encoding and error handler
|
||||||
|
- Auto detect encoding when `auto` is used as the encoding (needs `chardet` installed)
|
||||||
|
- Auto decode hash value fields to hash blocks, also customizable
|
||||||
|
- CLI provided, with JSON output
|
||||||
|
|
||||||
## Install
|
## Install
|
||||||
|
|
||||||
```
|
```
|
||||||
|
@ -34,12 +42,23 @@ $ cat test.torrent | pytp
|
||||||
```pycon
|
```pycon
|
||||||
>>> import torrent_parser as tp
|
>>> import torrent_parser as tp
|
||||||
>>> data = tp.parse_torrent_file('test.torrent')
|
>>> data = tp.parse_torrent_file('test.torrent')
|
||||||
>>> print(data['announce'])
|
>>> data['announce']
|
||||||
http://tracker.trackerfix.com:80/announce
|
http://tracker.trackerfix.com:80/announce
|
||||||
>>> data['announce'] = 'http://127.0.0.1:12345'
|
>>> data['announce'] = 'http://127.0.0.1:12345'
|
||||||
>>> tp.create_torrent_file('new.torrent', data)
|
>>> tp.create_torrent_file('new.torrent', data)
|
||||||
```
|
```
|
||||||
|
|
||||||
|
Or, if you don't work with files and just have raw bytes:
|
||||||
|
|
||||||
|
```pycon
|
||||||
|
>>> import torrent_parser as tp
|
||||||
|
>>> data = tp.decode(b'd3:negi-1ee')
|
||||||
|
>>> data['neg']
|
||||||
|
-1
|
||||||
|
>>> tp.encode(data)
|
||||||
|
b'd3:negi-1ee'
|
||||||
|
```
|
||||||
|
|
||||||
## Test
|
## Test
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
|
@ -58,4 +77,4 @@ See [License][LICENSE].
|
||||||
[screenshots-normal]: http://rikka-10066868.image.myqcloud.com/1492616d-9f14-4fe2-9146-9a3ac06c6868.png
|
[screenshots-normal]: http://rikka-10066868.image.myqcloud.com/1492616d-9f14-4fe2-9146-9a3ac06c6868.png
|
||||||
[screenshots-indent]: http://rikka-10066868.image.myqcloud.com/eadc4184-6deb-42eb-bfd4-239da8f50c08.png
|
[screenshots-indent]: http://rikka-10066868.image.myqcloud.com/eadc4184-6deb-42eb-bfd4-239da8f50c08.png
|
||||||
[LICENSE]: https://github.com/7sDream/torrent_parser/blob/master/LICENSE
|
[LICENSE]: https://github.com/7sDream/torrent_parser/blob/master/LICENSE
|
||||||
[CHANGELOG]: https://github.com/7sDream/torrent_parser/blob/master/CHANGELOG.md
|
[CHANGELOG]: https://github.com/7sDream/torrent_parser/blob/master/CHANGELOG.md
|
||||||
|
|
|
@ -1,2 +1,7 @@
|
||||||
from .test_create import *
|
from .test_create import *
|
||||||
from .test_parse import *
|
from .test_parse import *
|
||||||
|
from .test_encoding_error import *
|
||||||
|
from .test_encode import *
|
||||||
|
from .test_decode import *
|
||||||
|
from .test_hash_field import *
|
||||||
|
from .test_hash_raw import *
|
|
@ -6,7 +6,7 @@ import io
|
||||||
import os.path
|
import os.path
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
from torrent_parser import TorrentFileParser, TorrentFileCreator
|
from torrent_parser import TorrentFileParser, BEncoder
|
||||||
|
|
||||||
|
|
||||||
class TestCreate(unittest.TestCase):
|
class TestCreate(unittest.TestCase):
|
||||||
|
@ -17,15 +17,19 @@ class TestCreate(unittest.TestCase):
|
||||||
data = collections.OrderedDict()
|
data = collections.OrderedDict()
|
||||||
data['a'] = 1
|
data['a'] = 1
|
||||||
data['b'] = 2
|
data['b'] = 2
|
||||||
self.assertEqual(TorrentFileCreator(data).encode(), b'd1:ai1e1:bi2ee')
|
self.assertEqual(BEncoder(data).encode(), b'd1:ai1e1:bi2ee')
|
||||||
|
|
||||||
def test_same_output_if_no_edit(self):
|
def test_same_output_if_no_edit(self):
|
||||||
with open(self.REAL_FILE, 'rb') as fp:
|
with open(self.REAL_FILE, 'rb') as fp:
|
||||||
in_data = fp.read()
|
in_data = fp.read()
|
||||||
data = TorrentFileParser(io.BytesIO(in_data), True).parse()
|
data = TorrentFileParser(io.BytesIO(in_data), True).parse()
|
||||||
out_data = TorrentFileCreator(data).encode()
|
out_data = BEncoder(data).encode()
|
||||||
m1 = hashlib.md5()
|
m1 = hashlib.md5()
|
||||||
m1.update(in_data)
|
m1.update(in_data)
|
||||||
m2 = hashlib.md5()
|
m2 = hashlib.md5()
|
||||||
m2.update(out_data)
|
m2.update(out_data)
|
||||||
self.assertEqual(m1.digest(), m2.digest())
|
self.assertEqual(m1.digest(), m2.digest())
|
||||||
|
|
||||||
|
def test_dont_need_dict_outmost(self):
|
||||||
|
data = 123456
|
||||||
|
self.assertEqual(BEncoder(data).encode(), b'i123456e')
|
||||||
|
|
|
@ -0,0 +1,11 @@
|
||||||
|
from __future__ import unicode_literals
|
||||||
|
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
from torrent_parser import decode
|
||||||
|
|
||||||
|
|
||||||
|
class TestDecode(unittest.TestCase):
    # Exercises the module-level ``decode`` shortcut on raw bytes.

    def test_decode(self):
        # b'i12345e' is expected to decode to the int 12345.
        decoded = decode(b'i12345e')
        self.assertEqual(decoded, 12345)
|
|
@ -0,0 +1,11 @@
|
||||||
|
from __future__ import unicode_literals
|
||||||
|
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
from torrent_parser import encode
|
||||||
|
|
||||||
|
|
||||||
|
class TestEncode(unittest.TestCase):
    # Exercises the module-level ``encode`` shortcut on a plain Python value.

    def test_encode(self):
        # The int 12345 is expected to encode to b'i12345e'.
        encoded = encode(12345)
        self.assertEqual(encoded, b'i12345e')
|
|
@ -0,0 +1,25 @@
|
||||||
|
from __future__ import unicode_literals
|
||||||
|
|
||||||
|
import os.path
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
from torrent_parser import (
|
||||||
|
TorrentFileParser, parse_torrent_file, InvalidTorrentDataException
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TestDecodingError(unittest.TestCase):
    # Checks how the ``errors`` option affects parsing of the
    # 'utf8.encoding.error.torrent' fixture.
    TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), 'test_files')
    FILE = os.path.join(TEST_FILES_DIR, 'utf8.encoding.error.torrent')

    def test_default_option_will_raise_exception(self):
        # Without an ``errors`` argument, both the shortcut function and the
        # parser class are expected to raise InvalidTorrentDataException.
        with self.assertRaises(InvalidTorrentDataException):
            parse_torrent_file(self.FILE)

        with self.assertRaises(InvalidTorrentDataException):
            with open(self.FILE, 'rb') as fp:
                parser = TorrentFileParser(fp)
                parser.parse()

    def test_not_raise_exception_when_use_ignore(self):
        # With errors='ignore' the same file must parse without raising,
        # through both entry points.
        parse_torrent_file(self.FILE, errors='ignore')

        with open(self.FILE, 'rb') as fp:
            parser = TorrentFileParser(fp, errors='ignore')
            parser.parse()
|
|
@ -0,0 +1 @@
|
||||||
|
8:announce
|
Binary file not shown.
|
@ -0,0 +1,21 @@
|
||||||
|
from __future__ import unicode_literals
|
||||||
|
|
||||||
|
import os.path
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
from torrent_parser import (
|
||||||
|
TorrentFileParser, parse_torrent_file, decode
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TestHashField(unittest.TestCase):
    # Checks that registering 'info_hash' as a hash field lets the
    # 'utf8.encoding.error.torrent' fixture parse without raising.
    TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), 'test_files')
    FILE = os.path.join(TEST_FILES_DIR, 'utf8.encoding.error.torrent')

    def test_not_raise_exception_when_add_hash_fields(self):
        # 1. ``hash_fields`` keyword of the file shortcut function.
        parse_torrent_file(self.FILE, hash_fields={'info_hash': (20, False)})

        # 2. Chained ``hash_field`` call on the parser object.
        with open(self.FILE, 'rb') as fp:
            parser = TorrentFileParser(fp)
            parser.hash_field('info_hash').parse()

        # 3. ``hash_fields`` keyword of the bytes shortcut function.
        with open(self.FILE, 'rb') as fp:
            raw = fp.read()
        decode(raw, hash_fields={'info_hash': (20, False)})
|
|
@ -0,0 +1,23 @@
|
||||||
|
from __future__ import unicode_literals
|
||||||
|
|
||||||
|
import os.path
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
from torrent_parser import decode, encode
|
||||||
|
|
||||||
|
|
||||||
|
class TestHashRaw(unittest.TestCase):
    # Checks the ``hash_raw`` decode option and encoding of raw bytes values.
    TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), 'test_files')
    FILE = os.path.join(TEST_FILES_DIR, 'utf8.encoding.error.torrent')

    def test_hash_raw_decode(self):
        encoded = b'd4:hash4:\xAA\xBB\xCC\xDDe'

        # hash_raw=False: the hash field comes back as a hex string.
        as_hex = decode(
            encoded, hash_fields={'hash': (4, False)}, hash_raw=False
        )
        self.assertEqual(as_hex['hash'], 'aabbccdd')

        # hash_raw=True: the hash field comes back as the untouched bytes.
        as_bytes = decode(
            encoded, hash_fields={'hash': (4, False)}, hash_raw=True
        )
        self.assertEqual(as_bytes['hash'], b'\xAA\xBB\xCC\xDD')

    def test_raw_bytes_encode(self):
        # A bytes value in the input dict is expected to be written as-is.
        source = {'hash': b'\xAA\xBB\xCC\xDD'}
        encoded = encode(source)
        self.assertEqual(encoded, b'd4:hash4:\xAA\xBB\xCC\xDDe')
|
|
@ -11,6 +11,7 @@ class TestParse(unittest.TestCase):
|
||||||
TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), 'test_files')
|
TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), 'test_files')
|
||||||
REAL_FILE = os.path.join(TEST_FILES_DIR, 'real.torrent')
|
REAL_FILE = os.path.join(TEST_FILES_DIR, 'real.torrent')
|
||||||
NEG_FILE = os.path.join(TEST_FILES_DIR, 'neg.torrent')
|
NEG_FILE = os.path.join(TEST_FILES_DIR, 'neg.torrent')
|
||||||
|
STRING_FILE = os.path.join(TEST_FILES_DIR, 'outmost.string.torrent')
|
||||||
|
|
||||||
def test_parse_torrent_file_use_shortcut(self):
|
def test_parse_torrent_file_use_shortcut(self):
|
||||||
parse_torrent_file(self.REAL_FILE)
|
parse_torrent_file(self.REAL_FILE)
|
||||||
|
@ -53,6 +54,10 @@ class TestParse(unittest.TestCase):
|
||||||
data = parse_torrent_file(self.NEG_FILE)
|
data = parse_torrent_file(self.NEG_FILE)
|
||||||
self.assertEqual(data['neg'], -1)
|
self.assertEqual(data['neg'], -1)
|
||||||
|
|
||||||
|
def test_dont_need_dict_outmost(self):
|
||||||
|
data = parse_torrent_file(self.STRING_FILE)
|
||||||
|
self.assertEqual(data, 'announce')
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|
|
@ -26,6 +26,11 @@ Usage:
|
||||||
with open('new.torrent', 'wb') as f:
|
with open('new.torrent', 'wb') as f:
|
||||||
f.write(TorrentFileCreator(data).encode())
|
f.write(TorrentFileCreator(data).encode())
|
||||||
|
|
||||||
|
# or you don't deal with file, just object in memory
|
||||||
|
|
||||||
|
data = decode(b'i12345e') # data = 12345
|
||||||
|
content = encode(data) # content = b'i12345e'
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from __future__ import print_function, unicode_literals
|
from __future__ import print_function, unicode_literals
|
||||||
|
@ -62,11 +67,14 @@ except NameError:
|
||||||
str_type = str
|
str_type = str
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
|
'InvalidTorrentDataException',
|
||||||
|
'BEncoder',
|
||||||
|
'BDecoder',
|
||||||
|
'encode',
|
||||||
|
'decode',
|
||||||
|
'TorrentFileParser',
|
||||||
'create_torrent_file',
|
'create_torrent_file',
|
||||||
'parse_torrent_file',
|
'parse_torrent_file',
|
||||||
'InvalidTorrentDataException',
|
|
||||||
'TorrentFileCreator',
|
|
||||||
'TorrentFileParser',
|
|
||||||
]
|
]
|
||||||
|
|
||||||
__version__ = '0.2.0'
|
__version__ = '0.2.0'
|
||||||
|
@ -90,6 +98,12 @@ class __EndCls(object):
|
||||||
_END = __EndCls()
|
_END = __EndCls()
|
||||||
|
|
||||||
|
|
||||||
|
def _check_hash_field_params(name, value):
|
||||||
|
return isinstance(name, str_type) \
|
||||||
|
and isinstance(value, tuple) and len(value) == 2 \
|
||||||
|
and isinstance(value[0], int) and isinstance(value[1], bool)
|
||||||
|
|
||||||
|
|
||||||
class TorrentFileParser(object):
|
class TorrentFileParser(object):
|
||||||
|
|
||||||
TYPE_LIST = 'list'
|
TYPE_LIST = 'list'
|
||||||
|
@ -105,7 +119,7 @@ class TorrentFileParser(object):
|
||||||
STRING_INDICATOR = b''
|
STRING_INDICATOR = b''
|
||||||
STRING_DELIMITER = b':'
|
STRING_DELIMITER = b':'
|
||||||
|
|
||||||
RAW_FIELD_PARAMS = {
|
HASH_FIELD_PARAMS = {
|
||||||
# field length need_list
|
# field length need_list
|
||||||
'pieces': (20, True),
|
'pieces': (20, True),
|
||||||
'ed2k': (16, False),
|
'ed2k': (16, False),
|
||||||
|
@ -120,29 +134,69 @@ class TorrentFileParser(object):
|
||||||
(TYPE_STRING, STRING_INDICATOR),
|
(TYPE_STRING, STRING_INDICATOR),
|
||||||
]
|
]
|
||||||
|
|
||||||
def __init__(self, fp, use_ordered_dict=False, encoding='utf-8'):
|
def __init__(
|
||||||
|
self, fp, use_ordered_dict=False, encoding='utf-8', errors='strict',
|
||||||
|
hash_fields=None, hash_raw=False,
|
||||||
|
):
|
||||||
"""
|
"""
|
||||||
:param fp: a **binary** file-like object to parse,
|
:param fp: a **binary** file-like object to parse,
|
||||||
which means need 'b' mode when use built-in open function
|
which means need 'b' mode when use built-in open function
|
||||||
:param encoding: file content encoding, default utf-8, use 'auto' to
|
:param bool use_ordered_dict: Use collections.OrderedDict as dict
|
||||||
enable charset auto detection ('chardet' package should be installed)
|
container default False, which mean use built-in dict
|
||||||
:param use_ordered_dict: Use collections.OrderedDict as dict container
|
:param str encoding: file content encoding, default utf-8, use 'auto'
|
||||||
default False, which mean use built-in dict
|
to enable charset auto detection (need 'chardet' package installed)
|
||||||
|
:param str errors: how to deal with encoding error when try to parse
|
||||||
|
string from content with ``encoding``
|
||||||
|
:param Dict[str, Tuple[int, bool]] hash_fields: extra fields should
|
||||||
|
be treated as hash value. dict key is the field name, value is a
|
||||||
|
two-element tuple of (hash_block_length, as_a_list).
|
||||||
|
See :any:`hash_field` for detail
|
||||||
"""
|
"""
|
||||||
if getattr(fp, 'read', ) is None \
|
if getattr(fp, 'read', ) is None \
|
||||||
or getattr(fp, 'seek') is None:
|
or getattr(fp, 'seek') is None:
|
||||||
raise ValueError('Argument fp needs a file like object')
|
raise ValueError('Parameter fp needs a file like object')
|
||||||
|
|
||||||
self._pos = 0
|
self._pos = 0
|
||||||
self._encoding = encoding
|
self._encoding = encoding
|
||||||
self._content = fp
|
self._content = fp
|
||||||
self._use_ordered_dict = use_ordered_dict
|
self._use_ordered_dict = use_ordered_dict
|
||||||
|
self._error_handler = errors
|
||||||
|
self._hash_fields = dict(TorrentFileParser.HASH_FIELD_PARAMS)
|
||||||
|
if hash_fields is not None:
|
||||||
|
for k, v in hash_fields.items():
|
||||||
|
if _check_hash_field_params(k, v):
|
||||||
|
self._hash_fields[k] = v
|
||||||
|
else:
|
||||||
|
raise ValueError(
|
||||||
|
"Invalid hash field parameter, it should be type of "
|
||||||
|
"Dict[str, Tuple[int, bool]]"
|
||||||
|
)
|
||||||
|
self._hash_raw = bool(hash_raw)
|
||||||
|
|
||||||
|
def hash_field(self, name, block_length=20, need_list=False):
    """
    Register the field called ``name`` as a hash value field, so it is not
    decoded as a string.

    :param str name: field name
    :param int block_length: length of one hash block, used to split the
        field content
    :param bool need_list: if True, a field holding a single block (or
        nothing at all) still parses to a one-element (or empty) list;
        if False, the 0 or 1 block case parses to a string
    :return: return self, so calls can be chained
    :raise: ValueError when the parameters fail validation
    """
    params = (block_length, need_list)
    # Guard clause: reject bad parameters before touching the registry.
    if not _check_hash_field_params(name, params):
        raise ValueError("Invalid hash field parameter")
    self._hash_fields[name] = params
    return self
|
||||||
|
|
||||||
def parse(self):
|
def parse(self):
|
||||||
"""
|
"""
|
||||||
:return: the parse result
|
:rtype: dict|list|int|str|bytes
|
||||||
:type: depends on ``use_ordered_dict`` option when init the parser
|
:raise: :any:`InvalidTorrentDataException` when parse failed or error
|
||||||
see :any:`TorrentFileParser.__init__`
|
happened when decode string using specified encoding
|
||||||
"""
|
"""
|
||||||
self._restart()
|
self._restart()
|
||||||
data = self._next_element()
|
data = self._next_element()
|
||||||
|
@ -155,10 +209,7 @@ class TorrentFileParser(object):
|
||||||
except EOFError: # expect EOF
|
except EOFError: # expect EOF
|
||||||
pass
|
pass
|
||||||
|
|
||||||
if isinstance(data, dict):
|
return data
|
||||||
return data
|
|
||||||
|
|
||||||
raise InvalidTorrentDataException('Outermost element is not a dict')
|
|
||||||
|
|
||||||
def _read_byte(self, count=1, raise_eof=False):
|
def _read_byte(self, count=1, raise_eof=False):
|
||||||
assert count >= 0
|
assert count >= 0
|
||||||
|
@ -186,11 +237,14 @@ class TorrentFileParser(object):
|
||||||
k = self._next_element()
|
k = self._next_element()
|
||||||
if k is _END:
|
if k is _END:
|
||||||
return
|
return
|
||||||
if k in self.RAW_FIELD_PARAMS:
|
if not isinstance(k, str_type):
|
||||||
length, need_list = self.RAW_FIELD_PARAMS[k]
|
raise InvalidTorrentDataException(
|
||||||
v = self._next_hash(length, need_list)
|
self._pos, "Type of dict key can't be " + type(k).__name__
|
||||||
|
)
|
||||||
|
if k in self._hash_fields:
|
||||||
|
v = self._next_hash(*self._hash_fields[k])
|
||||||
else:
|
else:
|
||||||
v = self._next_element()
|
v = self._next_element(k)
|
||||||
if k == 'encoding':
|
if k == 'encoding':
|
||||||
self._encoding = v
|
self._encoding = v
|
||||||
yield k, v
|
yield k, v
|
||||||
|
@ -225,29 +279,43 @@ class TorrentFileParser(object):
|
||||||
char = self._read_byte(1)
|
char = self._read_byte(1)
|
||||||
return -value if neg else value
|
return -value if neg else value
|
||||||
|
|
||||||
def _next_string(self, decode=True):
|
def _next_string(self, need_decode=True, field=None):
|
||||||
length = self._next_int(self.STRING_DELIMITER)
|
length = self._next_int(self.STRING_DELIMITER)
|
||||||
raw = self._read_byte(length)
|
raw = self._read_byte(length)
|
||||||
if decode:
|
if need_decode:
|
||||||
encoding = self._encoding
|
encoding = self._encoding
|
||||||
if encoding == 'auto':
|
if encoding == 'auto':
|
||||||
encoding = detect(raw)
|
self.encoding = encoding = detect(raw)
|
||||||
try:
|
try:
|
||||||
string = raw.decode(encoding)
|
string = raw.decode(encoding, self._error_handler)
|
||||||
except UnicodeDecodeError as e:
|
except UnicodeDecodeError as e:
|
||||||
|
msg = [
|
||||||
|
"Fail to decode string at pos {pos} using encoding ",
|
||||||
|
e.encoding
|
||||||
|
]
|
||||||
|
if field:
|
||||||
|
msg.extend([
|
||||||
|
' when parser field "', field, '"'
|
||||||
|
', maybe it is an hash field. ',
|
||||||
|
'You can use self.hash_field("', field, '") ',
|
||||||
|
'to let it be treated as hash value, ',
|
||||||
|
'so this error may disappear'
|
||||||
|
])
|
||||||
raise InvalidTorrentDataException(
|
raise InvalidTorrentDataException(
|
||||||
self._pos - length + e.start,
|
self._pos - length + e.start,
|
||||||
"Fail to decode string at pos {pos} using " + e.encoding
|
''.join(msg)
|
||||||
)
|
)
|
||||||
return string
|
return string
|
||||||
return raw
|
return raw
|
||||||
|
|
||||||
def _next_hash(self, p_len, need_list):
|
def _next_hash(self, p_len, need_list):
|
||||||
raw = self._next_string(decode=False)
|
raw = self._next_string(need_decode=False)
|
||||||
if len(raw) % p_len != 0:
|
if len(raw) % p_len != 0:
|
||||||
raise InvalidTorrentDataException(
|
raise InvalidTorrentDataException(
|
||||||
self._pos - len(raw), "Hash bit length not match at pos {pos}"
|
self._pos - len(raw), "Hash bit length not match at pos {pos}"
|
||||||
)
|
)
|
||||||
|
if self._hash_raw:
|
||||||
|
return raw
|
||||||
res = [
|
res = [
|
||||||
binascii.hexlify(chunk).decode('ascii')
|
binascii.hexlify(chunk).decode('ascii')
|
||||||
for chunk in (raw[x:x+p_len] for x in range(0, len(raw), p_len))
|
for chunk in (raw[x:x+p_len] for x in range(0, len(raw), p_len))
|
||||||
|
@ -274,44 +342,59 @@ class TorrentFileParser(object):
|
||||||
def _type_to_func(self, t):
|
def _type_to_func(self, t):
|
||||||
return getattr(self, '_next_' + t)
|
return getattr(self, '_next_' + t)
|
||||||
|
|
||||||
def _next_element(self):
|
def _next_element(self, field=None):
|
||||||
element_type = self._next_type()
|
element_type = self._next_type()
|
||||||
element = self._type_to_func(element_type)()
|
if element_type is TorrentFileParser.TYPE_STRING and field is not None:
|
||||||
|
element = self._type_to_func(element_type)(field=field)
|
||||||
|
else:
|
||||||
|
element = self._type_to_func(element_type)()
|
||||||
return element
|
return element
|
||||||
|
|
||||||
|
|
||||||
class TorrentFileCreator(object):
|
class BEncoder(object):
|
||||||
|
|
||||||
TYPES = {
|
TYPES = {
|
||||||
(dict,): TorrentFileParser.TYPE_DICT,
|
(dict,): TorrentFileParser.TYPE_DICT,
|
||||||
(list,): TorrentFileParser.TYPE_LIST,
|
(list,): TorrentFileParser.TYPE_LIST,
|
||||||
(int,): TorrentFileParser.TYPE_INT,
|
(int,): TorrentFileParser.TYPE_INT,
|
||||||
(str_type,): TorrentFileParser.TYPE_STRING,
|
(str_type, bytes): TorrentFileParser.TYPE_STRING,
|
||||||
}
|
}
|
||||||
|
|
||||||
def __init__(self, data, encoding='utf-8'):
|
def __init__(self, data, encoding='utf-8', hash_fields=None):
|
||||||
"""
|
"""
|
||||||
:param data: torrent data, must be a dict or OrderedDict
|
:param dict|list|int|str data: data will be encoded
|
||||||
:param encoding: string field output encoding
|
:param str encoding: string field output encoding
|
||||||
|
:param List[str] hash_fields: see
|
||||||
|
:any:`TorrentFileParser.__init__`
|
||||||
"""
|
"""
|
||||||
if not isinstance(data, dict):
|
|
||||||
raise InvalidTorrentDataException(
|
|
||||||
None,
|
|
||||||
"Top level structure should be a dict"
|
|
||||||
)
|
|
||||||
self._data = data
|
self._data = data
|
||||||
self._encoding = encoding
|
self._encoding = encoding
|
||||||
|
self._hash_fields = list(TorrentFileParser.HASH_FIELD_PARAMS.keys())
|
||||||
|
if hash_fields is not None:
|
||||||
|
self._hash_fields.extend(str_type(hash_fields))
|
||||||
|
|
||||||
|
def hash_field(self, name):
    """
    see :any:`TorrentFileParser.hash_field`

    Add ``name`` to this encoder's list of hash field names.

    :param str name: field name, converted with ``str_type`` before it is
        stored
    :return: return self, so you can chained call
    """
    # list.append returns None, so its result must not be returned;
    # hand back the encoder itself as the docstring promises.
    self._hash_fields.append(str_type(name))
    return self
|
||||||
|
|
||||||
def encode(self):
|
def encode(self):
|
||||||
"""
|
"""
|
||||||
Encode data to bytes that conform to torrent file format
|
Encode to bytes
|
||||||
|
|
||||||
|
:rtype: bytes
|
||||||
"""
|
"""
|
||||||
return b''.join(self._output_element(self._data))
|
return b''.join(self._output_element(self._data))
|
||||||
|
|
||||||
def encode_to_readable(self):
|
def encode_to_filelike(self):
|
||||||
"""
|
"""
|
||||||
Encode data to a file-like(BytesIO) object which contains the result of
|
Encode to a file-like(BytesIO) object
|
||||||
`TorrentFileCreator.encode()`
|
|
||||||
|
:rtype: BytesIO
|
||||||
"""
|
"""
|
||||||
return io.BytesIO(self.encode())
|
return io.BytesIO(self.encode())
|
||||||
|
|
||||||
|
@ -364,7 +447,7 @@ class TorrentFileCreator(object):
|
||||||
)
|
)
|
||||||
for x in self._output_element(k):
|
for x in self._output_element(k):
|
||||||
yield x
|
yield x
|
||||||
if k in TorrentFileParser.RAW_FIELD_PARAMS:
|
if k in self._hash_fields:
|
||||||
for x in self._output_decode_hash(v):
|
for x in self._output_decode_hash(v):
|
||||||
yield x
|
yield x
|
||||||
else:
|
else:
|
||||||
|
@ -393,29 +476,119 @@ class TorrentFileCreator(object):
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def parse_torrent_file(filename, use_ordered_dict=False):
|
class BDecoder(object):
    # Decodes in-memory bytes by wrapping them in a BytesIO and delegating
    # to a TorrentFileParser.

    def __init__(
        self, data, use_ordered_dict=False, encoding='utf-8', errors='strict',
        hash_fields=None, hash_raw=False,
    ):
        """
        All parameters except ``data`` are passed to
        :any:`TorrentFileParser.__init__` unchanged; see it for their
        description.

        :param bytes data: raw data to be decoded
        :param bool use_ordered_dict:
        :param str encoding:
        :param str errors:
        :param Dict[str, Tuple[int, bool]] hash_fields:
        :param bool hash_raw:
        """
        # The parser works on a binary file-like object, so wrap the bytes.
        stream = io.BytesIO(bytes(data))
        self._parser = TorrentFileParser(
            stream, use_ordered_dict, encoding, errors, hash_fields, hash_raw,
        )

    def hash_field(self, name, block_length=20, need_dict=False):
        """
        Forward to :any:`TorrentFileParser.hash_field` of the wrapped parser;
        see it for parameter description.

        :param name:
        :param block_length:
        :param need_dict: passed on as the parser's third positional
            argument (named ``need_list`` there)
        :return: return self, so calls can be chained
        """
        parser = self._parser
        parser.hash_field(name, block_length, need_dict)
        return self

    def decode(self):
        """
        Run the wrapped parser and return its parse result.
        """
        result = self._parser.parse()
        return result
|
||||||
|
|
||||||
|
|
||||||
|
def encode(data, encoding='utf-8', hash_fields=None):
    """
    Shortcut for encoding a python object to torrent file format (bencode).

    Builds a :any:`BEncoder` from the arguments and returns the result of
    its ``encode()``; see :any:`BEncoder.__init__` for parameter description.

    :param dict|list|int|str|bytes data: data to be encoded
    :param str encoding:
    :param List[str] hash_fields:
    :rtype: bytes
    """
    encoder = BEncoder(data, encoding, hash_fields)
    return encoder.encode()
|
||||||
|
|
||||||
|
|
||||||
|
def decode(
    data, use_ordered_dict=False, encoding='utf-8', errors='strict',
    hash_fields=None, hash_raw=False,
):
    """
    Shortcut for decoding bytes in torrent file format (bencode) to a
    python object.

    Builds a :any:`BDecoder` from the arguments and returns the result of
    its ``decode()``; see :any:`BDecoder.__init__` for parameter description.

    :param bytes data: raw data to be decoded
    :param bool use_ordered_dict:
    :param str encoding:
    :param str errors:
    :param Dict[str, Tuple[int, bool]] hash_fields:
    :param bool hash_raw:
    :rtype: dict|list|int|str|bytes
    """
    decoder = BDecoder(
        data, use_ordered_dict, encoding, errors, hash_fields, hash_raw,
    )
    return decoder.decode()
|
||||||
|
|
||||||
|
|
||||||
|
def parse_torrent_file(
|
||||||
|
filename, use_ordered_dict=False, encoding='utf-8', errors='strict',
|
||||||
|
hash_fields=None, hash_raw=False,
|
||||||
|
):
|
||||||
"""
|
"""
|
||||||
Shortcut function for parse torrent object using TorrentFileParser
|
Shortcut function for parse torrent object using TorrentFileParser
|
||||||
|
|
||||||
:param string filename: torrent filename
|
See :any:`TorrentFileParser.__init__` for parameter description
|
||||||
:param bool use_ordered_dict: see :any:`TorrentFileParser.__init__`
|
|
||||||
:rtype: dict if ``use_ordered_dict`` is false,
|
:param str filename: torrent filename
|
||||||
collections.OrderedDict otherwise
|
:param bool use_ordered_dict:
|
||||||
|
:param str encoding:
|
||||||
|
:param str errors:
|
||||||
|
:param Dict[str, Tuple[int, bool]] hash_fields:
|
||||||
|
:param bool hash_raw:
|
||||||
|
:rtype: dict|list|int|str|bytes
|
||||||
"""
|
"""
|
||||||
with open(filename, 'rb') as f:
|
with open(filename, 'rb') as f:
|
||||||
return TorrentFileParser(f, use_ordered_dict).parse()
|
return TorrentFileParser(
|
||||||
|
f, use_ordered_dict, encoding, errors, hash_fields, hash_raw,
|
||||||
|
).parse()
|
||||||
|
|
||||||
|
|
||||||
def create_torrent_file(filename, data, encoding='utf-8'):
|
def create_torrent_file(filename, data, encoding='utf-8', hash_fields=None):
|
||||||
"""
|
"""
|
||||||
Shortcut function for create a torrent file using TorrentFileCreator
|
Shortcut function for create a torrent file using BEncoder
|
||||||
|
|
||||||
:param filename: output torrent filename
|
see :any:`BDecoder.__init__` for parameter description
|
||||||
:param data: torrent data, must be a dict or OrderedDict
|
|
||||||
:param encoding: string field output encoding
|
:param str filename: output torrent filename
|
||||||
|
:param dict|list|int|str|bytes data:
|
||||||
|
:param str encoding:
|
||||||
|
:param List[str] hash_fields:
|
||||||
"""
|
"""
|
||||||
with open(filename, 'wb') as f:
|
with open(filename, 'wb') as f:
|
||||||
f.write(TorrentFileCreator(data, encoding).encode())
|
f.write(BEncoder(data, encoding, hash_fields).encode())
|
||||||
|
|
||||||
|
|
||||||
def __main():
|
def __main():
|
||||||
|
@ -432,7 +605,10 @@ def __main():
|
||||||
help='ensure output json use ascii char, '
|
help='ensure output json use ascii char, '
|
||||||
'escape other char use \\u')
|
'escape other char use \\u')
|
||||||
parser.add_argument('--coding', '-c', default='utf-8',
|
parser.add_argument('--coding', '-c', default='utf-8',
|
||||||
help='string encoding, default utf-8')
|
help='string encoding, default "utf-8"')
|
||||||
|
parser.add_argument('--errors', '-e', default='strict',
|
||||||
|
help='decoding error handler, default "strict", you can'
|
||||||
|
' use "ignore" or "replace" to avoid exception')
|
||||||
parser.add_argument('--version', '-v', action='store_true', default=False,
|
parser.add_argument('--version', '-v', action='store_true', default=False,
|
||||||
help='print version and exit')
|
help='print version and exit')
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
@ -453,7 +629,9 @@ def __main():
|
||||||
exit(1)
|
exit(1)
|
||||||
|
|
||||||
# noinspection PyUnboundLocalVariable
|
# noinspection PyUnboundLocalVariable
|
||||||
data = TorrentFileParser(target_file, not args.dict, args.coding).parse()
|
data = TorrentFileParser(
|
||||||
|
target_file, not args.dict, args.coding, args.errors
|
||||||
|
).parse()
|
||||||
|
|
||||||
data = json.dumps(
|
data = json.dumps(
|
||||||
data, ensure_ascii=args.ascii,
|
data, ensure_ascii=args.ascii,
|
||||||
|
|
Loading…
Reference in New Issue