diff --git a/BTClientPlayer.py b/BTClientPlayer.py index c3f3957..7e5072c 100644 --- a/BTClientPlayer.py +++ b/BTClientPlayer.py @@ -109,7 +109,7 @@ class BTClientPlayer(xbmc.Player): bt_upload_limit=self.upload_limit, choose_subtitles=False, clear_older=0, - debug_log='',#os.path.join(self.userStorageDirectory, 'log.txt'), + debug_log=os.path.join(self.userStorageDirectory, 'log.txt'), delete_on_finish=False, directory=self.userStorageDirectory, listen_port_max=6891,# @@ -122,7 +122,7 @@ class BTClientPlayer(xbmc.Player): stdin=False, stream=True, subtitles=None, - trace=False, + trace=True, content_id=self.contentId, url=self.torrentUrl) args=main(args) #config @@ -226,16 +226,16 @@ class BTClientPlayer(xbmc.Player): def buffer(self): iterator = 0 progressBar = xbmcgui.DialogProgress() - progressBar.create(self.localize('Please Wait') + str(' [%s]' % str(self.lt.version)), + progressBar.create('%s [%s]' % (self.__class__.__name__,str(self.lt.version)), self.localize('Seeds searching.')) - while iterator < 100:# not self.c.is_file_ready + while not self.c.is_file_ready:#iterator < 100: iterator = 0 ready_list=[] status = self.c.get_normalized_status() - conditions=[status['state'] in ['downloading', 'finished', 'seeding'], status['desired_rate'] > 0 or status['progress'] > 0.02, - status['progress'] > 0.01, self.c.is_file_ready or status['progress'] > 0.02, - status['desired_rate'] > 0 or status['progress'] > 0.02 and (status['download_rate'] > status['desired_rate'] or - status['download_rate'] * status['progress'] * 100 > status['desired_rate'])] + conditions=[status['state'] in ['downloading', 'finished', 'seeding'], status['desired_rate'] > 0 or status['progress'] > 0.01, + status['progress'] > 0.005, self.c.is_file_ready,# or status['progress'] > 0.02 + (status['download_rate'] > status['desired_rate'] or + status['download_rate'] * int(status['progress'] * 100) > status['desired_rate'])] for cond in conditions: if cond: ready_list.append(True) @@ -243,18 +243,18 @@ class BTClientPlayer(xbmc.Player): else: ready_list.append(False) - speedsText = '%s: %s Mbit/s %s %s: %s Mbit/s' % (self.localize('Bitrate'), str(int(status['desired_rate'] * 8 / (1024 * 1024))) if status['desired_rate'] else 0, + speedsText = '%s: %s Mbit/s %s %s: %s Mbit/s' % (self.localize('Download speed'),str(status['download_rate'] * 8 / 1000000), '[COLOR=green]>[/COLOR]' if ready_list[4] else '[COLOR=red]<[/COLOR]', - self.localize('Download speed'),str(status['download_rate'] * 8 / 1000000)) + self.localize('Bitrate'), str(int(status['desired_rate'] * 8 / (1024 * 1024))) if status['desired_rate'] else 0,) if status['state'] in ['queued','checking','checking fastresume'] or (status['progress'] == 0 and status['num_pieces'] > 0): progressBar.update(iterator, self.localize('Checking preloaded files...'), speedsText, ' ') elif status['state'] in ['downloading', 'finished', 'seeding']: dialogText = self.localize('Preloaded: ') + '%s MB %s %s MB (%s MB)' % \ - (str(status['downloaded'] / 1024 / 1024), '[COLOR=green]>[/COLOR]' if ready_list[3] else '[COLOR=red]<[/COLOR]', str(status['total_size'] / 1024 / 1024 /100), str(status['total_size'] / 1024 / 1024)) - peersText = '[%s: %s; %s: %s] %s: %s' % (self.localize('Seeds'), str(status['seeds_connected']), self.localize('Peers'), - str(status['peers_connected']), self.localize('File ready: '), '[COLOR=green]YES[/COLOR]' if ready_list[2] else '[COLOR=red]NO[/COLOR]') + (str(status['downloaded'] / 1024 / 1024), '[COLOR=green]>[/COLOR]' if ready_list[2] else '[COLOR=red]<[/COLOR]', str(status['total_size'] / 1024 / 1024 /200), str(status['total_size'] / 1024 / 1024)) + peersText = '%s: %s [%s: %s; %s: %s]' % (self.localize('File ready: '), '[COLOR=green]YES[/COLOR]' if ready_list[3] else '[COLOR=red]NO[/COLOR]', + self.localize('Seeds'), str(status['seeds_connected']), self.localize('Peers'), str(status['peers_connected']),) progressBar.update(iterator, peersText, speedsText, dialogText, ) else: @@ -346,7 +346,7 @@ class BTClientPlayer(xbmc.Player): playlist = xbmc.PlayList(xbmc.PLAYLIST_VIDEO) playlist.clear() playlist.add(url, listitem) - xbmc.Player().play(playlist, listitem) + xbmc.Player().play(playlist) log("Serving file on %s" % url) return True @@ -370,18 +370,17 @@ class BTClientPlayer(xbmc.Player): f() log('[onPlayBackStopped]: '+(str(("video", "stop", self.display_name)))) - def onPlayBackSeek(self, x ,y): log('[onPlayBackSeek]: '+(str(("video", "seek", self.display_name)))) - xbmc.Player().pause() - if self.buffer(): - xbmc.Player().play() + #xbmc.Player().pause() + #if self.buffer(): + # xbmc.Player().play() def onPlayBackSeekChapter(self, x): log('[onPlayBackSeek]: '+(str(("video", "seek", self.display_name)))) - xbmc.Player().pause() - if self.buffer(): - xbmc.Player().play() + #xbmc.Player().pause() + #if self.buffer(): + # xbmc.Player().play() @contextmanager def attach(self, callback, *events): diff --git a/Libtorrent.py b/Libtorrent.py index eb7e7a9..61f21b8 100644 --- a/Libtorrent.py +++ b/Libtorrent.py @@ -163,7 +163,10 @@ class Libtorrent: } progressBar = xbmcgui.DialogProgress() progressBar.create(Localization.localize('Please Wait'), Localization.localize('Magnet-link is converting')) + #try: self.torrentHandle = self.session.add_torrent(magnetSettings) + #except: + # self.torrentHandle = self.lt.add_magnet_uri(self.session, self.magnetLink, magnetSettings) iterator = 0 while iterator < 100: xbmc.sleep(500) @@ -178,7 +181,11 @@ class Libtorrent: progressBar.update(0) progressBar.close() if self.torrentHandle.status().has_metadata: - return self.torrentHandle.torrent_file() + try: + info = self.torrentHandle.torrent_file() + except: + info = self.torrentHandle.get_torrent_info() + return info def magnetToTorrent(self, magnet): self.magnetLink = magnet diff --git a/resources/btclient/btclient.py b/resources/btclient/btclient.py index 7caecaa..1f0b839 100644 --- a/resources/btclient/btclient.py +++ b/resources/btclient/btclient.py @@ -21,7 +21,6 @@ import shutil from cachebt import CacheBT from common import AbstractFile, Hasher, BaseMonitor, BaseClient, Resolver -from htclient import HTClient logging.basicConfig() logger = logging.getLogger() @@ -74,7 +73,7 @@ class StreamServer(SocketServer.ThreadingMixIn, htserver.HTTPServer): def set_file(self, f): self.file = f - def serve(self, w=0.1): + def serve(self, w): while self._running: try: self.handle_request() @@ -83,8 +82,8 @@ class StreamServer(SocketServer.ThreadingMixIn, htserver.HTTPServer): print >> sys.stderr, str(e) def run(self): - self.timeout = 0.1 - t = Thread(target=self.serve, args=[0.1], name='HTTP Server') + self.timeout = 0.5 + t = Thread(target=self.serve, args=[self.timeout], name='HTTP Server') t.daemon = True t.start() @@ -107,7 +106,6 @@ class BTFileHandler(htserver.BaseHTTPRequestHandler): protocol_version = 'HTTP/1.1' def do_GET(self): - if self.do_HEAD(only_header=False): with self.server.file.create_cursor(self._offset) as f: send_something = False @@ -150,7 +148,7 @@ class BTFileHandler(htserver.BaseHTTPRequestHandler): range = None # @ReservedAssignment if self.server.allow_range: range = parse_range(self.headers.get('Range', None)) # @ReservedAssignment - if range: + if range not in [None, False]: self._offset = range range = (range, size - 1, size) # @ReservedAssignment logger.debug('Request range %s - (header is %s', range, self.headers.get('Range', None)) @@ -162,8 +160,8 @@ class BTFileHandler(htserver.BaseHTTPRequestHandler): self.send_error(404, 'Not Found') def send_resp_header(self, cont_type, cont_length, range=False, only_header=False): # @ReservedAssignment - # logger.debug('range is %s'% str(range)) - if self.server.allow_range and range: + logger.debug('range is %s'% str(range)) + if self.server.allow_range and range not in [None, False]: self.send_response(206, 'Partial Content') else: self.send_response(200, 'OK') @@ -175,13 +173,14 @@ class BTFileHandler(htserver.BaseHTTPRequestHandler): self.send_header('Accept-Ranges', 'bytes') else: self.send_header('Accept-Ranges', 'none') - if self.server.allow_range and range: + if self.server.allow_range and range not in [None, False]: if isinstance(range, (types.TupleType, types.ListType)) and len(range) == 3: self.send_header('Content-Range', 'bytes %d-%d/%d' % range) self.send_header('Content-Length', range[1] - range[0] + 1) else: raise ValueError('Invalid range value') else: + self.send_header('Content-Range', 'bytes %d-%d/%d' % (range, cont_length-1, cont_length)) self.send_header('Content-Length', cont_length) self._finish_header(only_header) @@ -203,7 +202,7 @@ class BTClient(BaseClient): self.lt=lt self._cache = CacheBT(path_to_store, self.lt) self._torrent_params = {'save_path': path_to_store, - #'storage_mode': self.lt.storage_mode_t.storage_mode_sparse + 'storage_mode': self.lt.storage_mode_t.storage_mode_sparse } if not state_file: state_file=os.path.join(path_to_store,'.btclient_state') @@ -232,9 +231,9 @@ class BTClient(BaseClient): self._hash = None self._url = None - #if args and args.debug_log and args.trace: - # self.add_monitor_listener(self.debug_download_queue) - # self.add_dispatcher_listener(self.debug_alerts) + if args and args.debug_log and args.trace: + self.add_monitor_listener(self.debug_download_queue) + self.add_dispatcher_listener(self.debug_alerts) @property def is_file_complete(self): @@ -247,7 +246,10 @@ class BTClient(BaseClient): def _check_ready(self, s, **kwargs): if s.state in [3, 4, 5] and not self._file and s.progress > 0: - self._meta_ready(self._th.torrent_file()) + try: + self._meta_ready(self._th.torrent_file()) + except: + self._meta_ready(self._th.get_torrent_info()) logger.debug('Got torrent metadata and start download') self.hash = True self.hash = Hasher(self._file, self._on_file_ready) @@ -277,7 +279,11 @@ class BTClient(BaseClient): (f.path, fmap.piece, fmap.start, fmap.length, meta.num_pieces(), meta.piece_length())) - self._cache.file_complete(self._th.torrent_file(), + try: + meta = self._th.torrent_file() + except: + meta=self._th.get_torrent_info() + self._cache.file_complete(meta, self._url if self._url and self._url.startswith('http') else None) def prioritize_piece(self, pc, idx): @@ -294,7 +300,10 @@ class BTClient(BaseClient): self._th.reset_piece_deadline(i) def prioritize_file(self): - meta = self._th.torrent_file() + try: + meta = self._th.torrent_file() + except: + meta=self._th.get_torrent_info() priorities = [1 if i >= self._file.first_piece and i <= self.file.last_piece else 0 \ for i in xrange(meta.num_pieces())] self._th.prioritize_pieces(priorities) @@ -316,7 +325,11 @@ class BTClient(BaseClient): @property def unique_file_id(self): - return str(self._th.torrent_file().info_hash()) + try: + meta = self._th.torrent_file() + except: + meta=self._th.get_torrent_info() + return str(meta.info_hash()) @property def pieces(self): @@ -378,8 +391,6 @@ class BTClient(BaseClient): self._th.set_sequential_download(True) time.sleep(1) self._th.force_dht_announce() - # if tp.has_key('ti'): - # self._meta_ready(self._th.torrent_file()) self._monitor.start() self._dispatcher.do_start(self._th, self._ses) @@ -652,7 +663,7 @@ def main(args=None): SPEED_LIMIT = 300 THREADS = 2 - stream(args, HTClient, TestResolver) + #stream(args, HTClient, TestResolver) else: #rclass = plugins.find_matching_plugin(args.url) #if rclass: diff --git a/resources/btclient/common.py b/resources/btclient/common.py index a770b0c..6cb10e0 100644 --- a/resources/btclient/common.py +++ b/resources/btclient/common.py @@ -6,7 +6,6 @@ Created on May 3, 2015 import os from collections import deque import logging -from threading import Lock, Event, Thread import copy import threading import traceback @@ -18,7 +17,6 @@ from hachoir_metadata import extractMetadata from hachoir_parser import guessParser import hachoir_core.config as hachoir_config from hachoir_core.stream.input import InputIOStream -#from opensubtitle import OpenSubtitles logger = logging.getLogger('common') hachoir_config.quiet = True @@ -52,9 +50,9 @@ def debug_fn(fn): return _fn -class Hasher(Thread): +class Hasher(threading.Thread): def __init__(self, btfile, hash_cb): - Thread.__init__(self, name="Hasher") + threading.Thread.__init__(self, name="Hasher") if btfile is None: raise ValueError('BTFile is None!') self._btfile = btfile @@ -64,7 +62,6 @@ class Hasher(Thread): self.start() def run(self): - pass with self._btfile.create_cursor() as c: filehash = OpenSubtitles.hash_file(c, self._btfile.size) self.hash = filehash @@ -106,20 +103,17 @@ class OpenSubtitles(object): returnedhash = "%016x" % hash return returnedhash -class BaseMonitor(Thread): +class BaseMonitor(threading.Thread): def __init__(self, client, name): - Thread.__init__(self, name=name) + threading.Thread.__init__(self, name=name) self.daemon = True self._listeners = [] - self._lock = Lock() - self._wait_event = Event() + self._lock = threading.Lock() + self._wait_event = threading.Event() self._running = True self._client = client self._ses = None - def add_to_ctx(self, key, val): - self._ctx[key] = val - def stop(self): self._running = False self._wait_event.set() @@ -148,7 +142,6 @@ class BaseClient(object): self._client = client def run(self): - while (self._running): s = self._client.status with self._lock: @@ -244,8 +237,8 @@ class PieceCache(object): def __init__(self, btfile): # self._btfile=btfile self._cache = [None] * self.size - self._lock = Lock() - self._event = Event() + self._lock = threading.Lock() + self._event = threading.Event() self._cache_first = btfile.first_piece self._piece_size = btfile.piece_size self._map_offset = btfile.map_piece @@ -400,7 +393,7 @@ class AbstractFile(object): self._full_path = os.path.join(base, path) self._cursors = [] self._cursors_history = deque(maxlen=3) - self._lock = Lock() + self._lock = threading.Lock() self.first_piece = 0 self.last_piece = self.first_piece + (max(size - 1, 0)) // piece_size diff --git a/resources/btclient/htclient.py b/resources/btclient/htclient.py deleted file mode 100644 index df50c14..0000000 --- a/resources/btclient/htclient.py +++ /dev/null @@ -1,503 +0,0 @@ -''' -Created on May 3, 2015 - -@author: ivan -''' - -import urllib2 -import os.path -import pickle -import logging -from cookielib import CookieJar -from collections import namedtuple -import random -from httplib import BadStatusLine, IncompleteRead -import socket -import time -import re -import Queue -import collections -from threading import Lock, Thread -import sys -import threading -from urllib import urlencode -import zlib -from io import BytesIO -import gzip -import json -from collections import deque - -from common import AbstractFile, BaseClient, Resolver, TerminalColor #Hasher -from bs4 import BeautifulSoup - -logger = logging.getLogger('htclient') - -Piece = namedtuple('Piece', ['piece', 'data', 'total_size', 'type']) - - -class HTTPLoader(object): - UA_STRING = ['Mozilla/5.0 (Windows NT 6.3; rv:36.0) Gecko/20100101 Firefox/36.0', - 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2227.0 Safari/537.36', - 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.75.14 (KHTML, like Gecko) Version/7.0.3 Safari/7046A194A'] - - PARSER = 'lxml' # 'html5lib'#'html.parser'#'lxml' - - class Error(Exception): - pass - - def __init__(self, url, id, resolver_class=None): - self.id = id - self.user_agent = self._choose_ua() - resolver_class = resolver_class or Resolver - self._client = urllib2.build_opener(urllib2.HTTPCookieProcessor(CookieJar())) - self.url = self.resolve_file_url(resolver_class, url) - if not self.url: - raise HTTPLoader.Error('Urlwas not resolved to file link') - - def resolve_file_url(self, resolver_class, url): - r = resolver_class(self) - return r.resolve(url) - - def _choose_ua(self): - return HTTPLoader.UA_STRING[random.randint(0, len(HTTPLoader.UA_STRING) - 1)] - - RANGE_RE = re.compile(r'bytes\s+(\d+)-(\d+)/(\d+)') - - def _parse_range(self, r): - m = HTTPLoader.RANGE_RE.match(r) - if not m: - raise HTTPLoader.Error('Invalid range header') - return int(m.group(1)), int(m.group(2)), int(m.group(3)) - - def open(self, url, data=None, headers={}, method='get'): - hdr = {'User-Agent': self.user_agent} - hdr.update(headers) - url, post_args = self._encode_data(url, data, method) - req = urllib2.Request(url, post_args, headers=headers) - res = None - retries = 5 - while retries: - try: - res = self._client.open(req, timeout=10) - break - except (IOError, urllib2.HTTPError, BadStatusLine, IncompleteRead, socket.timeout) as e: - if isinstance(e, urllib2.HTTPError) and hasattr(e, 'code') and str(e.code) == '404': - raise HTTPLoader.Error('Url %s not found', url) - - logging.warn('Retry on (%s) due to IO or HTTPError (%s) ', threading.current_thread().name, e) - retries -= 1 - time.sleep(1) - if not res: - raise HTTPLoader.Error('Cannot open resource %s' % url) - return res - - def load_piece(self, piece_no, piece_size): - start = piece_no * piece_size - headers = {'Range': 'bytes=%d-' % start} - res = self.open(self.url, headers=headers) - allow_range_header = res.info().getheader('Accept-Ranges') - if allow_range_header and allow_range_header.lower() == 'none': - raise HTTPLoader.Error('Ranges are not supported') - - size_header = res.info().getheader('Content-Length') - total_size = int(size_header) if size_header else None - - range_header = res.info().getheader('Content-Range') - if not range_header: - if piece_no and not total_size: - raise HTTPLoader.Error('Ranges are not supported') - else: - from_pos, to_pos, size = 0, total_size - 1, total_size - else: - from_pos, to_pos, size = self._parse_range(range_header) - - type_header = res.info().getheader('Content-Type') - - if not type_header: - raise HTTPLoader.Error('Content Type is missing') - - data = res.read(piece_size) - - res.close() - return Piece(piece_no, data, size, type_header) - - @staticmethod - def decode_data(res): - header = res.info() - data = res.read() - if header.get('Content-Encoding') == 'gzip': - tmp_stream = gzip.GzipFile(fileobj=BytesIO(data)) - data = tmp_stream.read() - - elif header.get('Content-Encoding') == 'deflate': - data = zlib.decompress(data) - return data - - def _encode_data(self, url, data, method='post'): - if not data: - return url, None - if method.lower() == 'post': - return url, urlencode(data) - else: - return url + '?' + urlencode(data), None - - def load_page(self, url, data=None, method='get'): - res = self.open(url, data, headers={'Accept-Encoding': "gzip, deflate"}, method=method) - # Content-Type:"text/html; charset=utf-8" - type_header = res.info().getheader('Content-Type') - if not type_header.startswith('text/html'): - raise HTTPLoader.Error("%s is not HTML page" % url) - # Content-Encoding:"gzip" - data = HTTPLoader.decode_data(res) - pg = BeautifulSoup(data, HTTPLoader.PARSER) - return pg - - def load_json(self, url, data, method='get'): - res = self.open(url, data, headers={'Accept-Encoding': "gzip, deflate", 'X-Requested-With': 'XMLHttpRequest'}, - method=method) - type_header = res.info().getheader('Content-Type') - if not type_header.startswith('application/json'): - raise HTTPLoader.Error("%s is not JSON" % url) - # Content-Encoding:"gzip" - data = HTTPLoader.decode_data(res) - return json.loads(data, encoding='utf8') - - def get_redirect(self, url, data, method='get'): - pass - - -class PriorityQueue(Queue.PriorityQueue): - NORMAL_PRIORITY = 99 - NO_DOWNLOAD = 999 - NO_PIECE = -1 - - def __init__(self): - Queue.PriorityQueue.__init__(self, maxsize=0) - self._wset = {} - self._lock = Lock() - - def put_piece(self, piece, priority, remove_previous=False): - # do not put piece which is already in queue - with self._lock: - if piece in self._wset: - if self._wset[piece][0] == priority: - return - else: - self.remove_piece(piece) - if remove_previous: - for k in self._wset: - if k < piece: - self.remove_piece(k) - - entry = [priority, piece] - self._wset[piece] = entry - self.put(entry, block=False) - - def remove_piece(self, piece): - entry = self._wset.pop(piece) - entry[-1] = PriorityQueue.NO_PIECE - - def get_piece(self, timeout=None): - while True: - _priority, piece = self.get(timeout=timeout) - with self._lock: - if piece is not PriorityQueue.NO_PIECE: - del self._wset[piece] - return piece - - -class Pool(object): - def __init__(self, piece_size, loaders, cb, speed_limit=None): - self.piece_size = piece_size - self._cb = cb - self.speed_limit = speed_limit - self._queue = PriorityQueue() - self._running = True - self._threads = [Thread(name="worker %d" % i, target=self.work, args=[l]) for i, l in enumerate(loaders)] - for t in self._threads: - t.daemon = True - t.start() - - def add_worker_async(self, id, gen_fn, args=[], kwargs={}): - def resolve(): - l = gen_fn(*args, **kwargs) - t = Thread(name="worker %d" % id, target=self.work, args=[l]) - t.daemon = True - t.start() - self._threads.append(t) - - adder = Thread(name="Adder", target=resolve) - adder.daemon = True - adder.start() - - def work(self, loader): - while self._running: - pc = self._queue.get_piece() - if not self._running: - break - try: - start = time.time() - p = loader.load_piece(pc, self.piece_size) - self._cb(p.piece, p.data) - if self.speed_limit: - dur = time.time() - start - expected = self.piece_size / 1024.0 / self.speed_limit - wait_time = expected - dur - if wait_time > 0: - logger.debug('Waiting %f on %s', wait_time, threading.current_thread().name) - time.sleep(wait_time) - except Exception, e: - logger.exception('(%s) Error when loading piece %d: %s', threading.current_thread().name, pc, e) - - def stop(self): - self._running = False - # push some dummy tasks to assure workers ends - for i in xrange(len(self._threads)): - self._queue.put_piece(i, PriorityQueue.NO_DOWNLOAD) - - def add_piece(self, piece, priority=PriorityQueue.NORMAL_PRIORITY, remove_previous=False): - self._queue.put_piece(piece, priority, remove_previous) - - -class HTClient(BaseClient): - def __init__(self, path_to_store, args=None, piece_size=2 * 1024 * 1024, no_threads=2, resolver_class=None): - BaseClient.__init__(self, path_to_store, args=args) - self._pool = None - self.resolver_class = resolver_class - self._no_threads = self.resolver_class.THREADS if self.resolver_class and hasattr(self.resolver_class, - 'THREADS') else no_threads - self.piece_size = piece_size - self._last_downloaded = deque(maxlen=60) - - def update_piece(self, piece, data): - self._file.update_piece(piece, data) - if not self._ready and hasattr(self._file, 'filehash') and self._file.filehash \ - and all(self._file.pieces[:5]): - self._set_ready(self._file.is_complete) - - def request_piece(self, piece, priority): - if self._pool: - self._pool.add_piece(piece, priority) - else: - raise Exception('Pool not started') - - def _on_file_ready(self, filehash): - self._file.filehash = filehash - if self.is_file_complete: - self._set_ready(True) - - def _set_ready(self, complete): - self._ready = True - if self._on_ready_action: - self._on_ready_action(self._file, complete) - - def start_url(self, uri): - self._monitor.start() - path = self.resolver_class.url_to_file(uri) - c0 = None - try: - self._file = HTFile(path, self._base_path, 0, self.piece_size, self.request_piece) - except HTFile.UnknownSize: - c0 = HTTPLoader(uri, 0, self.resolver_class) - p = c0.load_piece(0, self.piece_size) - self._file = HTFile(path, self._base_path, p.total_size, self.piece_size, self.request_piece) - self.update_piece(0, p.data) - self._file.mime = p.type - - if not self._file.is_complete: - c0 = c0 or HTTPLoader(uri, 0, self.resolver_class) - self._pool = Pool(self.piece_size, [c0], - self.update_piece, - speed_limit=self.resolver_class.SPEED_LIMIT if hasattr(self.resolver_class, - 'SPEED_LIMIT') else None) - - def gen_loader(i): - return HTTPLoader(uri, i, self.resolver_class) - - for i in xrange(1, self._no_threads): - self._pool.add_worker_async(i, gen_loader, (i,)) - # get remaining pieces with normal priority - for i in xrange(1, self._file.last_piece + 1): - if not self._file.pieces[i]: - self._pool.add_piece(i) - - #self.hash = Hasher(self._file, self._on_file_ready) - - @property - def is_file_complete(self): - return self.file.is_complete if self.file else False - - @property - def unique_file_id(self): - return self.file.filehash - - Status = collections.namedtuple('Status', - ['state', 'downloaded', 'download_rate', 'total_size', 'threads', 'desired_rate']) - - @property - def status(self): - tick = time.time() - - state = 'starting' if not self.file else 'finished' if self.is_file_complete else 'downloading' - downloaded = self.file.downloaded if self.file else 0 - if self._last_downloaded: - prev_time, prev_down = self._last_downloaded[0] - download_rate = (downloaded - prev_down) / (tick - prev_time) - else: - download_rate = 0 - total_size = self.file.size if self.file else 0 - threads = self._no_threads - desired_rate = self._file.byte_rate if self._file else 0 - - if self.file: - self._last_downloaded.append((tick, downloaded)) - return HTClient.Status(state, downloaded, download_rate, total_size, threads, desired_rate) - - def get_normalized_status(self): - s = self.status - return {'source_type': 'http', - 'state': s.state, - 'downloaded': s.downloaded, - 'total_size': s.total_size, - 'download_rate': s.download_rate, - 'desired_rate': s.desired_rate, - 'piece_size': self._file.piece_size if self._file else 0, - 'progress': s.downloaded / float(s.total_size) if s.total_size else 0, - # HTTP specific - 'threads': s.threads - } - - def close(self): - if self._file: - self._file.close() - BaseClient.close(self) - - def print_status(self, s, client): - progress = s.downloaded / float(s.total_size) * 100 if s.total_size else 0 - total_size = s.total_size / 1048576.0 - - color = '' - if progress >= 100.0 or not s.desired_rate or s.state in ('finished', 'starting'): - color = TerminalColor.default - elif s.desired_rate > s.download_rate: - color = TerminalColor.red - elif s.download_rate > s.desired_rate and s.download_rate < s.desired_rate * 1.2: - color = TerminalColor.yellow - else: - color = TerminalColor.green - - print '\r%.2f%% (of %.1fMB) down: %s%.1f kB/s\033[39m(need %.1f) %s' % \ - (progress, total_size, - color, s.download_rate / 1000.0, s.desired_rate / 1000.0 if s.desired_rate else 0.0, - s.state), - sys.stdout.write("\033[K") - sys.stdout.flush() - - -class HTFile(AbstractFile): - def __init__(self, path, base, size, piece_size=2097152, prioritize_fn=None): - self._full_path = os.path.join(base, path) - self._prioritize_fn = prioritize_fn - self.pieces = None - self.mime = None - size = self._load_cached(size) - AbstractFile.__init__(self, path, base, size, piece_size) - if not self.pieces or len(self.pieces) != self.last_piece + 1: - self.pieces = [False for _i in xrange(self.last_piece + 1)] - self._file = open(self.full_path, 'r+b') - - class UnknownSize(Exception): - pass - - def _load_cached(self, size): - if not os.path.exists(self.full_path) and size: - self._allocate_file(size) - return size - elif os.path.exists(self.full_path): - sz = os.stat(self.full_path).st_size - if size and size != sz: - logger.warn('Invalid cached file') - self._allocate_file(size) - return size - pcs_file = self.pieces_index_file - if os.access(pcs_file, os.R_OK): - with open(pcs_file, 'rb') as f: - pcs = pickle.load(f) - if isinstance(pcs, tuple): - self.pieces = pcs[1] - self.mime = pcs[0] - else: - logger.warn('Invalid pieces file %s', pcs_file) - return sz - else: - raise HTFile.UnknownSize() - - @property - def pieces_index_file(self): - return self.full_path + '.pieces' - - def _allocate_file(self, size): - path = os.path.split(self.full_path)[0] - if not os.path.exists(path): - os.makedirs(path, mode=0755) - # sparecelly allocate file - with open(self.full_path, 'ab') as f: - f.truncate(size) - - def update_piece(self, n, data): - assert n != self.last_piece and len(data) == self.piece_size or \ - n == self.last_piece and len(data) == ( - self.size % self.piece_size) or self.piece_size, "Invalid size of piece %d - %d" % (n, len(data)) - assert n >= self.first_piece and n <= self.last_piece, 'Invalid piece no %d' % n - - with self._lock: - if not self.pieces[n]: - self._file.seek(n * self.piece_size) - self._file.write(data) - self._file.flush() - self.pieces[n] = True - logger.debug('Received piece %d in thread %s', n, threading.current_thread().name) - AbstractFile.update_piece(self, n, data) - - def prioritize_piece(self, piece, idx): - assert piece >= self.first_piece and piece <= self.last_piece, 'Invalid piece no %d' % piece - data = None - with self._lock: - if self.pieces[piece]: - sz = self.piece_size if piece < self.last_piece else self.size % self.piece_size - self._file.seek(piece * self.piece_size) - data = self._file.read(sz) - assert len(data) == sz - if data: - AbstractFile.update_piece(self, piece, data) - elif self._prioritize_fn: - self._prioritize_fn(piece, idx) - else: - assert False, 'Missing prioritize fn' - - @property - def is_complete(self): - return all(self.pieces) - - @property - def downloaded(self): - sum = 0 - for i, p in enumerate(self.pieces): - if p and i == self.last_piece: - sum += (self.size % self.piece_size) or self.piece_size - elif p: - sum += self.piece_size - - return sum - - def remove(self): - AbstractFile.remove(self) - if os.path.exists(self.pieces_index_file): - os.unlink(self.pieces_index_file) - - def close(self): - self._file.close() - d = os.path.split(self.pieces_index_file)[0] - if d and os.path.isdir(d): - with open(self.pieces_index_file, 'wb') as f: - pickle.dump((self.mime, self.pieces), f)