diff --git a/.idea/workspace.xml b/.idea/workspace.xml index ccc99d6..f6b0c1c 100644 --- a/.idea/workspace.xml +++ b/.idea/workspace.xml @@ -2,7 +2,17 @@ - + + + + + + + + + + + @@ -33,265 +43,213 @@ - - - - - - - - - - - - - - + + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - - + + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + + + + + + - - - - - - - - - - - + - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -301,9 +259,9 @@ - + - + @@ -313,7 +271,88 @@ - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -341,57 +380,57 @@ @@ -427,7 +466,6 @@ - @@ -542,11 +580,12 @@ + - + @@ -725,126 +764,6 @@ @@ -1044,28 +1083,28 @@ - + - + - - + + - + + + - - + - @@ -1080,24 +1119,6 @@ - - - - - - - - - - - - - - - - - - @@ -1105,373 +1126,37 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -1479,7 +1164,6 @@ - @@ -1487,158 +1171,271 @@ - - + - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -1646,26 +1443,285 @@ - + - + - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - + - - + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/BTClientPlayer.py b/BTClientPlayer.py index 2fa0324..8305093 100644 --- a/BTClientPlayer.py +++ b/BTClientPlayer.py @@ -23,6 +23,8 @@ import urllib import json import sys from contextlib import contextmanager, closing, nested +import traceback +from argparse import Namespace import xbmc import xbmcgui @@ -31,14 +33,8 @@ import xbmcgui import xbmcvfs import Localization from platform_pulsar import get_platform -import traceback -from btclient import * -from functions import calculate, showMessage, clearStorage, DownloadDB, get_ids_video, log, debug, is_writable -from argparse import Namespace +from functions import showMessage, DownloadDB, get_ids_video, log, debug from Player import OverlayText -from Libtorrent import Libtorrent - - ROOT = sys.modules["__main__"].__root__ RESOURCES_PATH = os.path.join(ROOT, 'resources') diff --git a/addon.xml b/addon.xml index 123b9a0..c1385c4 100644 --- a/addon.xml +++ b/addon.xml @@ -1,9 +1,10 @@  - + - + + diff --git a/resources/__init__.py b/resources/__init__.py index e69de29..29440e1 100644 --- a/resources/__init__.py +++ b/resources/__init__.py @@ -0,0 +1,5 @@ +#-*- coding: utf-8 -*- +''' + Torrenter v2 plugin for XBMC/Kodi + Copyright (C) 2015 DiMartino +''' \ No newline at end of file diff --git a/resources/btclient/__init__.py b/resources/btclient/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/resources/btclient/_version.py b/resources/btclient/_version.py new file mode 100644 index 0000000..a987347 --- /dev/null +++ b/resources/btclient/_version.py @@ -0,0 +1 @@ +__version__ = '0.4.2' diff --git a/resources/btclient/btclient.py b/resources/btclient/btclient.py new file mode 100644 index 0000000..7c49138 --- /dev/null +++ b/resources/btclient/btclient.py @@ -0,0 +1,819 @@ +#!/usr/bin/env python +import time +import sys +import argparse +import os.path +from threading import Thread +import re +import urlparse +import BaseHTTPServer as htserver +import types +import logging +import logging.handlers +import traceback +import urllib +import SocketServer +import socket +import pickle +import thread +import json +import shutil + +from cachebt import CacheBT +#from player import Player +from common import AbstractFile, Hasher, BaseMonitor, BaseClient, Resolver +from htclient import HTClient +#import plugins # @UnresolvedImport + +logging.basicConfig() +logger = logging.getLogger() + + +INITIAL_TRACKERS = ['udp://tracker.openbittorrent.com:80', + 'udp://tracker.istole.it:80', + 'udp://open.demonii.com:80', + 'udp://tracker.coppersurfer.tk:80', + 'udp://tracker.leechers-paradise.org:6969', + 'udp://exodus.desync.com:6969', + 'udp://tracker.publicbt.com:80'] + +VIDEO_EXTS = {'.avi': 'video/x-msvideo', '.mp4': 'video/mp4', '.mkv': 'video/x-matroska', + '.m4v': 'video/mp4', '.mov': 'video/quicktime', '.mpg': 'video/mpeg', '.ogv': 'video/ogg', + '.ogg': 'video/ogg', '.webm': 'video/webm', '.ts': 'video/mp2t', '.3gp': 'video/3gpp'} + +RANGE_RE = re.compile(r'bytes=(\d+)-') + +# offset from end to download first +FILE_TAIL = 10000 + +class x: + lol='' + +def parse_range(range): # @ReservedAssignment + if range: + m = RANGE_RE.match(range) + if m: + try: + return int(m.group(1)) + except: + pass + return 0 + + +class StreamServer(SocketServer.ThreadingMixIn, htserver.HTTPServer): + daemon_threads = True + + def __init__(self, address, handler_class, tfile=None, allow_range=True, status_fn=None): + htserver.HTTPServer.__init__(self, address, handler_class) + self.file = tfile + self._running = True + self.allow_range = allow_range + self.status_fn = status_fn + + def stop(self): + self._running = False + + def set_file(self, f): + self.file = f + + def serve(self, w=0.1): + while self._running: + try: + self.handle_request() + time.sleep(w) + except Exception, e: + print >> sys.stderr, str(e) + + def run(self): + self.timeout = 0.1 + t = Thread(target=self.serve, args=[0.1], name='HTTP Server') + t.daemon = True + t.start() + + def handle_error(self, request, client_address): + """Handle an error gracefully. May be overridden. + + The default is to print a traceback and continue. + + """ + _, e, _ = sys.exc_info() + if isinstance(e, socket.error) and e.errno == 32: + logger.debug("Socket disconnect for client %s", client_address) + # pprint.pprint(e) + else: + logger.exception("HTTP Server Error") + # TODO: remove print + traceback.print_exc() + + +class BTFileHandler(htserver.BaseHTTPRequestHandler): + protocol_version = 'HTTP/1.1' + + def do_GET(self): + + if self.do_HEAD(only_header=False): + with self.server.file.create_cursor(self._offset) as f: + send_something = False + while True: + buf = f.read(1024) + if not send_something and logger.level <= logging.DEBUG: + logger.debug('Start sending data') + send_something = True + if buf: + self.wfile.write(buf) + else: + if logger.level <= logging.DEBUG: + logger.debug('Finished sending data') + break + + def _file_info(self): + size = self.server.file.size + ext = os.path.splitext(self.server.file.path)[1] + mime = (self.server.file.mime if hasattr(self.server.file, 'mime') else None) or VIDEO_EXTS.get(ext) + if not mime: + mime = 'application/octet-stream' + return size, mime + + def do_HEAD(self, only_header=True): + parsed_url = urlparse.urlparse(self.path) + if parsed_url.path == "/status" and self.server.status_fn: + s = self.server.status_fn() + status = json.dumps(s) + self.send_response(200, 'OK') + self.send_header('Content-Type', 'application/json') + self.send_header('Content-Length', len(status)) + self._finish_header(only_header) + if not only_header: + self.wfile.write(status) + return False + + elif self.server.file and urllib.unquote(parsed_url.path) == '/' + self.server.file.path: + self._offset = 0 + size, mime = self._file_info() + range = None # @ReservedAssignment + if self.server.allow_range: + range = parse_range(self.headers.get('Range', None)) # @ReservedAssignment + if range: + self._offset = range + range = (range, size - 1, size) # @ReservedAssignment + logger.debug('Request range %s - (header is %s', range, self.headers.get('Range', None)) + self.send_resp_header(mime, size, range, only_header) + return True + + else: + logger.error('Requesting wrong path %s, but file is %s', parsed_url.path, '/' + self.server.file.path) + self.send_error(404, 'Not Found') + + def send_resp_header(self, cont_type, cont_length, range=False, only_header=False): # @ReservedAssignment + # logger.debug('range is %s'% str(range)) + if self.server.allow_range and range: + self.send_response(206, 'Partial Content') + else: + self.send_response(200, 'OK') + self.send_header('Content-Type', cont_type) + self.send_header('transferMode.dlna.org', 'Streaming') + self.send_header('contentFeatures.dlna.org', + 'DLNA.ORG_OP=01;DLNA.ORG_CI=0;DLNA.ORG_FLAGS=01700000000000000000000000000000') + if self.server.allow_range: + self.send_header('Accept-Ranges', 'bytes') + else: + self.send_header('Accept-Ranges', 'none') + if self.server.allow_range and range: + if isinstance(range, (types.TupleType, types.ListType)) and len(range) == 3: + self.send_header('Content-Range', 'bytes %d-%d/%d' % range) + self.send_header('Content-Length', range[1] - range[0] + 1) + else: + raise ValueError('Invalid range value') + else: + self.send_header('Content-Length', cont_length) + self._finish_header(only_header) + + def _finish_header(self, only_header): + self.send_header('Connection', 'close') + if not only_header: self.end_headers() + + def log_message(self, format, *args): # @ReservedAssignment + logger.debug(format, *args) + + +class BTClient(BaseClient): + def __init__(self, path_to_store, + args=None, + state_file="", + lt=None, + **kwargs): + super(BTClient, self).__init__(path_to_store, args=args) + self.lt=lt + self._cache = CacheBT(path_to_store, self.lt) + self._torrent_params = {'save_path': path_to_store, + 'storage_mode': self.lt.storage_mode_t.storage_mode_sparse + } + if not state_file: + state_file=os.path.join(path_to_store,'.btclient_state') + self._state_file = os.path.expanduser(state_file) + self._ses = self.lt.session() + if os.path.exists(self._state_file): + with open(self._state_file) as f: + state = pickle.load(f) + self._ses.load_state(state) + # self._ses.set_alert_mask(self.lt.alert.category_t.progress_notification) + if args: + s = self._ses.get_settings() + s['download_rate_limit'] = int(round(args.bt_download_limit * 1024)) + s['upload_rate_limit'] = int(round(args.bt_upload_limit * 1024)) + self._ses.set_settings(s) + self._ses.listen_on(args.listen_port_min, args.listen_port_max) + self.content_id=args.content_id + else: + self._ses.listen_on(6881, 6891) + self._start_services() + self._th = None + + self._monitor.add_listener(self._check_ready) + self._dispatcher = BTClient.Dispatcher(self, lt=self.lt) + self._dispatcher.add_listener(self._update_ready_pieces) + self._hash = None + self._url = None + + #if args and args.debug_log and args.trace: + # self.add_monitor_listener(self.debug_download_queue) + # self.add_dispatcher_listener(self.debug_alerts) + + @property + def is_file_complete(self): + pcs = self._th.status().pieces[self._file.first_piece:self._file.last_piece + 1] + return all(pcs) + + def _update_ready_pieces(self, alert_type, alert): + if alert_type == 'read_piece_alert' and self._file: + self._file.update_piece(alert.piece, alert.buffer) + + def _check_ready(self, s, **kwargs): + if s.state in [3, 4, 5] and not self._file and s.progress > 0: + self._meta_ready(self._th.torrent_file()) + logger.debug('Got torrent metadata and start download') + self.hash = Hasher(self._file, self._on_file_ready) + + def _choose_file(self, files, i): + if not i and i!=0: + videos = filter(lambda f: VIDEO_EXTS.has_key(os.path.splitext(f.path)[1]), files) + if not videos: + raise Exception('No video files in torrent') + f = sorted(videos, key=lambda f: f.size)[-1] + i = files.index(f) + f.index = i + f=files[i] + f.index = i + return f + + def _meta_ready(self, meta): + fs = meta.files() + files = fs if isinstance(fs, list) else [fs.at(i) for i in xrange(fs.num_files())] + f = self._choose_file(files, self.content_id) + fmap = meta.map_file(f.index, 0, 1) + self._file = BTFile(f.path, self._base_path, f.index, f.size, fmap, meta.piece_length(), + self.prioritize_piece) + + self.prioritize_file() + print ('File %s pieces (pc=%d, ofs=%d, sz=%d), total_pieces=%d, pc_length=%d' % + (f.path, fmap.piece, fmap.start, fmap.length, + meta.num_pieces(), meta.piece_length())) + + self._cache.file_complete(self._th.torrent_file(), + self._url if self._url and self._url.startswith('http') else None) + + def prioritize_piece(self, pc, idx): + piece_duration = 1000 + min_deadline = 2000 + dl = idx * piece_duration + min_deadline + self._th.set_piece_deadline(pc, dl, self.lt.deadline_flags.alert_when_available) + logger.debug("Set deadline %d for piece %d", dl, pc) + + # we do not need to download pieces that are lower then current index, but last two pieces are special because players sometime look at end of file + if idx == 0 and (self._file.last_piece - pc) > 2: + for i in xrange(pc - 1): + self._th.piece_priority(i, 0) + self._th.reset_piece_deadline(i) + + def prioritize_file(self): + meta = self._th.torrent_file() + priorities = [1 if i >= self._file.first_piece and i <= self.file.last_piece else 0 \ + for i in xrange(meta.num_pieces())] + self._th.prioritize_pieces(priorities) + + def encrypt(self): + # Encryption settings + print 'Encryption enabling...' + try: + encryption_settings = self.lt.pe_settings() + encryption_settings.out_enc_policy = self.lt.enc_policy(self.lt.enc_policy.forced) + encryption_settings.in_enc_policy = self.lt.enc_policy(self.lt.enc_policy.forced) + encryption_settings.allowed_enc_level = self.lt.enc_level.both + encryption_settings.prefer_rc4 = True + self._ses.set_pe_settings(encryption_settings) + print 'Encryption on!' + except Exception, e: + print 'Encryption failed! Exception: ' + str(e) + pass + + @property + def unique_file_id(self): + return str(self._th.torrent_file().info_hash()) + + @property + def pieces(self): + return self._th.status().pieces + + def add_dispatcher_listener(self, cb): + self._dispatcher.add_listener(cb) + + def remove_dispacher_listener(self, cb): + self._dispatcher.remove_listener(cb) + + def remove_all_dispatcher_listeners(self): + self._dispatcher.remove_all_listeners() + + def info_from_file(self, uri): + if os.access(uri, os.R_OK): + e = self.lt.bdecode(open(uri, 'rb').read()) + info = self.lt.torrent_info(e) + tp = {'ti': info} + resume_data = self._cache.get_resume(info_hash=str(info.info_hash())) + if resume_data: + tp['resume_data'] = resume_data + return tp + raise ValueError('Invalid torrent path %s' % uri) + + def start_url(self, uri): + if self._th: + raise Exception('Torrent is already started') + + if uri.startswith('http://') or uri.startswith('https://'): + self._url = uri + stored = self._cache.get_torrent(url=uri) + if stored: + tp = self.info_from_file(stored) + else: + tp = {'url': uri} + resume_data = self._cache.get_resume(url=uri) + if resume_data: + tp['resume_data'] = resume_data + elif uri.startswith('magnet:'): + self._url = uri + stored = self._cache.get_torrent(info_hash=CacheBT.hash_from_magnet(uri)) + if stored: + tp = self.info_from_file(stored) + else: + tp = {'url': uri} + resume_data = self._cache.get_resume(info_hash=CacheBT.hash_from_magnet(uri)) + if resume_data: + tp['resume_data'] = resume_data + elif os.path.isfile(uri): + tp = self.info_from_file(uri) + else: + raise ValueError("Invalid torrent %s" % uri) + + tp.update(self._torrent_params) + self._th = self._ses.add_torrent(tp) + for tr in INITIAL_TRACKERS: + self._th.add_tracker({'url': tr}) + self._th.set_sequential_download(True) + time.sleep(1) + self._th.force_dht_announce() + # if tp.has_key('ti'): + # self._meta_ready(self._th.torrent_file()) + + self._monitor.start() + self._dispatcher.do_start(self._th, self._ses) + + def stop(self): + BaseClient.stop(self)(self) + self._dispatcher.stop() + self._dispatcher.join() + + def _start_services(self): + self._ses.add_dht_router('router.bittorrent.com', 6881) + self._ses.add_dht_router('router.utorrent.com', 6881) + self._ses.add_dht_router('router.bitcomet.com', 6881) + self._ses.start_dht() + self._ses.start_lsd() + self._ses.start_upnp() + self._ses.start_natpmp() + + def _stop_services(self): + self._ses.stop_natpmp() + self._ses.stop_upnp() + self._ses.stop_lsd() + self._ses.stop_dht() + + def save_state(self): + state = self._ses.save_state() + with open(self._state_file, 'wb') as f: + pickle.dump(state, f) + + def save_resume(self): + if self._th.need_save_resume_data() and self._th.is_valid() and self._th.status().has_metadata: + r = BTClient.ResumeData(self) + start = time.time() + while (time.time() - start) <= 5: + if r.data or r.failed: + break + time.sleep(0.1) + if r.data: + logger.debug('Savig fast resume data') + self._cache.save_resume(self.unique_file_id, self.lt.bencode(r.data)) + else: + logger.warn('Fast resume data not available') + + def close(self): + self.remove_all_dispatcher_listeners() + self._monitor.stop() + if self._ses: + self._ses.pause() + if self._th: + self.save_resume() + self.save_state() + self._stop_services() + try: + self._ses.remove_torrent(self._th) + except: + print 'RuntimeError: invalid torrent handle used' + #BaseClient.close(self) + + @property + def status(self): + if self._th: + s = self._th.status() + if self._file: + pieces = s.pieces[self._file.first_piece:self._file.last_piece] + if len(pieces)>0: + progress = float(sum(pieces)) / len(pieces) + else: + progress = 0 + else: + progress = 0 + size = self._file.size if self._file else 0 + s.desired_rate = self._file.byte_rate if self._file and progress > 0.003 else 0 + s.progress_file = progress + s.file_size = size + return s + + class ResumeData(object): + def __init__(self, client): + self.data = None + self.failed = False + client.add_dispatcher_listener(self._process_alert) + client._th.save_resume_data() + + def _process_alert(self, t, alert): + if t == 'save_resume_data_failed_alert': + logger.debug('Fast resume data generation failed') + self.failed = True + elif t == 'save_resume_data_alert': + self.data = alert.resume_data + + class Dispatcher(BaseMonitor): + def __init__(self, client, lt=None): + super(BTClient.Dispatcher, self).__init__(client, name='Torrent Events Dispatcher') + self.lt=lt + + def do_start(self, th, ses): + self._th = th + self._ses = ses + self.start() + + def run(self): + if not self._ses: + raise Exception('Invalid state, session is not initialized') + + while (self._running): + a = self._ses.wait_for_alert(1000) + if a: + alerts = self._ses.pop_alerts() + for alert in alerts: + with self._lock: + for cb in self._listeners: + if "udp_error_alert" not in self.lt.alert.what(alert): + cb(self.lt.alert.what(alert), alert) + + STATE_STR = ['queued', 'checking', 'downloading metadata', + 'downloading', 'finished', 'seeding', 'allocating', 'checking fastresume'] + + def print_status(self, s, client): + if self._th: + + state_str = ['queued', 'checking', 'downloading metadata', + 'downloading', 'finished', 'seeding', 'allocating', 'checking fastresume'] + print('[%s] %.2f%% complete (down: %.1f kb/s up: %.1f kB/s peers: %d) %s' % + (self.lt.version, s.progress * 100, s.download_rate / 1000, s.upload_rate / 1000, + s.num_peers, state_str[s.state])) + + + def get_normalized_status(self): + s = self.status + if self._file: + pieces = s.pieces[self._file.first_piece: self._file.last_piece + 1] + downloaded = reduce(lambda s, x: s + (x and 1 or 0) * self._file.piece_size, pieces[:-1], 0) + if pieces[-1]: + rem = self._file.size % self._file.piece_size + downloaded += rem if rem else self._file.piece_size + else: + downloaded = 0 + return {'source_type': 'bittorrent', + 'state': BTClient.STATE_STR[s.state], + 'downloaded': downloaded, + 'total_size': s.file_size, + 'download_rate': s.download_rate, + 'upload_rate': s.upload_rate, + 'desired_rate': s.desired_rate, + 'piece_size': self._file.piece_size if self._file else 0, + 'progress': s.progress_file, + # BT specific + 'seeds_connected': s.num_seeds, + 'seeds_total': s.num_complete, + 'peers_connected': s.num_peers, + 'peers_total': s.num_incomplete, + 'num_pieces': s.num_pieces, + + } + + def debug_download_queue(self, s, client): + if s.state != 3: + return + download_queue = self._th.get_download_queue() + if self.file: + first = self.file.first_piece + else: + first = 0 + q = map(lambda x: x['piece_index'] + first, download_queue) + logger.debug('Download queue: %s', q) + + def debug_alerts(self, type, alert): + logger.debug("Alert %s - %s", type, alert) + + +class BTFile(AbstractFile): + def __init__(self, path, base, index, size, fmap, piece_size, prioritize_fn): + AbstractFile.__init__(self, path, base, size, piece_size) + self.index = index + self.first_piece = fmap.piece + self.last_piece = self.first_piece + max((size - 1 + fmap.start), 0) // piece_size + self.offset = fmap.start + self._prioritize_fn = prioritize_fn + + def prioritize_piece(self, n, idx): + self._prioritize_fn(n, idx) + + +class LangAction(argparse.Action): + def __init__(self, option_strings, dest, nargs=None, **kwargs): + if nargs is not None: + raise ValueError("nargs not allowed") + super(LangAction, self).__init__(option_strings, dest, **kwargs) + + def __call__(self, parser, namespace, values, option_string=None): + if len(values) != 3: + raise ValueError('subtitles language should be 3 letters code') + setattr(namespace, self.dest, values) + + +def main(args=None): + #from argparse import Namespace + #args=Namespace(bt_download_limit=0, + # bt_upload_limit=0, + # choose_subtitles=False, + # clear_older=0, + # debug_log='D:\\log.txt', + # delete_on_finish=True,#Flase, + # directory='D:\\', + # listen_port_max=6891, + # listen_port_min=6881, + # no_resume=False, + # player='vlc',#kodi + # port=5001, + # print_pieces=False, + # quiet=False, + # stdin=False, + # stream=False, + # subtitles=None, + # trace=True, + # url='D:\\ntest.torrent') + if not args: + p = argparse.ArgumentParser() + p.add_argument("url", help="Torrent file, link to file or magnet link") + p.add_argument("-d", "--directory", default="./", help="directory to save download files") + p.add_argument("-p", "--player", default="mplayer", choices=["mplayer", "vlc"], help="Video player") + p.add_argument("--port", type=int, default=5001, help="Port for http server") + p.add_argument("--debug-log", default='', help="File for debug logging") + p.add_argument("--stdin", action='store_true', help='sends video to player via stdin (no seek then)') + p.add_argument("--print-pieces", action="store_true", + help="Prints map of downloaded pieces and ends (X is downloaded piece, O is not downloaded)") + p.add_argument("-s", "--subtitles", action=LangAction, + help="language for subtitle 3 letter code eng,cze ... (will try to get subtitles from opensubtitles.org)") + p.add_argument("--stream", action="store_true", help="just file streaming, but will not start player") + p.add_argument("--no-resume", action="store_true", help="Do not resume from last known position") + p.add_argument("-q", "--quiet", action="store_true", help="Quiet - did not print progress to stdout") + p.add_argument('--delete-on-finish', action="store_true", help="Delete downloaded file when program finishes") + p.add_argument('--clear-older', type=int, default=0, + help="Deletes files older then x days from download directory, if set will slowdown start of client") + p.add_argument('--bt-download-limit', type=int, default=0, help='Download limit for torrents kB/s') + p.add_argument('--bt-upload-limit', type=int, default=0, help='Upload limit for torrents kB/s') + p.add_argument('--listen-port-min', type=int, default=6881, help='Bitorrent input port range - minimum port') + p.add_argument('--listen-port-max', type=int, default=6891, help='Bitorrent input port range - maximum port') + p.add_argument('--choose-subtitles', action="store_true", + help="Always manually choose subtitles (otherwise will try to use best match in many cases)") + p.add_argument('--trace', action='store_true', help='More detailed debug logging') + args = p.parse_args(args) + # str(args) + if args.debug_log: + logger.setLevel(logging.DEBUG) + h = logging.handlers.RotatingFileHandler(args.debug_log) + logger.addHandler(h) + else: + logger.setLevel(logging.CRITICAL) + logger.addHandler(logging.StreamHandler()) + + if args.clear_older: + days = args.clear_older + items = os.listdir(args.directory) + now = time.time() + for item in items: + if item != CacheBT.CACHE_DIR: + full_path = os.path.join(args.directory, item) + if now - os.path.getctime(full_path) > days * 24 * 3600: + logger.debug('Deleting path %s', full_path) + if os.path.isdir(full_path): + shutil.rmtree(full_path, ignore_errors=True) + else: + os.unlink(full_path) + + if args.print_pieces: + print_pieces(args) + elif re.match('https?://localhost', args.url): + class TestResolver(Resolver): + SPEED_LIMIT = 300 + THREADS = 2 + + stream(args, HTClient, TestResolver) + else: + #rclass = plugins.find_matching_plugin(args.url) + #if rclass: + # stream(args, HTClient, rclass) + #else: + #stream(args, BTClient) + return args + +#def stream(args, client_class, resolver_class=None): +# c = client_class(args.directory, args=args, resolver_class=resolver_class) +# player = None +# +# def on_exit(sig=None, frame=None): +# c.close() +# if player: +# player.terminate() +# if sig: +# logger.info('Exiting by signal %d', sig) +# sys.exit(0) +# +# try: +# +# if not args.stream: +# player = Player.create(args.player, c.update_play_time) +# +# server = None +# # test if port if free, otherwise find free +# free_port = args.port +# while True: +# +# try: +# s = socket.socket() +# res = s.connect_ex(('127.0.0.1', free_port)) +# if res: +# break +# finally: +# s.close() +# free_port += 1 +# if not args.stdin: +# server = StreamServer(('127.0.0.1', free_port), BTFileHandler, allow_range=True, +# status_fn=c.get_normalized_status) +# logger.debug('Started http server on port %d', free_port) +# server.run() +# #thread.start_new_thread(server.run, ()) +# if player: +# def start_play(f, finished): +# base = None +# if not args.stdin: +# server.set_file(f) +# base = 'http://127.0.0.1:' + str(free_port) + '/' +# sin = args.stdin +# if finished: +# base = args.directory +# sin = False +# logger.debug('File is already downloaded, will play it directly') +# args.play_file = True +# +# if args.no_resume: +# start_time = 0 +# else: +# start_time = c.last_play_time or 0 +# player.start(f, base, stdin=sin, sub_lang=args.subtitles, start_time=start_time, +# always_choose_subtitles=args.choose_subtitles) +# logger.debug('Started media player for %s', f) +# +# c.set_on_file_ready(start_play) +# else: +# def print_url(f, done): +# server.set_file(f) +# base = 'http://127.0.0.1:' + str(free_port) + '/' +# url = urlparse.urljoin(base, urllib.quote(f.path)) +# print "\nServing file on %s" % url +# sys.stdout.flush() +# +# c.set_on_file_ready(print_url) +# +# logger.debug('Starting btclient - libtorrent version %s', self.lt.version) +# c.start_url(args.url) +# while not c.is_file_ready: +# time.sleep(1) +# if not args.stdin or hasattr(args, 'play_file') and args.play_file: +# f = None +# else: +# f = c.file.create_cursor() +# +# while True: +# if player and not player.is_playing(): +# break +# if not f: +# time.sleep(1) +# else: +# buf = f.read(1024) +# if buf: +# try: +# player.write(buf) +# logger.debug("written to stdin") +# except IOError: +# pass +# else: +# player.close() +# if f: +# f.close() +# logger.debug('Play ended') +# if server: +# server.stop() +# if player: +# if player.rcode != 0: +# msg = 'Player ended with error %d\n' % (player.rcode or 0) +# sys.stderr.write(msg) +# logger.error(msg) +# +# logger.debug("Player output:\n %s", player.log) +# finally: +# on_exit() +# # logger.debug("Remaining threads %s", list(threading.enumerate())) + + +def pieces_map(pieces, w): + idx = 0 + sz = len(pieces) + w(" " * 4) + for i in xrange(10): + w("%d " % i) + w('\n') + while idx < sz: + w("%3d " % (idx / 10)) + for _c in xrange(min(10, sz - idx)): + if pieces[idx]: + w('X ') + else: + w('O ') + idx += 1 + w('\n') + + +def print_pieces(args): + def w(x): + sys.stdout.write(x) + + c = BTClient(args.directory) + c.start_url(args.url) + # c.add_listener(print_status) + start = time.time() + while time.time() - start < 60: + if c.file: + print "Pieces (each %.0f k) for file: %s" % (c.file.piece_size / 1024.0, c.file.path) + pieces = c.pieces + pieces_map(pieces, w) + return + time.sleep(1) + print >> sys.stderr, "Cannot get metadata" + + +if __name__ == '__main__': + try: + main() + except KeyboardInterrupt: + print >> sys.stderr, '\nProgram interrupted, exiting' + logger.info('Exiting by SIGINT') + except Exception: + traceback.print_exc() + logger.exception('General error') diff --git a/resources/btclient/cachebt.py b/resources/btclient/cachebt.py new file mode 100644 index 0000000..be1a9d4 --- /dev/null +++ b/resources/btclient/cachebt.py @@ -0,0 +1,100 @@ +''' +Created on Apr 28, 2015 + +@author: ivan +''' +import os.path +import shelve +import re +import logging +import base64 + +logger = logging.getLogger('cache') + + +class CacheBT(object): + CACHE_DIR = '.cache' + + def __init__(self, path, lt): + if not os.path.isdir(path): + raise ValueError('Invalid base directory') + self.path = os.path.join(path, CacheBT.CACHE_DIR) + if not os.path.isdir(self.path): + os.mkdir(self.path) + self._index_path = os.path.join(self.path, 'index') + self._index = shelve.open(self._index_path) + self._last_pos_path = os.path.join(self.path, 'last_position') + self._last_pos = shelve.open(self._last_pos_path) + self.lt=lt + + def save(self, url, info_hash): + self._index[url] = info_hash + self._index.sync() + + def close(self): + self._index.close() + self._last_pos.close() + + def _tname(self, info_hash): + return os.path.join(self.path, info_hash.upper() + '.torrent') + + def _rname(self, info_hash): + return os.path.join(self.path, info_hash.upper() + '.resume') + + def save_resume(self, info_hash, data): + with open(self._rname(info_hash), 'wb') as f: + f.write(data) + + def get_resume(self, url=None, info_hash=None): + if url: + info_hash = self._index.get(url) + if not info_hash: + return + rname = self._rname(info_hash) + if os.access(rname, os.R_OK): + with open(rname, 'rb') as f: + return f.read() + + def file_complete(self, torrent, url=None): + info_hash = str(torrent.info_hash()) + nt = self.lt.create_torrent(torrent) + tname = self._tname(info_hash) + with open(tname, 'wb') as f: + f.write(self.lt.bencode(nt.generate())) + if url: + self.save(url, info_hash) + + def get_torrent(self, url=None, info_hash=None): + if url: + info_hash = self._index.get(url) + if not info_hash: + return + tname = self._tname(info_hash) + if os.access(tname, os.R_OK): + logger.debug('Torrent is cached') + return tname + + magnet_re = re.compile('xt=urn:btih:([0-9A-Za-z]+)') + hexa_chars = re.compile('^[0-9A-F]+$') + + @staticmethod + def hash_from_magnet(m): + res = CacheBT.magnet_re.search(m) + if res: + ih = res.group(1).upper() + if len(ih) == 40 and CacheBT.hexa_chars.match(ih): + return res.group(1).upper() + elif len(ih) == 32: + s = base64.b32decode(ih) + return "".join("{:02X}".format(ord(c)) for c in s) + else: + raise ValueError('Not BT magnet link') + + else: + raise ValueError('Not BT magnet link') + + def play_position(self, info_hash, secs): + self._last_pos[info_hash] = secs + + def get_last_position(self, info_hash): + return self._last_pos.get(info_hash) or 0 diff --git a/resources/btclient/common.py b/resources/btclient/common.py new file mode 100644 index 0000000..44e17f0 --- /dev/null +++ b/resources/btclient/common.py @@ -0,0 +1,477 @@ +''' +Created on May 3, 2015 + +@author: ivan +''' +import os +from collections import deque +import logging +from threading import Lock, Event, Thread +import copy +import threading +import traceback +import shutil +import urlparse +from StringIO import StringIO + +from hachoir_metadata import extractMetadata +from hachoir_parser import guessParser +import hachoir_core.config as hachoir_config +from hachoir_core.stream.input import InputIOStream +from opensubtitle import OpenSubtitles +from cachebt import CacheBT + +logger = logging.getLogger('common') +hachoir_config.quiet = True + + +def enum(**enums): + return type('Enum', (), enums) + + +TerminalColor = enum(default='\033[39m', green='\033[32m', red='\033[31m', yellow='\033[33m') + + +def get_duration(fn): + # We need to provide just begining of file otherwise hachoir might try to read all file + with open(fn, 'rb') as f: + s = StringIO(f.read(1024 * 64)) + p = guessParser(InputIOStream(s, filename=unicode(fn), tags=[])) + m = extractMetadata(p) + if m: + return m.getItem('duration', 0) and m.getItem('duration', 0).value + + +def debug_fn(fn): + def _fn(*args, **kwargs): + print "Entering %s, thread %s" % (fn.__name__, threading.current_thread().name) + traceback.print_stack() + ret = fn(*args, **kwargs) + print "Leaving %s, thread %s" % (fn.__name__, threading.current_thread().name) + return ret + + return _fn + + +class Hasher(Thread): + def __init__(self, btfile, hash_cb): + Thread.__init__(self, name="Hasher") + if btfile is None: + raise ValueError('BTFile is None!') + self._btfile = btfile + self._hash_cb = hash_cb + self.hash = None + self.daemon = True + self.start() + + def run(self): + with self._btfile.create_cursor() as c: + filehash = OpenSubtitles.hash_file(c, self._btfile.size) + self.hash = filehash + self._hash_cb(filehash) + + +class BaseMonitor(Thread): + def __init__(self, client, name): + Thread.__init__(self, name=name) + self.daemon = True + self._listeners = [] + self._lock = Lock() + self._wait_event = Event() + self._running = True + self._client = client + self._ses = None + + def add_to_ctx(self, key, val): + self._ctx[key] = val + + def stop(self): + self._running = False + self._wait_event.set() + + def add_listener(self, cb): + with self._lock: + if not cb in self._listeners: + self._listeners.append(cb) + + def remove_listener(self, cb): + with self._lock: + try: + self._listeners.remove(cb) + except ValueError: + pass + + def remove_all_listeners(self): + with self._lock: + self._listeners = [] + + +class BaseClient(object): + class Monitor(BaseMonitor): + def __init__(self, client): + super(BaseClient.Monitor, self).__init__(client, name="Status Monitor") + self._client = client + + def run(self): + + while (self._running): + s = self._client.status + with self._lock: + for cb in self._listeners: + cb(s, client=self._client) + self._wait_event.wait(1.0) + + def __init__(self, path_to_store, args=None): + self._base_path = path_to_store + self._ready = False + self._file = None + self._on_ready_action = None + self._monitor = BaseClient.Monitor(self) + if not args or not args.quiet: + self.add_monitor_listener(self.print_status) + self._delete_on_close = True if args and args.delete_on_finish else False + + def _on_file_ready(self, filehash): + self._file.filehash = filehash + self._ready = True + if self._on_ready_action: + self._on_ready_action(self._file, self.is_file_complete) + + @property + def status(self): + raise NotImplementedError() + + def get_normalized_status(self): + s = self.status + return {'source_type': 'base', + 'state': s.state, + 'downloaded': s.downloaded, + 'total_size': s.total_size, + 'download_rate': s.download_rate, + 'desired_rate': s.desired_rate, + 'progress': s.progress, + 'piece_size': self._file.piece_size if self._file else 0 + } + + @property + def file(self): + return self._file + + def set_on_file_ready(self, action): + self._on_ready_action = action + + @property + def is_file_ready(self): + return self._ready + + def print_status(self, s, client): + raise NotImplementedError() + + @property + def is_file_complete(self): + raise NotImplementedError() + + def start_url(self, uri): + raise NotImplementedError() + + def close(self): + if self._cache: + self._cache.close() + if self._delete_on_close and self._file: + self._file.remove() + + @property + def unique_file_id(self): + raise NotImplementedError() + + def update_play_time(self, playtime): + self._cache.play_position(self.unique_file_id, playtime) + + @property + def last_play_time(self): + return self._cache.get_last_position(self.unique_file_id) + + def add_monitor_listener(self, cb): + self._monitor.add_listener(cb) + + def remove_monitor_listener(self, cb): + self._monitor.remove_listener(cb) + + def stop(self): + self._monitor.stop() + self._monitor.join() + + +class PieceCache(object): + TIMEOUT = 30 + size = 5 + + def __init__(self, btfile): + # self._btfile=btfile + self._cache = [None] * self.size + self._lock = Lock() + self._event = Event() + self._cache_first = btfile.first_piece + self._piece_size = btfile.piece_size + self._map_offset = btfile.map_piece + self._file_size = btfile.size + self._last_piece = btfile.last_piece + self._request_piece = btfile.prioritize_piece + self._btfile = btfile + + def clone(self): + c = PieceCache(self._btfile) + with self._lock: + c._cache = copy.copy(self._cache) + c._cache_first = self._cache_first + return c + + @property + def cached_piece(self): + self._cache_first + + def fill_cache(self, first): + to_request = [] + with self._lock: + diff = first - self._cache_first + if diff > 0: + for i in xrange(self.size): + if i + diff < self.size: + self._cache[i] = self._cache[i + diff] + else: + self._cache[i] = None + + elif diff < 0: + for i in xrange(self.size - 1, -1, -1): + if i + diff >= 0: + self._cache[i] = self._cache[i + diff] + else: + self._cache[i] = None + + self._cache_first = first + self._event.clear() + for i in xrange(self.size): + if self._cache[i] is None and (self._cache_first + i) <= self._last_piece: + to_request.append((self._cache_first + i, i)) + for args in to_request: + self._request_piece(*args) + + def add_piece(self, n, data): + with self._lock: + i = n - self._cache_first + if i >= 0 and i < self.size: + self._cache[i] = data + if i == 0: + self._event.set() + + def has_piece(self, n): + with self._lock: + i = n - self._cache_first + if i >= 0 and i < self.size: + return not (self._cache[i] is None) + + def _wait_piece(self, pc_no): + while not self.has_piece(pc_no): + self.fill_cache(pc_no) + # self._event.clear() + logger.debug('Waiting for piece %d' % pc_no) + self._event.wait(self.TIMEOUT) + + def _get_piece(self, n): + with self._lock: + i = n - self._cache_first + if i < 0 or i > self.size: + raise ValueError('index of of scope of current cache') + return self._cache[i] + + def get_piece(self, n): + self._wait_piece(n) + return self._get_piece(n) + + def read(self, offset, size): + size = min(size, self._file_size - offset) + if not size: + return + + pc_no, ofs = self._map_offset(offset) + data = self.get_piece(pc_no) + pc_size = self._piece_size - ofs + if pc_size > size: + return data[ofs: ofs + size] + else: + pieces = [data[ofs:self._piece_size]] + remains = size - pc_size + new_head = pc_no + 1 + while remains and self.has_piece(new_head): + sz = min(remains, self._piece_size) + data = self.get_piece(new_head) + pieces.append(data[:sz]) + remains -= sz + if remains: + new_head += 1 + self.fill_cache(new_head) + return ''.join(pieces) + + +class BTCursor(object): + def __init__(self, btfile): + self._btfile = btfile + self._pos = 0 + self._cache = PieceCache(btfile) + + def clone(self): + c = BTCursor(self._btfile) + c._cache = self._cache.clone() + return c + + def close(self): + self._btfile.remove_cursor(self) + + def read(self, n=None): + sz = self._btfile.size - self._pos + if not n: + n = sz + else: + n = min(n, sz) + res = self._cache.read(self._pos, n) + if res: + self._pos += len(res) + return res + + def seek(self, n): + if n > self._btfile.size: + n = self._btfile.size + # raise ValueError('Seeking beyond file size') + elif n < 0: + raise ValueError('Seeking negative') + self._pos = n + + def tell(self): + return self._pos + + def update_piece(self, n, data): + self._cache.add_piece(n, data) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + + +class AbstractFile(object): + def __init__(self, path, base, size, piece_size): + self._base = base + self.size = size + self.path = path + self.piece_size = piece_size + self.offset = 0 + self._full_path = os.path.join(base, path) + self._cursors = [] + self._cursors_history = deque(maxlen=3) + self._lock = Lock() + self.first_piece = 0 + self.last_piece = self.first_piece + (max(size - 1, 0)) // piece_size + + self._rate = None + self._piece_duration = None + + def add_cursor(self, c): + with self._lock: + self._cursors.append(c) + + def remove_cursor(self, c): + with self._lock: + self._cursors.remove(c) + self._cursors_history.appendleft(c) + + def create_cursor(self, offset=None): + c = None + if offset is not None: + with self._lock: + for e in reversed(self._cursors): + if abs(e.tell() - offset) < self.piece_size: + c = e.clone() + logger.debug('Cloning existing cursor') + break + if not c: + with self._lock: + for e in reversed(self._cursors_history): + if abs(e.tell() - offset) < self.piece_size: + c = e + logger.debug('Reusing previous cursor') + if not c: + c = BTCursor(self) + self.add_cursor(c) + if offset: + c.seek(offset) + return c + + def map_piece(self, ofs): + return self.first_piece + (ofs + self.offset) // self.piece_size, \ + (ofs + self.offset) % self.piece_size + + def prioritize_piece(self, piece, idx): + raise NotImplementedError() + + @property + def full_path(self): + return self._full_path + + def close(self): + pass + + def remove(self): + dirs = self.path.split(os.sep) + if len(dirs) > 1: + shutil.rmtree(os.path.join(self._base, dirs[0]), ignore_errors=True) + else: + os.unlink(self._full_path) + + def update_piece(self, n, data): + for c in self._cursors: + c.update_piece(n, data) + + @property + def duration(self): + if not hasattr(self, '_duration'): + self._duration = get_duration(self._full_path) if os.path.exists(self._full_path) else 0 + return self._duration or 0 + + @property + def piece_duration_ms(self): + if not self._piece_duration: + if self.byte_rate: + self._piece_duration = self.piece_size / self.byte_rate / 1000 + + return self._piece_duration + + @property + def byte_rate(self): + if not self._rate: + d = self.duration + if d: + self._rate = self.size / d.total_seconds() + return self._rate + + def __str__(self): + return self.path + + +class Resolver(object): + URL_PATTERN = None + SPEED_LIMIT = None # kB/s + THREADS = 4 + + def __init__(self, loader): + self._client = loader + + def resolve(self, url): + return url + + @staticmethod + def url_to_file(uri): + path = urlparse.urlsplit(uri)[2] + if path.startswith('/'): + path = path[1:] + return path diff --git a/resources/btclient/htclient.py b/resources/btclient/htclient.py new file mode 100644 index 0000000..e88b9be --- /dev/null +++ b/resources/btclient/htclient.py @@ -0,0 +1,503 @@ +''' +Created on May 3, 2015 + +@author: ivan +''' + +import urllib2 +import os.path +import pickle +import logging +from cookielib import CookieJar +from collections import namedtuple +import random +from httplib import BadStatusLine, IncompleteRead +import socket +import time +import re +import Queue +import collections +from threading import Lock, Thread +import sys +import threading +from urllib import urlencode +import zlib +from io import BytesIO +import gzip +import json +from collections import deque + +from common import AbstractFile, BaseClient, Hasher, Resolver, TerminalColor +from bs4 import BeautifulSoup + +logger = logging.getLogger('htclient') + +Piece = namedtuple('Piece', ['piece', 'data', 'total_size', 'type']) + + +class HTTPLoader(object): + UA_STRING = ['Mozilla/5.0 (Windows NT 6.3; rv:36.0) Gecko/20100101 Firefox/36.0', + 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2227.0 Safari/537.36', + 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.75.14 (KHTML, like Gecko) Version/7.0.3 Safari/7046A194A'] + + PARSER = 'lxml' # 'html5lib'#'html.parser'#'lxml' + + class Error(Exception): + pass + + def __init__(self, url, id, resolver_class=None): + self.id = id + self.user_agent = self._choose_ua() + resolver_class = resolver_class or Resolver + self._client = urllib2.build_opener(urllib2.HTTPCookieProcessor(CookieJar())) + self.url = self.resolve_file_url(resolver_class, url) + if not self.url: + raise HTTPLoader.Error('Urlwas not resolved to file link') + + def resolve_file_url(self, resolver_class, url): + r = resolver_class(self) + return r.resolve(url) + + def _choose_ua(self): + return HTTPLoader.UA_STRING[random.randint(0, len(HTTPLoader.UA_STRING) - 1)] + + RANGE_RE = re.compile(r'bytes\s+(\d+)-(\d+)/(\d+)') + + def _parse_range(self, r): + m = HTTPLoader.RANGE_RE.match(r) + if not m: + raise HTTPLoader.Error('Invalid range header') + return int(m.group(1)), int(m.group(2)), int(m.group(3)) + + def open(self, url, data=None, headers={}, method='get'): + hdr = {'User-Agent': self.user_agent} + hdr.update(headers) + url, post_args = self._encode_data(url, data, method) + req = urllib2.Request(url, post_args, headers=headers) + res = None + retries = 5 + while retries: + try: + res = self._client.open(req, timeout=10) + break + except (IOError, urllib2.HTTPError, BadStatusLine, IncompleteRead, socket.timeout) as e: + if isinstance(e, urllib2.HTTPError) and hasattr(e, 'code') and str(e.code) == '404': + raise HTTPLoader.Error('Url %s not found', url) + + logging.warn('Retry on (%s) due to IO or HTTPError (%s) ', threading.current_thread().name, e) + retries -= 1 + time.sleep(1) + if not res: + raise HTTPLoader.Error('Cannot open resource %s' % url) + return res + + def load_piece(self, piece_no, piece_size): + start = piece_no * piece_size + headers = {'Range': 'bytes=%d-' % start} + res = self.open(self.url, headers=headers) + allow_range_header = res.info().getheader('Accept-Ranges') + if allow_range_header and allow_range_header.lower() == 'none': + raise HTTPLoader.Error('Ranges are not supported') + + size_header = res.info().getheader('Content-Length') + total_size = int(size_header) if size_header else None + + range_header = res.info().getheader('Content-Range') + if not range_header: + if piece_no and not total_size: + raise HTTPLoader.Error('Ranges are not supported') + else: + from_pos, to_pos, size = 0, total_size - 1, total_size + else: + from_pos, to_pos, size = self._parse_range(range_header) + + type_header = res.info().getheader('Content-Type') + + if not type_header: + raise HTTPLoader.Error('Content Type is missing') + + data = res.read(piece_size) + + res.close() + return Piece(piece_no, data, size, type_header) + + @staticmethod + def decode_data(res): + header = res.info() + data = res.read() + if header.get('Content-Encoding') == 'gzip': + tmp_stream = gzip.GzipFile(fileobj=BytesIO(data)) + data = tmp_stream.read() + + elif header.get('Content-Encoding') == 'deflate': + data = zlib.decompress(data) + return data + + def _encode_data(self, url, data, method='post'): + if not data: + return url, None + if method.lower() == 'post': + return url, urlencode(data) + else: + return url + '?' + urlencode(data), None + + def load_page(self, url, data=None, method='get'): + res = self.open(url, data, headers={'Accept-Encoding': "gzip, deflate"}, method=method) + # Content-Type:"text/html; charset=utf-8" + type_header = res.info().getheader('Content-Type') + if not type_header.startswith('text/html'): + raise HTTPLoader.Error("%s is not HTML page" % url) + # Content-Encoding:"gzip" + data = HTTPLoader.decode_data(res) + pg = BeautifulSoup(data, HTTPLoader.PARSER) + return pg + + def load_json(self, url, data, method='get'): + res = self.open(url, data, headers={'Accept-Encoding': "gzip, deflate", 'X-Requested-With': 'XMLHttpRequest'}, + method=method) + type_header = res.info().getheader('Content-Type') + if not type_header.startswith('application/json'): + raise HTTPLoader.Error("%s is not JSON" % url) + # Content-Encoding:"gzip" + data = HTTPLoader.decode_data(res) + return json.loads(data, encoding='utf8') + + def get_redirect(self, url, data, method='get'): + pass + + +class PriorityQueue(Queue.PriorityQueue): + NORMAL_PRIORITY = 99 + NO_DOWNLOAD = 999 + NO_PIECE = -1 + + def __init__(self): + Queue.PriorityQueue.__init__(self, maxsize=0) + self._wset = {} + self._lock = Lock() + + def put_piece(self, piece, priority, remove_previous=False): + # do not put piece which is already in queue + with self._lock: + if piece in self._wset: + if self._wset[piece][0] == priority: + return + else: + self.remove_piece(piece) + if remove_previous: + for k in self._wset: + if k < piece: + self.remove_piece(k) + + entry = [priority, piece] + self._wset[piece] = entry + self.put(entry, block=False) + + def remove_piece(self, piece): + entry = self._wset.pop(piece) + entry[-1] = PriorityQueue.NO_PIECE + + def get_piece(self, timeout=None): + while True: + _priority, piece = self.get(timeout=timeout) + with self._lock: + if piece is not PriorityQueue.NO_PIECE: + del self._wset[piece] + return piece + + +class Pool(object): + def __init__(self, piece_size, loaders, cb, speed_limit=None): + self.piece_size = piece_size + self._cb = cb + self.speed_limit = speed_limit + self._queue = PriorityQueue() + self._running = True + self._threads = [Thread(name="worker %d" % i, target=self.work, args=[l]) for i, l in enumerate(loaders)] + for t in self._threads: + t.daemon = True + t.start() + + def add_worker_async(self, id, gen_fn, args=[], kwargs={}): + def resolve(): + l = gen_fn(*args, **kwargs) + t = Thread(name="worker %d" % id, target=self.work, args=[l]) + t.daemon = True + t.start() + self._threads.append(t) + + adder = Thread(name="Adder", target=resolve) + adder.daemon = True + adder.start() + + def work(self, loader): + while self._running: + pc = self._queue.get_piece() + if not self._running: + break + try: + start = time.time() + p = loader.load_piece(pc, self.piece_size) + self._cb(p.piece, p.data) + if self.speed_limit: + dur = time.time() - start + expected = self.piece_size / 1024.0 / self.speed_limit + wait_time = expected - dur + if wait_time > 0: + logger.debug('Waiting %f on %s', wait_time, threading.current_thread().name) + time.sleep(wait_time) + except Exception, e: + logger.exception('(%s) Error when loading piece %d: %s', threading.current_thread().name, pc, e) + + def stop(self): + self._running = False + # push some dummy tasks to assure workers ends + for i in xrange(len(self._threads)): + self._queue.put_piece(i, PriorityQueue.NO_DOWNLOAD) + + def add_piece(self, piece, priority=PriorityQueue.NORMAL_PRIORITY, remove_previous=False): + self._queue.put_piece(piece, priority, remove_previous) + + +class HTClient(BaseClient): + def __init__(self, path_to_store, args=None, piece_size=2 * 1024 * 1024, no_threads=2, resolver_class=None): + BaseClient.__init__(self, path_to_store, args=args) + self._pool = None + self.resolver_class = resolver_class + self._no_threads = self.resolver_class.THREADS if self.resolver_class and hasattr(self.resolver_class, + 'THREADS') else no_threads + self.piece_size = piece_size + self._last_downloaded = deque(maxlen=60) + + def update_piece(self, piece, data): + self._file.update_piece(piece, data) + if not self._ready and hasattr(self._file, 'filehash') and self._file.filehash \ + and all(self._file.pieces[:5]): + self._set_ready(self._file.is_complete) + + def request_piece(self, piece, priority): + if self._pool: + self._pool.add_piece(piece, priority) + else: + raise Exception('Pool not started') + + def _on_file_ready(self, filehash): + self._file.filehash = filehash + if self.is_file_complete: + self._set_ready(True) + + def _set_ready(self, complete): + self._ready = True + if self._on_ready_action: + self._on_ready_action(self._file, complete) + + def start_url(self, uri): + self._monitor.start() + path = self.resolver_class.url_to_file(uri) + c0 = None + try: + self._file = HTFile(path, self._base_path, 0, self.piece_size, self.request_piece) + except HTFile.UnknownSize: + c0 = HTTPLoader(uri, 0, self.resolver_class) + p = c0.load_piece(0, self.piece_size) + self._file = HTFile(path, self._base_path, p.total_size, self.piece_size, self.request_piece) + self.update_piece(0, p.data) + self._file.mime = p.type + + if not self._file.is_complete: + c0 = c0 or HTTPLoader(uri, 0, self.resolver_class) + self._pool = Pool(self.piece_size, [c0], + self.update_piece, + speed_limit=self.resolver_class.SPEED_LIMIT if hasattr(self.resolver_class, + 'SPEED_LIMIT') else None) + + def gen_loader(i): + return HTTPLoader(uri, i, self.resolver_class) + + for i in xrange(1, self._no_threads): + self._pool.add_worker_async(i, gen_loader, (i,)) + # get remaining pieces with normal priority + for i in xrange(1, self._file.last_piece + 1): + if not self._file.pieces[i]: + self._pool.add_piece(i) + + self.hash = Hasher(self._file, self._on_file_ready) + + @property + def is_file_complete(self): + return self.file.is_complete if self.file else False + + @property + def unique_file_id(self): + return self.file.filehash + + Status = collections.namedtuple('Status', + ['state', 'downloaded', 'download_rate', 'total_size', 'threads', 'desired_rate']) + + @property + def status(self): + tick = time.time() + + state = 'starting' if not self.file else 'finished' if self.is_file_complete else 'downloading' + downloaded = self.file.downloaded if self.file else 0 + if self._last_downloaded: + prev_time, prev_down = self._last_downloaded[0] + download_rate = (downloaded - prev_down) / (tick - prev_time) + else: + download_rate = 0 + total_size = self.file.size if self.file else 0 + threads = self._no_threads + desired_rate = self._file.byte_rate if self._file else 0 + + if self.file: + self._last_downloaded.append((tick, downloaded)) + return HTClient.Status(state, downloaded, download_rate, total_size, threads, desired_rate) + + def get_normalized_status(self): + s = self.status + return {'source_type': 'http', + 'state': s.state, + 'downloaded': s.downloaded, + 'total_size': s.total_size, + 'download_rate': s.download_rate, + 'desired_rate': s.desired_rate, + 'piece_size': self._file.piece_size if self._file else 0, + 'progress': s.downloaded / float(s.total_size) if s.total_size else 0, + # HTTP specific + 'threads': s.threads + } + + def close(self): + if self._file: + self._file.close() + BaseClient.close(self) + + def print_status(self, s, client): + progress = s.downloaded / float(s.total_size) * 100 if s.total_size else 0 + total_size = s.total_size / 1048576.0 + + color = '' + if progress >= 100.0 or not s.desired_rate or s.state in ('finished', 'starting'): + color = TerminalColor.default + elif s.desired_rate > s.download_rate: + color = TerminalColor.red + elif s.download_rate > s.desired_rate and s.download_rate < s.desired_rate * 1.2: + color = TerminalColor.yellow + else: + color = TerminalColor.green + + print '\r%.2f%% (of %.1fMB) down: %s%.1f kB/s\033[39m(need %.1f) %s' % \ + (progress, total_size, + color, s.download_rate / 1000.0, s.desired_rate / 1000.0 if s.desired_rate else 0.0, + s.state), + sys.stdout.write("\033[K") + sys.stdout.flush() + + +class HTFile(AbstractFile): + def __init__(self, path, base, size, piece_size=2097152, prioritize_fn=None): + self._full_path = os.path.join(base, path) + self._prioritize_fn = prioritize_fn + self.pieces = None + self.mime = None + size = self._load_cached(size) + AbstractFile.__init__(self, path, base, size, piece_size) + if not self.pieces or len(self.pieces) != self.last_piece + 1: + self.pieces = [False for _i in xrange(self.last_piece + 1)] + self._file = open(self.full_path, 'r+b') + + class UnknownSize(Exception): + pass + + def _load_cached(self, size): + if not os.path.exists(self.full_path) and size: + self._allocate_file(size) + return size + elif os.path.exists(self.full_path): + sz = os.stat(self.full_path).st_size + if size and size != sz: + logger.warn('Invalid cached file') + self._allocate_file(size) + return size + pcs_file = self.pieces_index_file + if os.access(pcs_file, os.R_OK): + with open(pcs_file, 'rb') as f: + pcs = pickle.load(f) + if isinstance(pcs, tuple): + self.pieces = pcs[1] + self.mime = pcs[0] + else: + logger.warn('Invalid pieces file %s', pcs_file) + return sz + else: + raise HTFile.UnknownSize() + + @property + def pieces_index_file(self): + return self.full_path + '.pieces' + + def _allocate_file(self, size): + path = os.path.split(self.full_path)[0] + if not os.path.exists(path): + os.makedirs(path, mode=0755) + # sparecelly allocate file + with open(self.full_path, 'ab') as f: + f.truncate(size) + + def update_piece(self, n, data): + assert n != self.last_piece and len(data) == self.piece_size or \ + n == self.last_piece and len(data) == ( + self.size % self.piece_size) or self.piece_size, "Invalid size of piece %d - %d" % (n, len(data)) + assert n >= self.first_piece and n <= self.last_piece, 'Invalid piece no %d' % n + + with self._lock: + if not self.pieces[n]: + self._file.seek(n * self.piece_size) + self._file.write(data) + self._file.flush() + self.pieces[n] = True + logger.debug('Received piece %d in thread %s', n, threading.current_thread().name) + AbstractFile.update_piece(self, n, data) + + def prioritize_piece(self, piece, idx): + assert piece >= self.first_piece and piece <= self.last_piece, 'Invalid piece no %d' % piece + data = None + with self._lock: + if self.pieces[piece]: + sz = self.piece_size if piece < self.last_piece else self.size % self.piece_size + self._file.seek(piece * self.piece_size) + data = self._file.read(sz) + assert len(data) == sz + if data: + AbstractFile.update_piece(self, piece, data) + elif self._prioritize_fn: + self._prioritize_fn(piece, idx) + else: + assert False, 'Missing prioritize fn' + + @property + def is_complete(self): + return all(self.pieces) + + @property + def downloaded(self): + sum = 0 + for i, p in enumerate(self.pieces): + if p and i == self.last_piece: + sum += (self.size % self.piece_size) or self.piece_size + elif p: + sum += self.piece_size + + return sum + + def remove(self): + AbstractFile.remove(self) + if os.path.exists(self.pieces_index_file): + os.unlink(self.pieces_index_file) + + def close(self): + self._file.close() + d = os.path.split(self.pieces_index_file)[0] + if d and os.path.isdir(d): + with open(self.pieces_index_file, 'wb') as f: + pickle.dump((self.mime, self.pieces), f) diff --git a/resources/btclient/opensubtitle.py b/resources/btclient/opensubtitle.py new file mode 100644 index 0000000..9d30aed --- /dev/null +++ b/resources/btclient/opensubtitle.py @@ -0,0 +1,263 @@ +''' +Created on Apr 2, 2015 + +@author: ivan +''' +import xmlrpclib +import urllib2 +import os.path +import gzip +from StringIO import StringIO +import logging +import subprocess +import struct +import time + +from _version import __version__ + +logger = logging.getLogger('opensubtitles') + + +class Urllib2Transport(xmlrpclib.Transport): + def __init__(self, opener=None, https=False, use_datetime=0): + xmlrpclib.Transport.__init__(self, use_datetime) + self.opener = opener or urllib2.build_opener() + self.https = https + + def request(self, host, handler, request_body, verbose=0): + proto = ('http', 'https')[bool(self.https)] + req = urllib2.Request('%s://%s%s' % (proto, host, handler), request_body) + req.add_header('User-agent', self.user_agent) + self.verbose = verbose + return self.parse_response(self.opener.open(req)) + + +class OpenSubtitles(object): + USER_AGENT = 'BTClient v%s' % __version__ + + def __init__(self, lang, user='', pwd=''): + self._lang = lang + self._proxy = xmlrpclib.ServerProxy('http://api.opensubtitles.org/xml-rpc', + Urllib2Transport(use_datetime=True), + allow_none=True, use_datetime=True) + self._token = None + self._user = user + self._pwd = pwd + + def login(self): + res = self._proxy.LogIn(self._user, self._pwd, 'en', self.USER_AGENT) + self._parse_status(res) + token = res.get('token') + if token: + self._token = token + else: + raise xmlrpclib.Fault('NO_TOKEN', 'No token!') + + def _parse_status(self, res): + if res.has_key('status'): + code = res['status'].split()[0] + if code != '200': + raise xmlrpclib.Fault('ERROR_CODE_RETURENED', 'Returned error status: %s (%s)' % (code, res)) + return True + else: + raise xmlrpclib.Fault('NO_STATUS', 'No status!') + + def search(self, filename, filesize=None, filehash=None, limit=20): + filename = os.path.split(filename)[1] + name = os.path.splitext(filename)[0] + query = [] + if filehash and filesize: + query.append({'sublanguageid': self._lang, 'moviehash': filehash, 'moviebytesize': str(filesize)}) + query.append({'sublanguageid': self._lang, 'tag': filename}) + query.append({'sublanguageid': self._lang, 'query': name}) + res = self._proxy.SearchSubtitles(self._token, query, {'limit': limit}) + self._parse_status(res) + data = res.get('data') + + return data if data else [] + + @staticmethod + def _sub_file(filename, lang, ext): + lang = lang.lower() + path, fname = os.path.split(filename) + fname = os.path.splitext(fname)[0] + return os.path.join(path, fname + '.' + lang + '.' + ext) + + @staticmethod + def _base_name(filename): + fname = os.path.split(filename)[1] + return os.path.splitext(fname)[0] + + @staticmethod + def hash_file(f, filesize): + + longlongformat = ' 0 and not overwrite: + logger.debug('subs %s are already downloaded', sfile) + return sfile + else: + while True: + try: + with OpenSubtitles(lang) as opensub: + res = opensub.download(filename, filesize, filehash, can_choose) + if res: + logger.debug('Subtitles %s downloaded', res) + return res + else: + logger.debug('No subtitles found for file %s in language %s', filename, lang) + return + except urllib2.HTTPError, e: + retries -= 1 + if retries <= 0: + raise e + logger.debug('Retrying to load subtitles due to HTTP error %d, remains %d attempts', e.code, + retries) + time.sleep(1) + + def download(self, filename, filesize=None, filehash=None, can_choose=True): + data = self.search(filename, filesize, filehash) + if not data: + return None + media_file = OpenSubtitles._base_name(filename).lower() + + def same_name(b): + return media_file == OpenSubtitles._base_name(b['SubFileName']).lower() + + if filehash and filesize: + match = filter(lambda x: x.get('QueryNumber', 0) == 0, data) + logger.debug('Got results by filehash') + else: + match = filter(same_name, data) + if match and can_choose != 'always': + sub = match[0] + link = sub['SubDownloadLink'] + ext = sub['SubFormat'] + logger.debug('Find exact match for media file, subtitle is %s', sub['SubFileName']) + elif can_choose: + link = self.choose(data) + ext = 'srt' + else: + sub = data[0] + link = sub['SubDownloadLink'] + ext = sub['SubFormat'] + if link: + return self.download_link(filename, link, ext) + + def download_link(self, filename, link, ext): + out_file = OpenSubtitles._sub_file(filename, self._lang, ext) + res = urllib2.urlopen(link, timeout=10) + data = StringIO(res.read()) + data.seek(0) + res.close() + z = gzip.GzipFile(fileobj=data) + with open(out_file, 'wb') as f: + while True: + d = z.read(1024) + if not d: + break + f.write(d) + z.close() + return out_file + + def logout(self): + try: + res = self._proxy.LogOut(self._token) + self._parse_status(res) + except urllib2.HTTPError: + logger.warn('Failed to logout') + + def __enter__(self): + self.login() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.logout() + + +def down(f, lang, overwrite=False): + filesize, filehash = calc_hash(f) + OpenSubtitles.download_if_not_exists(f, lang, filesize=filesize, + filehash=filehash, can_choose=True, overwrite=overwrite) + + +def calc_hash(f): + if not os.access(f, os.R_OK): + raise ValueError('Cannot read from file %s' % f) + filesize = os.stat(f).st_size + with open(f, 'rb') as fs: + filehash = OpenSubtitles.hash_file(fs, filesize) + return filesize, filehash + + +def list_subs(f, lang): + import pprint + + filesize, filehash = calc_hash(f) + with OpenSubtitles(lang) as opensub: + res = opensub.search(f, filesize, filehash) + res = map(lambda x: {'SubFileName': x['SubFileName'], + 'SubDownloadsCnt': x['SubDownloadsCnt'], + 'QueryNumber': x.get('QueryNumber', 0), + }, + res) + pprint.pprint(res) + + +if __name__ == '__main__': + + from argparse import ArgumentParser + + p = ArgumentParser() + p.add_argument("video_file", help="video file") + p.add_argument("-d", "--download", action="store_true", help="Download subtitles for video files") + p.add_argument("-l", "--list", action="store_true", help="List available subtitles") + p.add_argument("--lang", default='eng', help="Language") + p.add_argument("--debug", action="store_true", help="Print debug messages") + p.add_argument("--overwrite", action="store_true", help="Overwrite existing subtitles ") + args = p.parse_args() + if args.debug: + logging.basicConfig(level=logging.DEBUG) + if args.download: + down(args.video_file, args.lang, args.overwrite) + else: + list_subs(args.video_file, args.lang) diff --git a/resources/utorrent/__init__.py b/resources/utorrent/__init__.py index e69de29..29440e1 100644 --- a/resources/utorrent/__init__.py +++ b/resources/utorrent/__init__.py @@ -0,0 +1,5 @@ +#-*- coding: utf-8 -*- +''' + Torrenter v2 plugin for XBMC/Kodi + Copyright (C) 2015 DiMartino +''' \ No newline at end of file