include btclient

pull/1/head
DiMartinoXBMC 2015-08-02 22:47:42 +03:00
parent 449dd4a202
commit 9631c07c9b
12 changed files with 3171 additions and 945 deletions

File diff suppressed because it is too large

View File

@@ -23,6 +23,8 @@ import urllib
import json
import sys
from contextlib import contextmanager, closing, nested
import traceback
from argparse import Namespace
import xbmc
import xbmcgui
@@ -31,14 +33,8 @@ import xbmcgui
import xbmcvfs
import Localization
from platform_pulsar import get_platform
import traceback
from btclient import *
from functions import calculate, showMessage, clearStorage, DownloadDB, get_ids_video, log, debug, is_writable
from argparse import Namespace
from functions import showMessage, DownloadDB, get_ids_video, log, debug
from Player import OverlayText
from Libtorrent import Libtorrent
ROOT = sys.modules["__main__"].__root__
RESOURCES_PATH = os.path.join(ROOT, 'resources')

View File

@@ -1,9 +1,10 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<addon id="plugin.video.torrenter" name="Torrenter" version="2.3.6" provider-name="vadim.skorba, DiMartino">
<addon id="plugin.video.torrenter" name="Torrenter" version="2.3.7" provider-name="vadim.skorba, DiMartino">
<requires>
<import addon="xbmc.python" version="2.1.0"/>
<import addon="script.module.libtorrent"/>
<import addon="script.module.btclient"/>
<import addon="script.module.beautifulsoup4"/>
<import addon="script.module.hachoir"/>
<import addon="script.module.torrent.ts"/>
</requires>
<extension point="xbmc.python.pluginsource" provides="video" library="default.py">

View File

@@ -0,0 +1,5 @@
#-*- coding: utf-8 -*-
'''
Torrenter v2 plugin for XBMC/Kodi
Copyright (C) 2015 DiMartino
'''

View File

View File

@@ -0,0 +1 @@
__version__ = '0.4.2'

View File

@@ -0,0 +1,819 @@
#!/usr/bin/env python
import time
import sys
import argparse
import os.path
from threading import Thread
import re
import urlparse
import BaseHTTPServer as htserver
import types
import logging
import logging.handlers
import traceback
import urllib
import SocketServer
import socket
import pickle
import thread
import json
import shutil
from cachebt import CacheBT
#from player import Player
from common import AbstractFile, Hasher, BaseMonitor, BaseClient, Resolver
from htclient import HTClient
#import plugins # @UnresolvedImport
logging.basicConfig()
logger = logging.getLogger()
INITIAL_TRACKERS = ['udp://tracker.openbittorrent.com:80',
'udp://tracker.istole.it:80',
'udp://open.demonii.com:80',
'udp://tracker.coppersurfer.tk:80',
'udp://tracker.leechers-paradise.org:6969',
'udp://exodus.desync.com:6969',
'udp://tracker.publicbt.com:80']
VIDEO_EXTS = {'.avi': 'video/x-msvideo', '.mp4': 'video/mp4', '.mkv': 'video/x-matroska',
'.m4v': 'video/mp4', '.mov': 'video/quicktime', '.mpg': 'video/mpeg', '.ogv': 'video/ogg',
'.ogg': 'video/ogg', '.webm': 'video/webm', '.ts': 'video/mp2t', '.3gp': 'video/3gpp'}
RANGE_RE = re.compile(r'bytes=(\d+)-')
# offset from end to download first
FILE_TAIL = 10000
class x:
lol=''
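# parse_range extracts the start offset from an HTTP Range header ('bytes=N-');
# it returns 0 when the header is missing or malformed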
def parse_range(range): # @ReservedAssignment
if range:
m = RANGE_RE.match(range)
if m:
try:
return int(m.group(1))
except:
pass
return 0
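# StreamServer is a threaded HTTP server that streams the currently selected
# torrent file to the media player; its serve() loop polls handle_request()
# until stop() clears the _running flag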
class StreamServer(SocketServer.ThreadingMixIn, htserver.HTTPServer):
daemon_threads = True
def __init__(self, address, handler_class, tfile=None, allow_range=True, status_fn=None):
htserver.HTTPServer.__init__(self, address, handler_class)
self.file = tfile
self._running = True
self.allow_range = allow_range
self.status_fn = status_fn
def stop(self):
self._running = False
def set_file(self, f):
self.file = f
def serve(self, w=0.1):
while self._running:
try:
self.handle_request()
time.sleep(w)
except Exception, e:
print >> sys.stderr, str(e)
def run(self):
self.timeout = 0.1
t = Thread(target=self.serve, args=[0.1], name='HTTP Server')
t.daemon = True
t.start()
def handle_error(self, request, client_address):
"""Handle an error gracefully. May be overridden.
The default is to print a traceback and continue.
"""
_, e, _ = sys.exc_info()
if isinstance(e, socket.error) and e.errno == 32:
logger.debug("Socket disconnect for client %s", client_address)
# pprint.pprint(e)
else:
logger.exception("HTTP Server Error")
# TODO: remove print
traceback.print_exc()
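# BTFileHandler serves the shared file over HTTP: GET streams data from a
# cursor positioned according to the Range header, and /status returns JSON
# progress produced by the server's status_fn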
class BTFileHandler(htserver.BaseHTTPRequestHandler):
protocol_version = 'HTTP/1.1'
def do_GET(self):
if self.do_HEAD(only_header=False):
with self.server.file.create_cursor(self._offset) as f:
send_something = False
while True:
buf = f.read(1024)
if not send_something and logger.level <= logging.DEBUG:
logger.debug('Start sending data')
send_something = True
if buf:
self.wfile.write(buf)
else:
if logger.level <= logging.DEBUG:
logger.debug('Finished sending data')
break
def _file_info(self):
size = self.server.file.size
ext = os.path.splitext(self.server.file.path)[1]
mime = (self.server.file.mime if hasattr(self.server.file, 'mime') else None) or VIDEO_EXTS.get(ext)
if not mime:
mime = 'application/octet-stream'
return size, mime
def do_HEAD(self, only_header=True):
parsed_url = urlparse.urlparse(self.path)
if parsed_url.path == "/status" and self.server.status_fn:
s = self.server.status_fn()
status = json.dumps(s)
self.send_response(200, 'OK')
self.send_header('Content-Type', 'application/json')
self.send_header('Content-Length', len(status))
self._finish_header(only_header)
if not only_header:
self.wfile.write(status)
return False
elif self.server.file and urllib.unquote(parsed_url.path) == '/' + self.server.file.path:
self._offset = 0
size, mime = self._file_info()
range = None # @ReservedAssignment
if self.server.allow_range:
range = parse_range(self.headers.get('Range', None)) # @ReservedAssignment
if range:
self._offset = range
range = (range, size - 1, size) # @ReservedAssignment
logger.debug('Request range %s (header is %s)', range, self.headers.get('Range', None))
self.send_resp_header(mime, size, range, only_header)
return True
else:
logger.error('Requesting wrong path %s, but file is %s', parsed_url.path, '/' + self.server.file.path)
self.send_error(404, 'Not Found')
def send_resp_header(self, cont_type, cont_length, range=False, only_header=False): # @ReservedAssignment
# logger.debug('range is %s'% str(range))
if self.server.allow_range and range:
self.send_response(206, 'Partial Content')
else:
self.send_response(200, 'OK')
self.send_header('Content-Type', cont_type)
self.send_header('transferMode.dlna.org', 'Streaming')
self.send_header('contentFeatures.dlna.org',
'DLNA.ORG_OP=01;DLNA.ORG_CI=0;DLNA.ORG_FLAGS=01700000000000000000000000000000')
if self.server.allow_range:
self.send_header('Accept-Ranges', 'bytes')
else:
self.send_header('Accept-Ranges', 'none')
if self.server.allow_range and range:
if isinstance(range, (types.TupleType, types.ListType)) and len(range) == 3:
self.send_header('Content-Range', 'bytes %d-%d/%d' % range)
self.send_header('Content-Length', range[1] - range[0] + 1)
else:
raise ValueError('Invalid range value')
else:
self.send_header('Content-Length', cont_length)
self._finish_header(only_header)
def _finish_header(self, only_header):
self.send_header('Connection', 'close')
if not only_header: self.end_headers()
def log_message(self, format, *args): # @ReservedAssignment
logger.debug(format, *args)
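# BTClient wraps a libtorrent session: it persists session state with pickle,
# caches .torrent and fast-resume data through CacheBT, downloads sequentially
# and prioritizes pieces around the current playback position for streaming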
class BTClient(BaseClient):
def __init__(self, path_to_store,
args=None,
state_file="",
lt=None,
**kwargs):
super(BTClient, self).__init__(path_to_store, args=args)
self.lt=lt
self._cache = CacheBT(path_to_store, self.lt)
self._torrent_params = {'save_path': path_to_store,
'storage_mode': self.lt.storage_mode_t.storage_mode_sparse
}
if not state_file:
state_file=os.path.join(path_to_store,'.btclient_state')
self._state_file = os.path.expanduser(state_file)
self._ses = self.lt.session()
if os.path.exists(self._state_file):
with open(self._state_file) as f:
state = pickle.load(f)
self._ses.load_state(state)
# self._ses.set_alert_mask(self.lt.alert.category_t.progress_notification)
if args:
s = self._ses.get_settings()
s['download_rate_limit'] = int(round(args.bt_download_limit * 1024))
s['upload_rate_limit'] = int(round(args.bt_upload_limit * 1024))
self._ses.set_settings(s)
self._ses.listen_on(args.listen_port_min, args.listen_port_max)
self.content_id=args.content_id
else:
self._ses.listen_on(6881, 6891)
self._start_services()
self._th = None
self._monitor.add_listener(self._check_ready)
self._dispatcher = BTClient.Dispatcher(self, lt=self.lt)
self._dispatcher.add_listener(self._update_ready_pieces)
self._hash = None
self._url = None
#if args and args.debug_log and args.trace:
# self.add_monitor_listener(self.debug_download_queue)
# self.add_dispatcher_listener(self.debug_alerts)
@property
def is_file_complete(self):
pcs = self._th.status().pieces[self._file.first_piece:self._file.last_piece + 1]
return all(pcs)
def _update_ready_pieces(self, alert_type, alert):
if alert_type == 'read_piece_alert' and self._file:
self._file.update_piece(alert.piece, alert.buffer)
def _check_ready(self, s, **kwargs):
if s.state in [3, 4, 5] and not self._file and s.progress > 0:
self._meta_ready(self._th.torrent_file())
logger.debug('Got torrent metadata, starting download')
self.hash = Hasher(self._file, self._on_file_ready)
def _choose_file(self, files, i):
if i is None:
videos = filter(lambda f: VIDEO_EXTS.has_key(os.path.splitext(f.path)[1]), files)
if not videos:
raise Exception('No video files in torrent')
f = sorted(videos, key=lambda f: f.size)[-1]
i = files.index(f)
f.index = i
f=files[i]
f.index = i
return f
def _meta_ready(self, meta):
fs = meta.files()
files = fs if isinstance(fs, list) else [fs.at(i) for i in xrange(fs.num_files())]
f = self._choose_file(files, self.content_id)
fmap = meta.map_file(f.index, 0, 1)
self._file = BTFile(f.path, self._base_path, f.index, f.size, fmap, meta.piece_length(),
self.prioritize_piece)
self.prioritize_file()
print ('File %s pieces (pc=%d, ofs=%d, sz=%d), total_pieces=%d, pc_length=%d' %
(f.path, fmap.piece, fmap.start, fmap.length,
meta.num_pieces(), meta.piece_length()))
self._cache.file_complete(self._th.torrent_file(),
self._url if self._url and self._url.startswith('http') else None)
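# each piece in the read-ahead window gets a deadline of idx * 1000 + 2000 ms;
# alert_when_available makes libtorrent post a read_piece_alert once the piece
# is downloaded, which feeds _update_ready_pieces above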
def prioritize_piece(self, pc, idx):
piece_duration = 1000
min_deadline = 2000
dl = idx * piece_duration + min_deadline
self._th.set_piece_deadline(pc, dl, self.lt.deadline_flags.alert_when_available)
logger.debug("Set deadline %d for piece %d", dl, pc)
# pieces below the current index are no longer needed, but the last two pieces are special because players sometimes probe the end of the file
if idx == 0 and (self._file.last_piece - pc) > 2:
for i in xrange(pc - 1):
self._th.piece_priority(i, 0)
self._th.reset_piece_deadline(i)
def prioritize_file(self):
meta = self._th.torrent_file()
priorities = [1 if i >= self._file.first_piece and i <= self.file.last_piece else 0 \
for i in xrange(meta.num_pieces())]
self._th.prioritize_pieces(priorities)
def encrypt(self):
# Encryption settings
print 'Encryption enabling...'
try:
encryption_settings = self.lt.pe_settings()
encryption_settings.out_enc_policy = self.lt.enc_policy(self.lt.enc_policy.forced)
encryption_settings.in_enc_policy = self.lt.enc_policy(self.lt.enc_policy.forced)
encryption_settings.allowed_enc_level = self.lt.enc_level.both
encryption_settings.prefer_rc4 = True
self._ses.set_pe_settings(encryption_settings)
print 'Encryption on!'
except Exception, e:
print 'Encryption failed! Exception: ' + str(e)
pass
@property
def unique_file_id(self):
return str(self._th.torrent_file().info_hash())
@property
def pieces(self):
return self._th.status().pieces
def add_dispatcher_listener(self, cb):
self._dispatcher.add_listener(cb)
def remove_dispacher_listener(self, cb):
self._dispatcher.remove_listener(cb)
def remove_all_dispatcher_listeners(self):
self._dispatcher.remove_all_listeners()
def info_from_file(self, uri):
if os.access(uri, os.R_OK):
e = self.lt.bdecode(open(uri, 'rb').read())
info = self.lt.torrent_info(e)
tp = {'ti': info}
resume_data = self._cache.get_resume(info_hash=str(info.info_hash()))
if resume_data:
tp['resume_data'] = resume_data
return tp
raise ValueError('Invalid torrent path %s' % uri)
def start_url(self, uri):
if self._th:
raise Exception('Torrent is already started')
if uri.startswith('http://') or uri.startswith('https://'):
self._url = uri
stored = self._cache.get_torrent(url=uri)
if stored:
tp = self.info_from_file(stored)
else:
tp = {'url': uri}
resume_data = self._cache.get_resume(url=uri)
if resume_data:
tp['resume_data'] = resume_data
elif uri.startswith('magnet:'):
self._url = uri
stored = self._cache.get_torrent(info_hash=CacheBT.hash_from_magnet(uri))
if stored:
tp = self.info_from_file(stored)
else:
tp = {'url': uri}
resume_data = self._cache.get_resume(info_hash=CacheBT.hash_from_magnet(uri))
if resume_data:
tp['resume_data'] = resume_data
elif os.path.isfile(uri):
tp = self.info_from_file(uri)
else:
raise ValueError("Invalid torrent %s" % uri)
tp.update(self._torrent_params)
self._th = self._ses.add_torrent(tp)
for tr in INITIAL_TRACKERS:
self._th.add_tracker({'url': tr})
self._th.set_sequential_download(True)
time.sleep(1)
self._th.force_dht_announce()
# if tp.has_key('ti'):
# self._meta_ready(self._th.torrent_file())
self._monitor.start()
self._dispatcher.do_start(self._th, self._ses)
def stop(self):
BaseClient.stop(self)
self._dispatcher.stop()
self._dispatcher.join()
def _start_services(self):
self._ses.add_dht_router('router.bittorrent.com', 6881)
self._ses.add_dht_router('router.utorrent.com', 6881)
self._ses.add_dht_router('router.bitcomet.com', 6881)
self._ses.start_dht()
self._ses.start_lsd()
self._ses.start_upnp()
self._ses.start_natpmp()
def _stop_services(self):
self._ses.stop_natpmp()
self._ses.stop_upnp()
self._ses.stop_lsd()
self._ses.stop_dht()
def save_state(self):
state = self._ses.save_state()
with open(self._state_file, 'wb') as f:
pickle.dump(state, f)
def save_resume(self):
if self._th.need_save_resume_data() and self._th.is_valid() and self._th.status().has_metadata:
r = BTClient.ResumeData(self)
start = time.time()
while (time.time() - start) <= 5:
if r.data or r.failed:
break
time.sleep(0.1)
if r.data:
logger.debug('Saving fast resume data')
self._cache.save_resume(self.unique_file_id, self.lt.bencode(r.data))
else:
logger.warn('Fast resume data not available')
def close(self):
self.remove_all_dispatcher_listeners()
self._monitor.stop()
if self._ses:
self._ses.pause()
if self._th:
self.save_resume()
self.save_state()
self._stop_services()
try:
self._ses.remove_torrent(self._th)
except:
print 'RuntimeError: invalid torrent handle used'
#BaseClient.close(self)
@property
def status(self):
if self._th:
s = self._th.status()
if self._file:
pieces = s.pieces[self._file.first_piece:self._file.last_piece]
if len(pieces)>0:
progress = float(sum(pieces)) / len(pieces)
else:
progress = 0
else:
progress = 0
size = self._file.size if self._file else 0
s.desired_rate = self._file.byte_rate if self._file and progress > 0.003 else 0
s.progress_file = progress
s.file_size = size
return s
class ResumeData(object):
def __init__(self, client):
self.data = None
self.failed = False
client.add_dispatcher_listener(self._process_alert)
client._th.save_resume_data()
def _process_alert(self, t, alert):
if t == 'save_resume_data_failed_alert':
logger.debug('Fast resume data generation failed')
self.failed = True
elif t == 'save_resume_data_alert':
self.data = alert.resume_data
class Dispatcher(BaseMonitor):
def __init__(self, client, lt=None):
super(BTClient.Dispatcher, self).__init__(client, name='Torrent Events Dispatcher')
self.lt=lt
def do_start(self, th, ses):
self._th = th
self._ses = ses
self.start()
def run(self):
if not self._ses:
raise Exception('Invalid state, session is not initialized')
while (self._running):
a = self._ses.wait_for_alert(1000)
if a:
alerts = self._ses.pop_alerts()
for alert in alerts:
with self._lock:
for cb in self._listeners:
if "udp_error_alert" not in self.lt.alert.what(alert):
cb(self.lt.alert.what(alert), alert)
STATE_STR = ['queued', 'checking', 'downloading metadata',
'downloading', 'finished', 'seeding', 'allocating', 'checking fastresume']
def print_status(self, s, client):
if self._th:
state_str = ['queued', 'checking', 'downloading metadata',
'downloading', 'finished', 'seeding', 'allocating', 'checking fastresume']
print('[%s] %.2f%% complete (down: %.1f kb/s up: %.1f kB/s peers: %d) %s' %
(self.lt.version, s.progress * 100, s.download_rate / 1000, s.upload_rate / 1000,
s.num_peers, state_str[s.state]))
def get_normalized_status(self):
s = self.status
if self._file:
pieces = s.pieces[self._file.first_piece: self._file.last_piece + 1]
downloaded = reduce(lambda s, x: s + (x and 1 or 0) * self._file.piece_size, pieces[:-1], 0)
if pieces[-1]:
rem = self._file.size % self._file.piece_size
downloaded += rem if rem else self._file.piece_size
else:
downloaded = 0
return {'source_type': 'bittorrent',
'state': BTClient.STATE_STR[s.state],
'downloaded': downloaded,
'total_size': s.file_size,
'download_rate': s.download_rate,
'upload_rate': s.upload_rate,
'desired_rate': s.desired_rate,
'piece_size': self._file.piece_size if self._file else 0,
'progress': s.progress_file,
# BT specific
'seeds_connected': s.num_seeds,
'seeds_total': s.num_complete,
'peers_connected': s.num_peers,
'peers_total': s.num_incomplete,
'num_pieces': s.num_pieces,
}
def debug_download_queue(self, s, client):
if s.state != 3:
return
download_queue = self._th.get_download_queue()
if self.file:
first = self.file.first_piece
else:
first = 0
q = map(lambda x: x['piece_index'] + first, download_queue)
logger.debug('Download queue: %s', q)
def debug_alerts(self, type, alert):
logger.debug("Alert %s - %s", type, alert)
class BTFile(AbstractFile):
def __init__(self, path, base, index, size, fmap, piece_size, prioritize_fn):
AbstractFile.__init__(self, path, base, size, piece_size)
self.index = index
self.first_piece = fmap.piece
self.last_piece = self.first_piece + max((size - 1 + fmap.start), 0) // piece_size
self.offset = fmap.start
self._prioritize_fn = prioritize_fn
def prioritize_piece(self, n, idx):
self._prioritize_fn(n, idx)
class LangAction(argparse.Action):
def __init__(self, option_strings, dest, nargs=None, **kwargs):
if nargs is not None:
raise ValueError("nargs not allowed")
super(LangAction, self).__init__(option_strings, dest, **kwargs)
def __call__(self, parser, namespace, values, option_string=None):
if len(values) != 3:
raise ValueError('subtitles language should be 3 letters code')
setattr(namespace, self.dest, values)
def main(args=None):
#from argparse import Namespace
#args=Namespace(bt_download_limit=0,
# bt_upload_limit=0,
# choose_subtitles=False,
# clear_older=0,
# debug_log='D:\\log.txt',
# delete_on_finish=True,#Flase,
# directory='D:\\',
# listen_port_max=6891,
# listen_port_min=6881,
# no_resume=False,
# player='vlc',#kodi
# port=5001,
# print_pieces=False,
# quiet=False,
# stdin=False,
# stream=False,
# subtitles=None,
# trace=True,
# url='D:\\ntest.torrent')
if not args:
p = argparse.ArgumentParser()
p.add_argument("url", help="Torrent file, link to file or magnet link")
p.add_argument("-d", "--directory", default="./", help="directory to save download files")
p.add_argument("-p", "--player", default="mplayer", choices=["mplayer", "vlc"], help="Video player")
p.add_argument("--port", type=int, default=5001, help="Port for http server")
p.add_argument("--debug-log", default='', help="File for debug logging")
p.add_argument("--stdin", action='store_true', help='sends video to player via stdin (no seek then)')
p.add_argument("--print-pieces", action="store_true",
help="Prints map of downloaded pieces and ends (X is downloaded piece, O is not downloaded)")
p.add_argument("-s", "--subtitles", action=LangAction,
help="language for subtitle 3 letter code eng,cze ... (will try to get subtitles from opensubtitles.org)")
p.add_argument("--stream", action="store_true", help="just file streaming, but will not start player")
p.add_argument("--no-resume", action="store_true", help="Do not resume from last known position")
p.add_argument("-q", "--quiet", action="store_true", help="Quiet - did not print progress to stdout")
p.add_argument('--delete-on-finish', action="store_true", help="Delete downloaded file when program finishes")
p.add_argument('--clear-older', type=int, default=0,
help="Deletes files older than x days from the download directory; if set, client startup will be slower")
p.add_argument('--bt-download-limit', type=int, default=0, help='Download limit for torrents kB/s')
p.add_argument('--bt-upload-limit', type=int, default=0, help='Upload limit for torrents kB/s')
p.add_argument('--listen-port-min', type=int, default=6881, help='BitTorrent incoming port range - minimum port')
p.add_argument('--listen-port-max', type=int, default=6891, help='BitTorrent incoming port range - maximum port')
p.add_argument('--choose-subtitles', action="store_true",
help="Always manually choose subtitles (otherwise will try to use best match in many cases)")
p.add_argument('--trace', action='store_true', help='More detailed debug logging')
args = p.parse_args(args)
# str(args)
if args.debug_log:
logger.setLevel(logging.DEBUG)
h = logging.handlers.RotatingFileHandler(args.debug_log)
logger.addHandler(h)
else:
logger.setLevel(logging.CRITICAL)
logger.addHandler(logging.StreamHandler())
if args.clear_older:
days = args.clear_older
items = os.listdir(args.directory)
now = time.time()
for item in items:
if item != CacheBT.CACHE_DIR:
full_path = os.path.join(args.directory, item)
if now - os.path.getctime(full_path) > days * 24 * 3600:
logger.debug('Deleting path %s', full_path)
if os.path.isdir(full_path):
shutil.rmtree(full_path, ignore_errors=True)
else:
os.unlink(full_path)
if args.print_pieces:
print_pieces(args)
elif re.match('https?://localhost', args.url):
class TestResolver(Resolver):
SPEED_LIMIT = 300
THREADS = 2
stream(args, HTClient, TestResolver)
else:
#rclass = plugins.find_matching_plugin(args.url)
#if rclass:
# stream(args, HTClient, rclass)
#else:
#stream(args, BTClient)
return args
#def stream(args, client_class, resolver_class=None):
# c = client_class(args.directory, args=args, resolver_class=resolver_class)
# player = None
#
# def on_exit(sig=None, frame=None):
# c.close()
# if player:
# player.terminate()
# if sig:
# logger.info('Exiting by signal %d', sig)
# sys.exit(0)
#
# try:
#
# if not args.stream:
# player = Player.create(args.player, c.update_play_time)
#
# server = None
# # test if port if free, otherwise find free
# free_port = args.port
# while True:
#
# try:
# s = socket.socket()
# res = s.connect_ex(('127.0.0.1', free_port))
# if res:
# break
# finally:
# s.close()
# free_port += 1
# if not args.stdin:
# server = StreamServer(('127.0.0.1', free_port), BTFileHandler, allow_range=True,
# status_fn=c.get_normalized_status)
# logger.debug('Started http server on port %d', free_port)
# server.run()
# #thread.start_new_thread(server.run, ())
# if player:
# def start_play(f, finished):
# base = None
# if not args.stdin:
# server.set_file(f)
# base = 'http://127.0.0.1:' + str(free_port) + '/'
# sin = args.stdin
# if finished:
# base = args.directory
# sin = False
# logger.debug('File is already downloaded, will play it directly')
# args.play_file = True
#
# if args.no_resume:
# start_time = 0
# else:
# start_time = c.last_play_time or 0
# player.start(f, base, stdin=sin, sub_lang=args.subtitles, start_time=start_time,
# always_choose_subtitles=args.choose_subtitles)
# logger.debug('Started media player for %s', f)
#
# c.set_on_file_ready(start_play)
# else:
# def print_url(f, done):
# server.set_file(f)
# base = 'http://127.0.0.1:' + str(free_port) + '/'
# url = urlparse.urljoin(base, urllib.quote(f.path))
# print "\nServing file on %s" % url
# sys.stdout.flush()
#
# c.set_on_file_ready(print_url)
#
# logger.debug('Starting btclient - libtorrent version %s', self.lt.version)
# c.start_url(args.url)
# while not c.is_file_ready:
# time.sleep(1)
# if not args.stdin or hasattr(args, 'play_file') and args.play_file:
# f = None
# else:
# f = c.file.create_cursor()
#
# while True:
# if player and not player.is_playing():
# break
# if not f:
# time.sleep(1)
# else:
# buf = f.read(1024)
# if buf:
# try:
# player.write(buf)
# logger.debug("written to stdin")
# except IOError:
# pass
# else:
# player.close()
# if f:
# f.close()
# logger.debug('Play ended')
# if server:
# server.stop()
# if player:
# if player.rcode != 0:
# msg = 'Player ended with error %d\n' % (player.rcode or 0)
# sys.stderr.write(msg)
# logger.error(msg)
#
# logger.debug("Player output:\n %s", player.log)
# finally:
# on_exit()
# # logger.debug("Remaining threads %s", list(threading.enumerate()))
def pieces_map(pieces, w):
idx = 0
sz = len(pieces)
w(" " * 4)
for i in xrange(10):
w("%d " % i)
w('\n')
while idx < sz:
w("%3d " % (idx / 10))
for _c in xrange(min(10, sz - idx)):
if pieces[idx]:
w('X ')
else:
w('O ')
idx += 1
w('\n')
def print_pieces(args):
def w(x):
sys.stdout.write(x)
c = BTClient(args.directory)
c.start_url(args.url)
# c.add_listener(print_status)
start = time.time()
while time.time() - start < 60:
if c.file:
print "Pieces (each %.0f k) for file: %s" % (c.file.piece_size / 1024.0, c.file.path)
pieces = c.pieces
pieces_map(pieces, w)
return
time.sleep(1)
print >> sys.stderr, "Cannot get metadata"
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print >> sys.stderr, '\nProgram interrupted, exiting'
logger.info('Exiting by SIGINT')
except Exception:
traceback.print_exc()
logger.exception('General error')

View File

@@ -0,0 +1,100 @@
'''
Created on Apr 28, 2015
@author: ivan
'''
import os.path
import shelve
import re
import logging
import base64
logger = logging.getLogger('cache')
class CacheBT(object):
CACHE_DIR = '.cache'
def __init__(self, path, lt):
if not os.path.isdir(path):
raise ValueError('Invalid base directory')
self.path = os.path.join(path, CacheBT.CACHE_DIR)
if not os.path.isdir(self.path):
os.mkdir(self.path)
self._index_path = os.path.join(self.path, 'index')
self._index = shelve.open(self._index_path)
self._last_pos_path = os.path.join(self.path, 'last_position')
self._last_pos = shelve.open(self._last_pos_path)
self.lt=lt
def save(self, url, info_hash):
self._index[url] = info_hash
self._index.sync()
def close(self):
self._index.close()
self._last_pos.close()
def _tname(self, info_hash):
return os.path.join(self.path, info_hash.upper() + '.torrent')
def _rname(self, info_hash):
return os.path.join(self.path, info_hash.upper() + '.resume')
def save_resume(self, info_hash, data):
with open(self._rname(info_hash), 'wb') as f:
f.write(data)
def get_resume(self, url=None, info_hash=None):
if url:
info_hash = self._index.get(url)
if not info_hash:
return
rname = self._rname(info_hash)
if os.access(rname, os.R_OK):
with open(rname, 'rb') as f:
return f.read()
def file_complete(self, torrent, url=None):
info_hash = str(torrent.info_hash())
nt = self.lt.create_torrent(torrent)
tname = self._tname(info_hash)
with open(tname, 'wb') as f:
f.write(self.lt.bencode(nt.generate()))
if url:
self.save(url, info_hash)
def get_torrent(self, url=None, info_hash=None):
if url:
info_hash = self._index.get(url)
if not info_hash:
return
tname = self._tname(info_hash)
if os.access(tname, os.R_OK):
logger.debug('Torrent is cached')
return tname
magnet_re = re.compile('xt=urn:btih:([0-9A-Za-z]+)')
hexa_chars = re.compile('^[0-9A-F]+$')
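# the xt=urn:btih: parameter is either a 40-char hex info hash or a 32-char
# base32 one; both are normalized to uppercase hex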
@staticmethod
def hash_from_magnet(m):
res = CacheBT.magnet_re.search(m)
if res:
ih = res.group(1).upper()
if len(ih) == 40 and CacheBT.hexa_chars.match(ih):
return res.group(1).upper()
elif len(ih) == 32:
s = base64.b32decode(ih)
return "".join("{:02X}".format(ord(c)) for c in s)
else:
raise ValueError('Not BT magnet link')
else:
raise ValueError('Not BT magnet link')
def play_position(self, info_hash, secs):
self._last_pos[info_hash] = secs
def get_last_position(self, info_hash):
return self._last_pos.get(info_hash) or 0

View File

@@ -0,0 +1,477 @@
'''
Created on May 3, 2015
@author: ivan
'''
import os
from collections import deque
import logging
from threading import Lock, Event, Thread
import copy
import threading
import traceback
import shutil
import urlparse
from StringIO import StringIO
from hachoir_metadata import extractMetadata
from hachoir_parser import guessParser
import hachoir_core.config as hachoir_config
from hachoir_core.stream.input import InputIOStream
from opensubtitle import OpenSubtitles
from cachebt import CacheBT
logger = logging.getLogger('common')
hachoir_config.quiet = True
def enum(**enums):
return type('Enum', (), enums)
TerminalColor = enum(default='\033[39m', green='\033[32m', red='\033[31m', yellow='\033[33m')
def get_duration(fn):
# Provide just the beginning of the file, otherwise hachoir might try to read the whole file
with open(fn, 'rb') as f:
s = StringIO(f.read(1024 * 64))
p = guessParser(InputIOStream(s, filename=unicode(fn), tags=[]))
m = extractMetadata(p)
if m:
return m.getItem('duration', 0) and m.getItem('duration', 0).value
def debug_fn(fn):
def _fn(*args, **kwargs):
print "Entering %s, thread %s" % (fn.__name__, threading.current_thread().name)
traceback.print_stack()
ret = fn(*args, **kwargs)
print "Leaving %s, thread %s" % (fn.__name__, threading.current_thread().name)
return ret
return _fn
class Hasher(Thread):
def __init__(self, btfile, hash_cb):
Thread.__init__(self, name="Hasher")
if btfile is None:
raise ValueError('BTFile is None!')
self._btfile = btfile
self._hash_cb = hash_cb
self.hash = None
self.daemon = True
self.start()
def run(self):
with self._btfile.create_cursor() as c:
filehash = OpenSubtitles.hash_file(c, self._btfile.size)
self.hash = filehash
self._hash_cb(filehash)
class BaseMonitor(Thread):
def __init__(self, client, name):
Thread.__init__(self, name=name)
self.daemon = True
self._listeners = []
self._lock = Lock()
self._wait_event = Event()
self._running = True
self._client = client
self._ses = None
self._ctx = {}  # shared context dict written via add_to_ctx
def add_to_ctx(self, key, val):
self._ctx[key] = val
def stop(self):
self._running = False
self._wait_event.set()
def add_listener(self, cb):
with self._lock:
if not cb in self._listeners:
self._listeners.append(cb)
def remove_listener(self, cb):
with self._lock:
try:
self._listeners.remove(cb)
except ValueError:
pass
def remove_all_listeners(self):
with self._lock:
self._listeners = []
class BaseClient(object):
class Monitor(BaseMonitor):
def __init__(self, client):
super(BaseClient.Monitor, self).__init__(client, name="Status Monitor")
self._client = client
def run(self):
while (self._running):
s = self._client.status
with self._lock:
for cb in self._listeners:
cb(s, client=self._client)
self._wait_event.wait(1.0)
def __init__(self, path_to_store, args=None):
self._base_path = path_to_store
self._ready = False
self._file = None
self._on_ready_action = None
self._monitor = BaseClient.Monitor(self)
if not args or not args.quiet:
self.add_monitor_listener(self.print_status)
self._delete_on_close = True if args and args.delete_on_finish else False
def _on_file_ready(self, filehash):
self._file.filehash = filehash
self._ready = True
if self._on_ready_action:
self._on_ready_action(self._file, self.is_file_complete)
@property
def status(self):
raise NotImplementedError()
def get_normalized_status(self):
s = self.status
return {'source_type': 'base',
'state': s.state,
'downloaded': s.downloaded,
'total_size': s.total_size,
'download_rate': s.download_rate,
'desired_rate': s.desired_rate,
'progress': s.progress,
'piece_size': self._file.piece_size if self._file else 0
}
@property
def file(self):
return self._file
def set_on_file_ready(self, action):
self._on_ready_action = action
@property
def is_file_ready(self):
return self._ready
def print_status(self, s, client):
raise NotImplementedError()
@property
def is_file_complete(self):
raise NotImplementedError()
def start_url(self, uri):
raise NotImplementedError()
def close(self):
if self._cache:
self._cache.close()
if self._delete_on_close and self._file:
self._file.remove()
@property
def unique_file_id(self):
raise NotImplementedError()
def update_play_time(self, playtime):
self._cache.play_position(self.unique_file_id, playtime)
@property
def last_play_time(self):
return self._cache.get_last_position(self.unique_file_id)
def add_monitor_listener(self, cb):
self._monitor.add_listener(cb)
def remove_monitor_listener(self, cb):
self._monitor.remove_listener(cb)
def stop(self):
self._monitor.stop()
self._monitor.join()
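# PieceCache keeps a sliding window of `size` piece buffers starting at
# _cache_first; fill_cache() shifts the window to a new head and re-requests
# any missing pieces, and read() blocks on _event until the head piece arrives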
class PieceCache(object):
TIMEOUT = 30
size = 5
def __init__(self, btfile):
# self._btfile=btfile
self._cache = [None] * self.size
self._lock = Lock()
self._event = Event()
self._cache_first = btfile.first_piece
self._piece_size = btfile.piece_size
self._map_offset = btfile.map_piece
self._file_size = btfile.size
self._last_piece = btfile.last_piece
self._request_piece = btfile.prioritize_piece
self._btfile = btfile
def clone(self):
c = PieceCache(self._btfile)
with self._lock:
c._cache = copy.copy(self._cache)
c._cache_first = self._cache_first
return c
@property
def cached_piece(self):
return self._cache_first
def fill_cache(self, first):
to_request = []
with self._lock:
diff = first - self._cache_first
if diff > 0:
for i in xrange(self.size):
if i + diff < self.size:
self._cache[i] = self._cache[i + diff]
else:
self._cache[i] = None
elif diff < 0:
for i in xrange(self.size - 1, -1, -1):
if i + diff >= 0:
self._cache[i] = self._cache[i + diff]
else:
self._cache[i] = None
self._cache_first = first
self._event.clear()
for i in xrange(self.size):
if self._cache[i] is None and (self._cache_first + i) <= self._last_piece:
to_request.append((self._cache_first + i, i))
for args in to_request:
self._request_piece(*args)
def add_piece(self, n, data):
with self._lock:
i = n - self._cache_first
if i >= 0 and i < self.size:
self._cache[i] = data
if i == 0:
self._event.set()
def has_piece(self, n):
with self._lock:
i = n - self._cache_first
if i >= 0 and i < self.size:
return not (self._cache[i] is None)
def _wait_piece(self, pc_no):
while not self.has_piece(pc_no):
self.fill_cache(pc_no)
# self._event.clear()
logger.debug('Waiting for piece %d' % pc_no)
self._event.wait(self.TIMEOUT)
def _get_piece(self, n):
with self._lock:
i = n - self._cache_first
if i < 0 or i > self.size:
raise ValueError('index out of scope of current cache')
return self._cache[i]
def get_piece(self, n):
self._wait_piece(n)
return self._get_piece(n)
def read(self, offset, size):
size = min(size, self._file_size - offset)
if not size:
return
pc_no, ofs = self._map_offset(offset)
data = self.get_piece(pc_no)
pc_size = self._piece_size - ofs
if pc_size > size:
return data[ofs: ofs + size]
else:
pieces = [data[ofs:self._piece_size]]
remains = size - pc_size
new_head = pc_no + 1
while remains and self.has_piece(new_head):
sz = min(remains, self._piece_size)
data = self.get_piece(new_head)
pieces.append(data[:sz])
remains -= sz
if remains:
new_head += 1
self.fill_cache(new_head)
return ''.join(pieces)
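# BTCursor is a file-like reader (read/seek/tell) on top of PieceCache;
# clone() lets a new request reuse the cache of a cursor reading nearby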
class BTCursor(object):
def __init__(self, btfile):
self._btfile = btfile
self._pos = 0
self._cache = PieceCache(btfile)
def clone(self):
c = BTCursor(self._btfile)
c._cache = self._cache.clone()
return c
def close(self):
self._btfile.remove_cursor(self)
def read(self, n=None):
sz = self._btfile.size - self._pos
if not n:
n = sz
else:
n = min(n, sz)
res = self._cache.read(self._pos, n)
if res:
self._pos += len(res)
return res
def seek(self, n):
if n > self._btfile.size:
n = self._btfile.size
# raise ValueError('Seeking beyond file size')
elif n < 0:
raise ValueError('Seeking negative')
self._pos = n
def tell(self):
return self._pos
def update_piece(self, n, data):
self._cache.add_piece(n, data)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
class AbstractFile(object):
def __init__(self, path, base, size, piece_size):
self._base = base
self.size = size
self.path = path
self.piece_size = piece_size
self.offset = 0
self._full_path = os.path.join(base, path)
self._cursors = []
self._cursors_history = deque(maxlen=3)
self._lock = Lock()
self.first_piece = 0
self.last_piece = self.first_piece + (max(size - 1, 0)) // piece_size
self._rate = None
self._piece_duration = None
def add_cursor(self, c):
with self._lock:
self._cursors.append(c)
def remove_cursor(self, c):
with self._lock:
self._cursors.remove(c)
self._cursors_history.appendleft(c)
def create_cursor(self, offset=None):
c = None
if offset is not None:
with self._lock:
for e in reversed(self._cursors):
if abs(e.tell() - offset) < self.piece_size:
c = e.clone()
logger.debug('Cloning existing cursor')
break
if not c:
with self._lock:
for e in reversed(self._cursors_history):
if abs(e.tell() - offset) < self.piece_size:
c = e
logger.debug('Reusing previous cursor')
if not c:
c = BTCursor(self)
self.add_cursor(c)
if offset:
c.seek(offset)
return c
def map_piece(self, ofs):
return self.first_piece + (ofs + self.offset) // self.piece_size, \
(ofs + self.offset) % self.piece_size
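# e.g. with piece_size=16384, first_piece=10, offset=100 (illustrative values),
# map_piece(20000) == (11, 3716): absolute offset 20100 falls 3716 bytes into piece 11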
def prioritize_piece(self, piece, idx):
raise NotImplementedError()
@property
def full_path(self):
return self._full_path
def close(self):
pass
def remove(self):
dirs = self.path.split(os.sep)
if len(dirs) > 1:
shutil.rmtree(os.path.join(self._base, dirs[0]), ignore_errors=True)
else:
os.unlink(self._full_path)
def update_piece(self, n, data):
for c in self._cursors:
c.update_piece(n, data)
@property
def duration(self):
if not hasattr(self, '_duration'):
self._duration = get_duration(self._full_path) if os.path.exists(self._full_path) else 0
return self._duration or 0
@property
def piece_duration_ms(self):
if not self._piece_duration:
if self.byte_rate:
self._piece_duration = self.piece_size / self.byte_rate / 1000
return self._piece_duration
@property
def byte_rate(self):
if not self._rate:
d = self.duration
if d:
self._rate = self.size / d.total_seconds()
return self._rate
def __str__(self):
return self.path
class Resolver(object):
URL_PATTERN = None
SPEED_LIMIT = None # kB/s
THREADS = 4
def __init__(self, loader):
self._client = loader
def resolve(self, url):
return url
@staticmethod
def url_to_file(uri):
path = urlparse.urlsplit(uri)[2]
if path.startswith('/'):
path = path[1:]
return path

View File

@@ -0,0 +1,503 @@
'''
Created on May 3, 2015
@author: ivan
'''
import urllib2
import os.path
import pickle
import logging
from cookielib import CookieJar
from collections import namedtuple
import random
from httplib import BadStatusLine, IncompleteRead
import socket
import time
import re
import Queue
import collections
from threading import Lock, Thread
import sys
import threading
from urllib import urlencode
import zlib
from io import BytesIO
import gzip
import json
from collections import deque
from common import AbstractFile, BaseClient, Hasher, Resolver, TerminalColor
from bs4 import BeautifulSoup
logger = logging.getLogger('htclient')
Piece = namedtuple('Piece', ['piece', 'data', 'total_size', 'type'])
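# HTTPLoader fetches one piece per request: open() retries transient errors up
# to 5 times with a cookie-aware opener and a randomly chosen User-Agent, and
# load_piece() issues a 'bytes=N-' Range request and reads piece_size bytes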
class HTTPLoader(object):
UA_STRING = ['Mozilla/5.0 (Windows NT 6.3; rv:36.0) Gecko/20100101 Firefox/36.0',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2227.0 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.75.14 (KHTML, like Gecko) Version/7.0.3 Safari/7046A194A']
PARSER = 'lxml' # 'html5lib'#'html.parser'#'lxml'
class Error(Exception):
pass
def __init__(self, url, id, resolver_class=None):
self.id = id
self.user_agent = self._choose_ua()
resolver_class = resolver_class or Resolver
self._client = urllib2.build_opener(urllib2.HTTPCookieProcessor(CookieJar()))
self.url = self.resolve_file_url(resolver_class, url)
if not self.url:
raise HTTPLoader.Error('Url was not resolved to a file link')
def resolve_file_url(self, resolver_class, url):
r = resolver_class(self)
return r.resolve(url)
def _choose_ua(self):
return HTTPLoader.UA_STRING[random.randint(0, len(HTTPLoader.UA_STRING) - 1)]
RANGE_RE = re.compile(r'bytes\s+(\d+)-(\d+)/(\d+)')
def _parse_range(self, r):
m = HTTPLoader.RANGE_RE.match(r)
if not m:
raise HTTPLoader.Error('Invalid range header')
return int(m.group(1)), int(m.group(2)), int(m.group(3))
def open(self, url, data=None, headers={}, method='get'):
hdr = {'User-Agent': self.user_agent}
hdr.update(headers)
url, post_args = self._encode_data(url, data, method)
req = urllib2.Request(url, post_args, headers=hdr)
res = None
retries = 5
while retries:
try:
res = self._client.open(req, timeout=10)
break
except (IOError, urllib2.HTTPError, BadStatusLine, IncompleteRead, socket.timeout) as e:
if isinstance(e, urllib2.HTTPError) and hasattr(e, 'code') and str(e.code) == '404':
raise HTTPLoader.Error('Url %s not found' % url)
logger.warn('Retry on (%s) due to IO or HTTPError (%s)', threading.current_thread().name, e)
retries -= 1
time.sleep(1)
if not res:
raise HTTPLoader.Error('Cannot open resource %s' % url)
return res
def load_piece(self, piece_no, piece_size):
start = piece_no * piece_size
headers = {'Range': 'bytes=%d-' % start}
res = self.open(self.url, headers=headers)
allow_range_header = res.info().getheader('Accept-Ranges')
if allow_range_header and allow_range_header.lower() == 'none':
raise HTTPLoader.Error('Ranges are not supported')
size_header = res.info().getheader('Content-Length')
total_size = int(size_header) if size_header else None
range_header = res.info().getheader('Content-Range')
if not range_header:
if piece_no and not total_size:
raise HTTPLoader.Error('Ranges are not supported')
else:
from_pos, to_pos, size = 0, total_size - 1, total_size
else:
from_pos, to_pos, size = self._parse_range(range_header)
type_header = res.info().getheader('Content-Type')
if not type_header:
raise HTTPLoader.Error('Content Type is missing')
data = res.read(piece_size)
res.close()
return Piece(piece_no, data, size, type_header)
@staticmethod
def decode_data(res):
header = res.info()
data = res.read()
if header.get('Content-Encoding') == 'gzip':
tmp_stream = gzip.GzipFile(fileobj=BytesIO(data))
data = tmp_stream.read()
elif header.get('Content-Encoding') == 'deflate':
data = zlib.decompress(data)
return data
def _encode_data(self, url, data, method='post'):
if not data:
return url, None
if method.lower() == 'post':
return url, urlencode(data)
else:
return url + '?' + urlencode(data), None
def load_page(self, url, data=None, method='get'):
res = self.open(url, data, headers={'Accept-Encoding': "gzip, deflate"}, method=method)
# Content-Type:"text/html; charset=utf-8"
type_header = res.info().getheader('Content-Type')
if not type_header.startswith('text/html'):
raise HTTPLoader.Error("%s is not HTML page" % url)
# Content-Encoding:"gzip"
data = HTTPLoader.decode_data(res)
pg = BeautifulSoup(data, HTTPLoader.PARSER)
return pg
def load_json(self, url, data, method='get'):
res = self.open(url, data, headers={'Accept-Encoding': "gzip, deflate", 'X-Requested-With': 'XMLHttpRequest'},
method=method)
type_header = res.info().getheader('Content-Type')
if not type_header.startswith('application/json'):
raise HTTPLoader.Error("%s is not JSON" % url)
# Content-Encoding:"gzip"
data = HTTPLoader.decode_data(res)
return json.loads(data, encoding='utf8')
def get_redirect(self, url, data, method='get'):
pass
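# PriorityQueue de-duplicates queued pieces via _wset and uses lazy deletion:
# remove_piece() marks an entry NO_PIECE so get_piece() skips it later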
class PriorityQueue(Queue.PriorityQueue):
NORMAL_PRIORITY = 99
NO_DOWNLOAD = 999
NO_PIECE = -1
def __init__(self):
Queue.PriorityQueue.__init__(self, maxsize=0)
self._wset = {}
self._lock = Lock()
def put_piece(self, piece, priority, remove_previous=False):
# do not put piece which is already in queue
with self._lock:
if piece in self._wset:
if self._wset[piece][0] == priority:
return
else:
self.remove_piece(piece)
if remove_previous:
for k in self._wset:
if k < piece:
self.remove_piece(k)
entry = [priority, piece]
self._wset[piece] = entry
self.put(entry, block=False)
def remove_piece(self, piece):
entry = self._wset.pop(piece)
entry[-1] = PriorityQueue.NO_PIECE
def get_piece(self, timeout=None):
while True:
_priority, piece = self.get(timeout=timeout)
with self._lock:
if piece is not PriorityQueue.NO_PIECE:
del self._wset[piece]
return piece
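# Pool runs one worker thread per loader; each worker pulls piece numbers from
# the queue and, when speed_limit (kB/s) is set, sleeps after each piece so the
# effective per-worker rate stays near the limit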
class Pool(object):
def __init__(self, piece_size, loaders, cb, speed_limit=None):
self.piece_size = piece_size
self._cb = cb
self.speed_limit = speed_limit
self._queue = PriorityQueue()
self._running = True
self._threads = [Thread(name="worker %d" % i, target=self.work, args=[l]) for i, l in enumerate(loaders)]
for t in self._threads:
t.daemon = True
t.start()
def add_worker_async(self, id, gen_fn, args=[], kwargs={}):
def resolve():
l = gen_fn(*args, **kwargs)
t = Thread(name="worker %d" % id, target=self.work, args=[l])
t.daemon = True
t.start()
self._threads.append(t)
adder = Thread(name="Adder", target=resolve)
adder.daemon = True
adder.start()
def work(self, loader):
while self._running:
pc = self._queue.get_piece()
if not self._running:
break
try:
start = time.time()
p = loader.load_piece(pc, self.piece_size)
self._cb(p.piece, p.data)
if self.speed_limit:
dur = time.time() - start
expected = self.piece_size / 1024.0 / self.speed_limit
wait_time = expected - dur
if wait_time > 0:
logger.debug('Waiting %f on %s', wait_time, threading.current_thread().name)
time.sleep(wait_time)
except Exception, e:
logger.exception('(%s) Error when loading piece %d: %s', threading.current_thread().name, pc, e)
def stop(self):
self._running = False
# push some dummy tasks to ensure workers end
for i in xrange(len(self._threads)):
self._queue.put_piece(i, PriorityQueue.NO_DOWNLOAD)
def add_piece(self, piece, priority=PriorityQueue.NORMAL_PRIORITY, remove_previous=False):
self._queue.put_piece(piece, priority, remove_previous)
class HTClient(BaseClient):
def __init__(self, path_to_store, args=None, piece_size=2 * 1024 * 1024, no_threads=2, resolver_class=None):
BaseClient.__init__(self, path_to_store, args=args)
self._pool = None
self.resolver_class = resolver_class
self._no_threads = self.resolver_class.THREADS if self.resolver_class and hasattr(self.resolver_class,
'THREADS') else no_threads
self.piece_size = piece_size
self._last_downloaded = deque(maxlen=60)
def update_piece(self, piece, data):
self._file.update_piece(piece, data)
if not self._ready and hasattr(self._file, 'filehash') and self._file.filehash \
and all(self._file.pieces[:5]):
self._set_ready(self._file.is_complete)
def request_piece(self, piece, priority):
if self._pool:
self._pool.add_piece(piece, priority)
else:
raise Exception('Pool not started')
def _on_file_ready(self, filehash):
self._file.filehash = filehash
if self.is_file_complete:
self._set_ready(True)
def _set_ready(self, complete):
self._ready = True
if self._on_ready_action:
self._on_ready_action(self._file, complete)
def start_url(self, uri):
self._monitor.start()
path = self.resolver_class.url_to_file(uri)
c0 = None
try:
self._file = HTFile(path, self._base_path, 0, self.piece_size, self.request_piece)
except HTFile.UnknownSize:
c0 = HTTPLoader(uri, 0, self.resolver_class)
p = c0.load_piece(0, self.piece_size)
self._file = HTFile(path, self._base_path, p.total_size, self.piece_size, self.request_piece)
self.update_piece(0, p.data)
self._file.mime = p.type
if not self._file.is_complete:
c0 = c0 or HTTPLoader(uri, 0, self.resolver_class)
self._pool = Pool(self.piece_size, [c0],
self.update_piece,
speed_limit=self.resolver_class.SPEED_LIMIT if hasattr(self.resolver_class,
'SPEED_LIMIT') else None)
def gen_loader(i):
return HTTPLoader(uri, i, self.resolver_class)
for i in xrange(1, self._no_threads):
self._pool.add_worker_async(i, gen_loader, (i,))
# get remaining pieces with normal priority
for i in xrange(1, self._file.last_piece + 1):
if not self._file.pieces[i]:
self._pool.add_piece(i)
self.hash = Hasher(self._file, self._on_file_ready)
@property
def is_file_complete(self):
return self.file.is_complete if self.file else False
@property
def unique_file_id(self):
return self.file.filehash
Status = collections.namedtuple('Status',
['state', 'downloaded', 'download_rate', 'total_size', 'threads', 'desired_rate'])
@property
def status(self):
tick = time.time()
state = 'starting' if not self.file else 'finished' if self.is_file_complete else 'downloading'
downloaded = self.file.downloaded if self.file else 0
if self._last_downloaded:
prev_time, prev_down = self._last_downloaded[0]
download_rate = (downloaded - prev_down) / (tick - prev_time)
else:
download_rate = 0
total_size = self.file.size if self.file else 0
threads = self._no_threads
desired_rate = self._file.byte_rate if self._file else 0
if self.file:
self._last_downloaded.append((tick, downloaded))
return HTClient.Status(state, downloaded, download_rate, total_size, threads, desired_rate)
def get_normalized_status(self):
s = self.status
return {'source_type': 'http',
'state': s.state,
'downloaded': s.downloaded,
'total_size': s.total_size,
'download_rate': s.download_rate,
'desired_rate': s.desired_rate,
'piece_size': self._file.piece_size if self._file else 0,
'progress': s.downloaded / float(s.total_size) if s.total_size else 0,
# HTTP specific
'threads': s.threads
}
def close(self):
if self._file:
self._file.close()
BaseClient.close(self)
def print_status(self, s, client):
progress = s.downloaded / float(s.total_size) * 100 if s.total_size else 0
total_size = s.total_size / 1048576.0
color = ''
if progress >= 100.0 or not s.desired_rate or s.state in ('finished', 'starting'):
color = TerminalColor.default
elif s.desired_rate > s.download_rate:
color = TerminalColor.red
elif s.download_rate > s.desired_rate and s.download_rate < s.desired_rate * 1.2:
color = TerminalColor.yellow
else:
color = TerminalColor.green
print '\r%.2f%% (of %.1fMB) down: %s%.1f kB/s\033[39m(need %.1f) %s' % \
(progress, total_size,
color, s.download_rate / 1000.0, s.desired_rate / 1000.0 if s.desired_rate else 0.0,
s.state),
sys.stdout.write("\033[K")
sys.stdout.flush()
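# HTFile sparsely allocates the target file, tracks downloaded pieces in a
# boolean bitmap, and pickles (mime, pieces) to '<file>.pieces' on close so an
# interrupted download can resume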
class HTFile(AbstractFile):
def __init__(self, path, base, size, piece_size=2097152, prioritize_fn=None):
self._full_path = os.path.join(base, path)
self._prioritize_fn = prioritize_fn
self.pieces = None
self.mime = None
size = self._load_cached(size)
AbstractFile.__init__(self, path, base, size, piece_size)
if not self.pieces or len(self.pieces) != self.last_piece + 1:
self.pieces = [False for _i in xrange(self.last_piece + 1)]
self._file = open(self.full_path, 'r+b')
class UnknownSize(Exception):
pass
def _load_cached(self, size):
if not os.path.exists(self.full_path) and size:
self._allocate_file(size)
return size
elif os.path.exists(self.full_path):
sz = os.stat(self.full_path).st_size
if size and size != sz:
logger.warn('Invalid cached file')
self._allocate_file(size)
return size
pcs_file = self.pieces_index_file
if os.access(pcs_file, os.R_OK):
with open(pcs_file, 'rb') as f:
pcs = pickle.load(f)
if isinstance(pcs, tuple):
self.pieces = pcs[1]
self.mime = pcs[0]
else:
logger.warn('Invalid pieces file %s', pcs_file)
return sz
else:
raise HTFile.UnknownSize()
@property
def pieces_index_file(self):
return self.full_path + '.pieces'
def _allocate_file(self, size):
path = os.path.split(self.full_path)[0]
if not os.path.exists(path):
os.makedirs(path, mode=0755)
# sparsely allocate the file
with open(self.full_path, 'ab') as f:
f.truncate(size)
def update_piece(self, n, data):
assert n != self.last_piece and len(data) == self.piece_size or \
n == self.last_piece and len(data) == ((self.size % self.piece_size) or self.piece_size), \
"Invalid size of piece %d - %d" % (n, len(data))
assert n >= self.first_piece and n <= self.last_piece, 'Invalid piece no %d' % n
with self._lock:
if not self.pieces[n]:
self._file.seek(n * self.piece_size)
self._file.write(data)
self._file.flush()
self.pieces[n] = True
logger.debug('Received piece %d in thread %s', n, threading.current_thread().name)
AbstractFile.update_piece(self, n, data)
def prioritize_piece(self, piece, idx):
assert piece >= self.first_piece and piece <= self.last_piece, 'Invalid piece no %d' % piece
data = None
with self._lock:
if self.pieces[piece]:
sz = self.piece_size if piece < self.last_piece else self.size % self.piece_size
self._file.seek(piece * self.piece_size)
data = self._file.read(sz)
assert len(data) == sz
if data:
AbstractFile.update_piece(self, piece, data)
elif self._prioritize_fn:
self._prioritize_fn(piece, idx)
else:
assert False, 'Missing prioritize fn'
@property
def is_complete(self):
return all(self.pieces)
@property
def downloaded(self):
sum = 0
for i, p in enumerate(self.pieces):
if p and i == self.last_piece:
sum += (self.size % self.piece_size) or self.piece_size
elif p:
sum += self.piece_size
return sum
def remove(self):
AbstractFile.remove(self)
if os.path.exists(self.pieces_index_file):
os.unlink(self.pieces_index_file)
def close(self):
self._file.close()
d = os.path.split(self.pieces_index_file)[0]
if d and os.path.isdir(d):
with open(self.pieces_index_file, 'wb') as f:
pickle.dump((self.mime, self.pieces), f)

View File

@@ -0,0 +1,263 @@
'''
Created on Apr 2, 2015
@author: ivan
'''
import xmlrpclib
import urllib2
import os.path
import gzip
from StringIO import StringIO
import logging
import subprocess
import struct
import time
from _version import __version__
logger = logging.getLogger('opensubtitles')
class Urllib2Transport(xmlrpclib.Transport):
def __init__(self, opener=None, https=False, use_datetime=0):
xmlrpclib.Transport.__init__(self, use_datetime)
self.opener = opener or urllib2.build_opener()
self.https = https
def request(self, host, handler, request_body, verbose=0):
proto = ('http', 'https')[bool(self.https)]
req = urllib2.Request('%s://%s%s' % (proto, host, handler), request_body)
req.add_header('User-agent', self.user_agent)
self.verbose = verbose
return self.parse_response(self.opener.open(req))
class OpenSubtitles(object):
USER_AGENT = 'BTClient v%s' % __version__
def __init__(self, lang, user='', pwd=''):
self._lang = lang
self._proxy = xmlrpclib.ServerProxy('http://api.opensubtitles.org/xml-rpc',
Urllib2Transport(use_datetime=True),
allow_none=True, use_datetime=True)
self._token = None
self._user = user
self._pwd = pwd
def login(self):
res = self._proxy.LogIn(self._user, self._pwd, 'en', self.USER_AGENT)
self._parse_status(res)
token = res.get('token')
if token:
self._token = token
else:
raise xmlrpclib.Fault('NO_TOKEN', 'No token!')
def _parse_status(self, res):
if res.has_key('status'):
code = res['status'].split()[0]
if code != '200':
raise xmlrpclib.Fault('ERROR_CODE_RETURNED', 'Returned error status: %s (%s)' % (code, res))
return True
else:
raise xmlrpclib.Fault('NO_STATUS', 'No status!')
def search(self, filename, filesize=None, filehash=None, limit=20):
filename = os.path.split(filename)[1]
name = os.path.splitext(filename)[0]
query = []
if filehash and filesize:
query.append({'sublanguageid': self._lang, 'moviehash': filehash, 'moviebytesize': str(filesize)})
query.append({'sublanguageid': self._lang, 'tag': filename})
query.append({'sublanguageid': self._lang, 'query': name})
res = self._proxy.SearchSubtitles(self._token, query, {'limit': limit})
self._parse_status(res)
data = res.get('data')
return data if data else []
@staticmethod
def _sub_file(filename, lang, ext):
lang = lang.lower()
path, fname = os.path.split(filename)
fname = os.path.splitext(fname)[0]
return os.path.join(path, fname + '.' + lang + '.' + ext)
@staticmethod
def _base_name(filename):
fname = os.path.split(filename)[1]
return os.path.splitext(fname)[0]
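# OSDb hash: file size plus the first and last 64 KiB summed as little-endian
# uint64 values, truncated to 64 bits and formatted as 16 hex digits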
@staticmethod
def hash_file(f, filesize):
longlongformat = '<q' # little-endian long long
bytesize = struct.calcsize(longlongformat)
hash = filesize # @ReservedAssignment
if filesize < 65536 * 2:
raise ValueError("SizeError")
for _x in range(65536 / bytesize):
buffer = f.read(bytesize) # @ReservedAssignment
(l_value,) = struct.unpack(longlongformat, buffer)
hash += l_value
hash = hash & 0xFFFFFFFFFFFFFFFF # to remain as 64bit number @ReservedAssignment
f.seek(max(0, filesize - 65536))
for _x in range(65536 / bytesize):
buffer = f.read(bytesize) # @ReservedAssignment
(l_value,) = struct.unpack(longlongformat, buffer)
hash += l_value
hash = hash & 0xFFFFFFFFFFFFFFFF # @ReservedAssignment
returnedhash = "%016x" % hash
return returnedhash
def choose(self, data):
null = open(os.devnull, "w")
items = []
for l in data:
items.append(l['SubDownloadLink'])
items.append(l['SubFileName'])
items.append(l['SubDownloadsCnt'])
p = subprocess.Popen(
'zenity --list --title "Select subtitles" --text "Select best matching subtitles" --width 1024 --height 600 --column Link --column Name --column Downloads --hide-column=1',
stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=null, shell=True)
res, _ = p.communicate(u'\n'.join(items).encode('utf-8'))
null.close()
return res if res.startswith('http') else None
@staticmethod
def download_if_not_exists(filename, lang, filesize=None, filehash=None, sub_ext='srt',
can_choose=True, overwrite=False, retries=3):
sfile = OpenSubtitles._sub_file(filename, lang, sub_ext)
if os.path.exists(sfile) and os.stat(sfile).st_size > 0 and not overwrite:
logger.debug('subs %s are already downloaded', sfile)
return sfile
else:
while True:
try:
with OpenSubtitles(lang) as opensub:
res = opensub.download(filename, filesize, filehash, can_choose)
if res:
logger.debug('Subtitles %s downloaded', res)
return res
else:
logger.debug('No subtitles found for file %s in language %s', filename, lang)
return
except urllib2.HTTPError, e:
retries -= 1
if retries <= 0:
raise e
logger.debug('Retrying to load subtitles due to HTTP error %d, remains %d attempts', e.code,
retries)
time.sleep(1)
def download(self, filename, filesize=None, filehash=None, can_choose=True):
data = self.search(filename, filesize, filehash)
if not data:
return None
media_file = OpenSubtitles._base_name(filename).lower()
def same_name(b):
return media_file == OpenSubtitles._base_name(b['SubFileName']).lower()
if filehash and filesize:
match = filter(lambda x: x.get('QueryNumber', 0) == 0, data)
logger.debug('Got results by filehash')
else:
match = filter(same_name, data)
if match and can_choose != 'always':
sub = match[0]
link = sub['SubDownloadLink']
ext = sub['SubFormat']
logger.debug('Found exact match for media file, subtitle is %s', sub['SubFileName'])
elif can_choose:
link = self.choose(data)
ext = 'srt'
else:
sub = data[0]
link = sub['SubDownloadLink']
ext = sub['SubFormat']
if link:
return self.download_link(filename, link, ext)
def download_link(self, filename, link, ext):
out_file = OpenSubtitles._sub_file(filename, self._lang, ext)
res = urllib2.urlopen(link, timeout=10)
data = StringIO(res.read())
data.seek(0)
res.close()
z = gzip.GzipFile(fileobj=data)
with open(out_file, 'wb') as f:
while True:
d = z.read(1024)
if not d:
break
f.write(d)
z.close()
return out_file
def logout(self):
try:
res = self._proxy.LogOut(self._token)
self._parse_status(res)
except urllib2.HTTPError:
logger.warn('Failed to logout')
def __enter__(self):
self.login()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.logout()
def down(f, lang, overwrite=False):
filesize, filehash = calc_hash(f)
OpenSubtitles.download_if_not_exists(f, lang, filesize=filesize,
filehash=filehash, can_choose=True, overwrite=overwrite)
def calc_hash(f):
if not os.access(f, os.R_OK):
raise ValueError('Cannot read from file %s' % f)
filesize = os.stat(f).st_size
with open(f, 'rb') as fs:
filehash = OpenSubtitles.hash_file(fs, filesize)
return filesize, filehash
def list_subs(f, lang):
import pprint
filesize, filehash = calc_hash(f)
with OpenSubtitles(lang) as opensub:
res = opensub.search(f, filesize, filehash)
res = map(lambda x: {'SubFileName': x['SubFileName'],
'SubDownloadsCnt': x['SubDownloadsCnt'],
'QueryNumber': x.get('QueryNumber', 0),
},
res)
pprint.pprint(res)
if __name__ == '__main__':
from argparse import ArgumentParser
p = ArgumentParser()
p.add_argument("video_file", help="video file")
p.add_argument("-d", "--download", action="store_true", help="Download subtitles for video files")
p.add_argument("-l", "--list", action="store_true", help="List available subtitles")
p.add_argument("--lang", default='eng', help="Language")
p.add_argument("--debug", action="store_true", help="Print debug messages")
p.add_argument("--overwrite", action="store_true", help="Overwrite existing subtitles ")
args = p.parse_args()
if args.debug:
logging.basicConfig(level=logging.DEBUG)
if args.download:
down(args.video_file, args.lang, args.overwrite)
else:
list_subs(args.video_file, args.lang)

View File

@@ -0,0 +1,5 @@
#-*- coding: utf-8 -*-
'''
Torrenter v2 plugin for XBMC/Kodi
Copyright (C) 2015 DiMartino
'''