hash_fields(finish #4) and hash raw:
- The `hash_fields` method and parameter let users customize the hash field list - The `hash_raw` parameter lets hash fields be output as raw bytes - `BEncoder` now supports encoding raw bytes
dev
parent
ee3128b32b
commit
177d1c7de9
|
@ -13,12 +13,15 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
|
|||
- Add `-e`/`--error` to CLI option to set the `errors` option of `parse_torrent_file`.
|
||||
- `BDecoder` class and `decode` shortcut function to directly decode bytes.
|
||||
- `encode` shortcut function to directly encode data to bytes.
|
||||
- Added `hash_fields` parameter and method to customize hash field list.
|
||||
- Added `hash_raw` parameter to let all hash fields be parsed as raw bytes.
|
||||
|
||||
### Changed
|
||||
|
||||
- **Breaking change**: `TorrentFileCreator` is renamed to `BEncoder`, as the original name doesn't describe its function.
|
||||
- `TorrentFileParser` don't need the outmost level of parsed data to be a `dict` now
|
||||
- `BEncoder` don't need the outmost level of encoded data to be a `dict` now
|
||||
- `TorrentFileParser` doesn't need the outermost level of parsed data to be a `dict` now.
|
||||
- `BEncoder` doesn't need the outermost level of encoded data to be a `dict` now.
|
||||
- `BEncoder` now supports encoding raw bytes.
|
||||
|
||||
## [0.2.0] - 2018.5.25
|
||||
|
||||
|
|
|
@ -9,8 +9,7 @@ Can also edit and write back to torrent format after version 0.2.0.
|
|||
- Decoder and encoder for torrent files
|
||||
- Auto decode bytes fields to strings with the user-specified encoding and error handler
|
||||
- Auto-detect encoding when `auto` is used as the encoding (needs `chardet` installed)
|
||||
- Auto decode hash value filed to hash blocks
|
||||
- Uniform exception type
|
||||
- Auto decode hash value fields to hash blocks, also customizable
|
||||
- CLI provided, with JSON output
|
||||
|
||||
## Install
|
||||
|
|
|
@ -3,3 +3,5 @@ from .test_parse import *
|
|||
from .test_encoding_error import *
|
||||
from .test_encode import *
|
||||
from .test_decode import *
|
||||
from .test_hash_field import *
|
||||
from .test_hash_raw import *
|
|
@ -0,0 +1,21 @@
|
|||
from __future__ import unicode_literals
|
||||
|
||||
import os.path
|
||||
import unittest
|
||||
|
||||
from torrent_parser import (
|
||||
TorrentFileParser, parse_torrent_file, decode
|
||||
)
|
||||
|
||||
|
||||
class TestHashField(unittest.TestCase):
    # Smoke tests: registering an extra hash field through each public entry
    # point must not raise while parsing a file that is not valid UTF-8.

    TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), 'test_files')
    FILE = os.path.join(TEST_FILES_DIR, 'utf8.encoding.error.torrent')

    def test_not_raise_exception_when_add_hash_fields(self):
        extra_fields = {'info_hash': (20, False)}

        # 1. shortcut function that works on a file name
        parse_torrent_file(self.FILE, hash_fields=extra_fields)

        # 2. parser object configured through the chained method
        with open(self.FILE, 'rb') as torrent:
            parser = TorrentFileParser(torrent)
            parser.hash_field('info_hash').parse()

        # 3. shortcut function that works on in-memory bytes
        with open(self.FILE, 'rb') as torrent:
            content = torrent.read()
        decode(content, hash_fields=extra_fields)
|
|
@ -0,0 +1,23 @@
|
|||
from __future__ import unicode_literals
|
||||
|
||||
import os.path
|
||||
import unittest
|
||||
|
||||
from torrent_parser import decode, encode
|
||||
|
||||
|
||||
class TestHashRaw(unittest.TestCase):
    # Checks the ``hash_raw`` switch of ``decode`` and the ability of
    # ``encode`` to write a raw ``bytes`` value unchanged.

    TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), 'test_files')
    FILE = os.path.join(TEST_FILES_DIR, 'utf8.encoding.error.torrent')

    def test_hash_raw_decode(self):
        payload = b'd4:hash4:\xAA\xBB\xCC\xDDe'
        fields = {'hash': (4, False)}

        # hash_raw off: the hash comes back as a hex string
        as_hex = decode(payload, hash_fields=fields, hash_raw=False)
        self.assertEqual(as_hex['hash'], 'aabbccdd')

        # hash_raw on: the hash comes back as the untouched bytes
        as_bytes = decode(payload, hash_fields=fields, hash_raw=True)
        self.assertEqual(as_bytes['hash'], b'\xAA\xBB\xCC\xDD')

    def test_raw_bytes_encode(self):
        encoded = encode({'hash': b'\xAA\xBB\xCC\xDD'})
        self.assertEqual(encoded, b'd4:hash4:\xAA\xBB\xCC\xDDe')
|
|
@ -26,6 +26,11 @@ Usage:
|
|||
with open('new.torrent', 'wb') as f:
|
||||
f.write(BEncoder(data).encode())
|
||||
|
||||
# or you don't deal with file, just object in memory
|
||||
|
||||
data = decode(b'i12345e') # data = 12345
|
||||
content = encode(data) # content = b'i12345e'
|
||||
|
||||
"""
|
||||
|
||||
from __future__ import print_function, unicode_literals
|
||||
|
@ -93,6 +98,12 @@ class __EndCls(object):
|
|||
_END = __EndCls()
|
||||
|
||||
|
||||
def _check_hash_field_params(name, value):
    """
    Check whether ``name`` and ``value`` describe a usable hash field.

    :param name: field name, must be an instance of ``str_type``
    :param value: must be a two-element tuple of
      ``(block_length, need_list)`` where ``block_length`` is a positive
      ``int`` (``bool`` is rejected) and ``need_list`` is a ``bool``
    :return: True when both arguments are acceptable, otherwise False
    :rtype: bool
    """
    if not isinstance(name, str_type):
        return False
    if not isinstance(value, tuple) or len(value) != 2:
        return False
    block_length, need_list = value
    # bool is a subclass of int, so it has to be excluded explicitly.
    if isinstance(block_length, bool) or not isinstance(block_length, int):
        return False
    # The block length is used as the step when the raw hash value is split
    # into chunks, so zero or a negative number can never work.
    return block_length > 0 and isinstance(need_list, bool)
|
||||
|
||||
|
||||
class TorrentFileParser(object):
|
||||
|
||||
TYPE_LIST = 'list'
|
||||
|
@ -124,32 +135,66 @@ class TorrentFileParser(object):
|
|||
]
|
||||
|
||||
def __init__(
    self, fp, use_ordered_dict=False, encoding='utf-8', errors='strict',
    hash_fields=None, hash_raw=False,
):
    """
    :param fp: a **binary** file-like object to parse,
      which means need 'b' mode when use built-in open function
    :param bool use_ordered_dict: Use collections.OrderedDict as dict
      container default False, which mean use built-in dict
    :param str encoding: file content encoding, default utf-8, use 'auto'
      to enable charset auto detection (need 'chardet' package installed)
    :param str errors: how to deal with encoding error when try to parse
      string from content with ``encoding``
    :param Dict[str, Tuple[int, bool]] hash_fields: extra fields should
      be treated as hash value. dict key is the field name, value is a
      two-element tuple of (hash_block_length, as_a_list).
      See :any:`hash_field` for detail
    :param bool hash_raw: if true, hash fields are returned as raw bytes
      instead of being converted to hex strings
    :raise ValueError: when ``fp`` has no ``read`` or ``seek`` attribute,
      or when an entry of ``hash_fields`` is malformed
    """
    # The explicit None default matters: without it getattr raises
    # AttributeError for a non-file object and the ValueError below is
    # never reached.
    if getattr(fp, 'read', None) is None \
            or getattr(fp, 'seek', None) is None:
        raise ValueError('Parameter fp needs a file like object')

    self._pos = 0
    self._encoding = encoding
    self._content = fp
    self._use_ordered_dict = use_ordered_dict
    self._error_handler = errors
    # Start from a copy of the class-level defaults so per-instance
    # additions never leak into other parsers.
    self._hash_fields = dict(TorrentFileParser.HASH_FIELD_PARAMS)
    if hash_fields is not None:
        for k, v in hash_fields.items():
            if _check_hash_field_params(k, v):
                self._hash_fields[k] = v
            else:
                raise ValueError(
                    "Invalid hash field parameter, it should be type of "
                    "Dict[str, Tuple[int, bool]]"
                )
    self._hash_raw = bool(hash_raw)
|
||||
|
||||
def hash_field(self, name, block_length=20, need_list=False):
    """
    Register the field called ``name`` as a hash value so that it is not
    decoded as a string.

    :param str name: field name
    :param int block_length: length of one hash block, used for splitting
    :param bool need_list: if True, a field holding one block (or none)
      is still parsed into a one-element (or empty) list; if False, such
      a field is parsed into a string
    :return: self, which allows chained calls
    :raise ValueError: when the arguments fail validation
    """
    params = (block_length, need_list)
    if not _check_hash_field_params(name, params):
        raise ValueError("Invalid hash field parameter")
    self._hash_fields[name] = params
    return self
|
||||
|
||||
def parse(self):
|
||||
"""
|
||||
:return: the parse result
|
||||
:rtype: dict|list|int|string
|
||||
:rtype: dict|list|int|str|bytes
|
||||
:raise: :any:`InvalidTorrentDataException` when parse failed or error
|
||||
happened when decode string using specified encoding
|
||||
"""
|
||||
|
@ -192,10 +237,14 @@ class TorrentFileParser(object):
|
|||
k = self._next_element()
|
||||
if k is _END:
|
||||
return
|
||||
if k in self.HASH_FIELD_PARAMS:
|
||||
v = self._next_hash(*self.HASH_FIELD_PARAMS[k])
|
||||
if not isinstance(k, str_type):
|
||||
raise InvalidTorrentDataException(
|
||||
self._pos, "Type of dict key can't be " + type(k).__name__
|
||||
)
|
||||
if k in self._hash_fields:
|
||||
v = self._next_hash(*self._hash_fields[k])
|
||||
else:
|
||||
v = self._next_element()
|
||||
v = self._next_element(k)
|
||||
if k == 'encoding':
|
||||
self._encoding = v
|
||||
yield k, v
|
||||
|
@ -230,7 +279,7 @@ class TorrentFileParser(object):
|
|||
char = self._read_byte(1)
|
||||
return -value if neg else value
|
||||
|
||||
def _next_string(self, need_decode=True):
|
||||
def _next_string(self, need_decode=True, field=None):
|
||||
length = self._next_int(self.STRING_DELIMITER)
|
||||
raw = self._read_byte(length)
|
||||
if need_decode:
|
||||
|
@ -240,10 +289,21 @@ class TorrentFileParser(object):
|
|||
try:
|
||||
string = raw.decode(encoding, self._error_handler)
|
||||
except UnicodeDecodeError as e:
|
||||
msg = [
|
||||
"Fail to decode string at pos {pos} using encoding ",
|
||||
e.encoding
|
||||
]
|
||||
if field:
|
||||
msg.extend([
|
||||
' when parser field "', field, '"'
|
||||
', maybe it is an hash field. ',
|
||||
'You can use self.hash_field("', field, '") ',
|
||||
'to let it be treated as hash value, ',
|
||||
'so this error may disappear'
|
||||
])
|
||||
raise InvalidTorrentDataException(
|
||||
self._pos - length + e.start,
|
||||
"Fail to decode string at pos {pos} using encoding " +
|
||||
e.encoding
|
||||
''.join(msg)
|
||||
)
|
||||
return string
|
||||
return raw
|
||||
|
@ -254,6 +314,8 @@ class TorrentFileParser(object):
|
|||
raise InvalidTorrentDataException(
|
||||
self._pos - len(raw), "Hash bit length not match at pos {pos}"
|
||||
)
|
||||
if self._hash_raw:
|
||||
return raw
|
||||
res = [
|
||||
binascii.hexlify(chunk).decode('ascii')
|
||||
for chunk in (raw[x:x+p_len] for x in range(0, len(raw), p_len))
|
||||
|
@ -280,9 +342,12 @@ class TorrentFileParser(object):
|
|||
def _type_to_func(self, t):
|
||||
return getattr(self, '_next_' + t)
|
||||
|
||||
def _next_element(self, field=None):
    """
    Read the next element, whatever its type is.

    :param field: name of the dict field the element belongs to, if any.
      It is only handed to the string reader, and only when it is not
      None.
    :return: the value produced by the reader for the detected type
    """
    element_type = self._next_type()
    reader = self._type_to_func(element_type)
    # Compare by value: identity comparison of strings only works by
    # accident of how the type constant happens to be produced.
    if element_type == TorrentFileParser.TYPE_STRING and field is not None:
        return reader(field=field)
    return reader()
|
||||
|
||||
|
||||
|
@ -292,16 +357,30 @@ class BEncoder(object):
|
|||
(dict,): TorrentFileParser.TYPE_DICT,
|
||||
(list,): TorrentFileParser.TYPE_LIST,
|
||||
(int,): TorrentFileParser.TYPE_INT,
|
||||
(str_type,): TorrentFileParser.TYPE_STRING,
|
||||
(str_type, bytes): TorrentFileParser.TYPE_STRING,
|
||||
}
|
||||
|
||||
def __init__(self, data, encoding='utf-8', hash_fields=None):
    """
    :param dict|list|int|str data: data will be encoded
    :param str encoding: string field output encoding
    :param List[str] hash_fields: names of extra fields that hold hash
      values, added to the names in
      ``TorrentFileParser.HASH_FIELD_PARAMS``
    """
    self._data = data
    self._encoding = encoding
    self._hash_fields = list(TorrentFileParser.HASH_FIELD_PARAMS.keys())
    if hash_fields is not None:
        # Convert each name on its own. Converting the whole list would
        # produce its textual representation and add it character by
        # character.
        self._hash_fields.extend(str_type(name) for name in hash_fields)
|
||||
|
||||
def hash_fields(self, name):
    """
    Register one more field name as holding a hash value.

    see :any:`TorrentFileParser.hash_field`

    :param str name: field name
    :return: return self, so you can chained call
    """
    # list.append returns None, so self has to be returned explicitly
    # to make chained calls possible.
    self._hash_fields.append(str_type(name))
    return self
|
||||
|
||||
def encode(self):
|
||||
"""
|
||||
|
@ -368,7 +447,7 @@ class BEncoder(object):
|
|||
)
|
||||
for x in self._output_element(k):
|
||||
yield x
|
||||
if k in TorrentFileParser.HASH_FIELD_PARAMS:
|
||||
if k in self._hash_fields:
|
||||
for x in self._output_decode_hash(v):
|
||||
yield x
|
||||
else:
|
||||
|
@ -399,77 +478,117 @@ class BEncoder(object):
|
|||
|
||||
class BDecoder(object):
    # Decoder for in-memory data: wraps the bytes in a stream and delegates
    # all work to a TorrentFileParser.

    def __init__(
        self, data, use_ordered_dict=False, encoding='utf-8', errors='strict',
        hash_fields=None, hash_raw=False,
    ):
        """
        All parameters except ``data`` are handed to
        :any:`TorrentFileParser.__init__` unchanged.

        :param bytes data: raw data to be decoded
        :param bool use_ordered_dict:
        :param str encoding:
        :param str errors:
        :param Dict[str, Tuple[int, bool]] hash_fields:
        :param bool hash_raw:
        """
        stream = io.BytesIO(bytes(data))
        self._parser = TorrentFileParser(
            stream, use_ordered_dict, encoding, errors, hash_fields, hash_raw,
        )

    def hash_field(self, name, block_length=20, need_dict=False):
        """
        Forward to :any:`TorrentFileParser.hash_field`.

        :param name: field name
        :param block_length: hash block length
        :param need_dict: passed on as the third positional argument of
          the parser's ``hash_field``
        :return: self, which allows chained calls
        """
        parser = self._parser
        parser.hash_field(name, block_length, need_dict)
        return self

    def decode(self):
        # Return the result of parsing the wrapped data.
        parser = self._parser
        return parser.parse()
|
||||
|
||||
|
||||
def encode(data, encoding='utf-8', hash_fields=None):
    """
    Shortcut that encodes a python object to torrent file format (bencode).

    The arguments are handed to :any:`BEncoder.__init__` unchanged.

    :param dict|list|int|str|bytes data: data to be encoded
    :param str encoding:
    :param List[str] hash_fields:
    :rtype: bytes
    """
    encoder = BEncoder(data, encoding, hash_fields)
    return encoder.encode()
|
||||
|
||||
|
||||
def decode(
    data, use_ordered_dict=False, encoding='utf-8', errors='strict',
    hash_fields=None, hash_raw=False,
):
    """
    Shortcut that decodes bytes in torrent file format (bencode) to a
    python object.

    The arguments are handed to :any:`BDecoder.__init__` unchanged.

    :param bytes data: raw data to be decoded
    :param bool use_ordered_dict:
    :param str encoding:
    :param str errors:
    :param Dict[str, Tuple[int, bool]] hash_fields:
    :param bool hash_raw:
    :rtype: dict|list|int|str|bytes
    """
    decoder = BDecoder(
        data, use_ordered_dict, encoding, errors, hash_fields, hash_raw,
    )
    return decoder.decode()
|
||||
|
||||
|
||||
def parse_torrent_file(
    filename, use_ordered_dict=False, encoding='utf-8', errors='strict',
    hash_fields=None, hash_raw=False,
):
    """
    Shortcut that opens ``filename`` in binary mode and parses it with
    TorrentFileParser.

    All arguments except ``filename`` are handed to
    :any:`TorrentFileParser.__init__` unchanged.

    :param str filename: torrent filename
    :param bool use_ordered_dict:
    :param str encoding:
    :param str errors:
    :param Dict[str, Tuple[int, bool]] hash_fields:
    :param bool hash_raw:
    :rtype: dict|list|int|str|bytes
    """
    with open(filename, 'rb') as torrent:
        parser = TorrentFileParser(
            torrent, use_ordered_dict, encoding, errors, hash_fields,
            hash_raw,
        )
        return parser.parse()
|
||||
|
||||
|
||||
def create_torrent_file(filename, data, encoding='utf-8', hash_fields=None):
    """
    Shortcut function for create a torrent file using BEncoder

    see :any:`BEncoder.__init__` for parameter description

    :param str filename: output torrent filename
    :param dict|list|int|str|bytes data: torrent data
    :param str encoding:
    :param List[str] hash_fields:
    """
    # Encode before opening the file: opening with 'wb' truncates it, so
    # encoding first keeps an existing file intact when encoding fails.
    content = BEncoder(data, encoding, hash_fields).encode()
    with open(filename, 'wb') as f:
        f.write(content)
|
||||
|
||||
|
||||
def __main():
|
||||
|
|
Loading…
Reference in New Issue