pull/1/head
DiMartinoXBMC 2015-08-08 23:22:15 +03:00
parent 597f47b319
commit 63465725a3
5 changed files with 68 additions and 561 deletions

View File

@@ -109,7 +109,7 @@ class BTClientPlayer(xbmc.Player):
bt_upload_limit=self.upload_limit,
choose_subtitles=False,
clear_older=0,
-debug_log='',#os.path.join(self.userStorageDirectory, 'log.txt'),
+debug_log=os.path.join(self.userStorageDirectory, 'log.txt'),
delete_on_finish=False,
directory=self.userStorageDirectory,
listen_port_max=6891,#
@@ -122,7 +122,7 @@ class BTClientPlayer(xbmc.Player):
stdin=False,
stream=True,
subtitles=None,
-trace=False,
+trace=True,
content_id=self.contentId,
url=self.torrentUrl)
args=main(args) #config
@@ -226,16 +226,16 @@ class BTClientPlayer(xbmc.Player):
def buffer(self):
iterator = 0
progressBar = xbmcgui.DialogProgress()
-progressBar.create(self.localize('Please Wait') + str(' [%s]' % str(self.lt.version)),
+progressBar.create('%s [%s]' % (self.__class__.__name__,str(self.lt.version)),
self.localize('Seeds searching.'))
-while iterator < 100:# not self.c.is_file_ready
+while not self.c.is_file_ready:#iterator < 100:
iterator = 0
ready_list=[]
status = self.c.get_normalized_status()
-conditions=[status['state'] in ['downloading', 'finished', 'seeding'], status['desired_rate'] > 0 or status['progress'] > 0.02,
-status['progress'] > 0.01, self.c.is_file_ready or status['progress'] > 0.02,
-status['desired_rate'] > 0 or status['progress'] > 0.02 and (status['download_rate'] > status['desired_rate'] or
-status['download_rate'] * status['progress'] * 100 > status['desired_rate'])]
+conditions=[status['state'] in ['downloading', 'finished', 'seeding'], status['desired_rate'] > 0 or status['progress'] > 0.01,
+status['progress'] > 0.005, self.c.is_file_ready,# or status['progress'] > 0.02
+(status['download_rate'] > status['desired_rate'] or
+status['download_rate'] * int(status['progress'] * 100) > status['desired_rate'])]
for cond in conditions:
if cond:
ready_list.append(True)
@@ -243,18 +243,18 @@ class BTClientPlayer(xbmc.Player):
else:
ready_list.append(False)
-speedsText = '%s: %s Mbit/s %s %s: %s Mbit/s' % (self.localize('Bitrate'), str(int(status['desired_rate'] * 8 / (1024 * 1024))) if status['desired_rate'] else 0,
+speedsText = '%s: %s Mbit/s %s %s: %s Mbit/s' % (self.localize('Download speed'),str(status['download_rate'] * 8 / 1000000),
'[COLOR=green]>[/COLOR]' if ready_list[4] else '[COLOR=red]<[/COLOR]',
-self.localize('Download speed'),str(status['download_rate'] * 8 / 1000000))
+self.localize('Bitrate'), str(int(status['desired_rate'] * 8 / (1024 * 1024))) if status['desired_rate'] else 0,)
if status['state'] in ['queued','checking','checking fastresume'] or (status['progress'] == 0 and status['num_pieces'] > 0):
progressBar.update(iterator, self.localize('Checking preloaded files...'), speedsText, ' ')
elif status['state'] in ['downloading', 'finished', 'seeding']:
dialogText = self.localize('Preloaded: ') + '%s MB %s %s MB (%s MB)' % \
-(str(status['downloaded'] / 1024 / 1024), '[COLOR=green]>[/COLOR]' if ready_list[3] else '[COLOR=red]<[/COLOR]', str(status['total_size'] / 1024 / 1024 /100), str(status['total_size'] / 1024 / 1024))
+(str(status['downloaded'] / 1024 / 1024), '[COLOR=green]>[/COLOR]' if ready_list[2] else '[COLOR=red]<[/COLOR]', str(status['total_size'] / 1024 / 1024 /200), str(status['total_size'] / 1024 / 1024))
-peersText = '[%s: %s; %s: %s] %s: %s' % (self.localize('Seeds'), str(status['seeds_connected']), self.localize('Peers'),
-str(status['peers_connected']), self.localize('File ready: '), '[COLOR=green]YES[/COLOR]' if ready_list[2] else '[COLOR=red]NO[/COLOR]')
+peersText = '%s: %s [%s: %s; %s: %s]' % (self.localize('File ready: '), '[COLOR=green]YES[/COLOR]' if ready_list[3] else '[COLOR=red]NO[/COLOR]',
+self.localize('Seeds'), str(status['seeds_connected']), self.localize('Peers'), str(status['peers_connected']),)
progressBar.update(iterator, peersText, speedsText, dialogText,
)
else:
@@ -346,7 +346,7 @@ class BTClientPlayer(xbmc.Player):
playlist = xbmc.PlayList(xbmc.PLAYLIST_VIDEO)
playlist.clear()
playlist.add(url, listitem)
-xbmc.Player().play(playlist, listitem)
+xbmc.Player().play(playlist)
log("Serving file on %s" % url)
return True
@@ -370,18 +370,17 @@ class BTClientPlayer(xbmc.Player):
f()
log('[onPlayBackStopped]: '+(str(("video", "stop", self.display_name))))
def onPlayBackSeek(self, x ,y):
log('[onPlayBackSeek]: '+(str(("video", "seek", self.display_name))))
-xbmc.Player().pause()
+#xbmc.Player().pause()
-if self.buffer():
+#if self.buffer():
-xbmc.Player().play()
+# xbmc.Player().play()
def onPlayBackSeekChapter(self, x):
log('[onPlayBackSeek]: '+(str(("video", "seek", self.display_name))))
-xbmc.Player().pause()
+#xbmc.Player().pause()
-if self.buffer():
+#if self.buffer():
-xbmc.Player().play()
+# xbmc.Player().play()
@contextmanager
def attach(self, callback, *events):
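The buffering change above replaces the fixed 100-iteration wait with a loop that runs until the client reports the file ready, and it colours the dialog by comparing the current download rate against the bitrate the stream needs. A condensed sketch of that readiness test, using the same normalized-status keys as the plugin but with a hypothetical is_buffer_ready() helper (illustrative only, not code from this commit):

def is_buffer_ready(status, file_ready):
    # Mirrors the new conditions list: the torrent is transferring,
    # a minimal share is preloaded, the played file is hashed/ready,
    # and the swarm keeps up with the bitrate the video requires.
    downloading = status['state'] in ['downloading', 'finished', 'seeding']
    preloaded = status['progress'] > 0.005
    fast_enough = (status['download_rate'] > status['desired_rate'] or
                   status['download_rate'] * int(status['progress'] * 100) > status['desired_rate'])
    return downloading and preloaded and file_ready and fast_enough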

View File

@@ -163,7 +163,10 @@ class Libtorrent:
}
progressBar = xbmcgui.DialogProgress()
progressBar.create(Localization.localize('Please Wait'), Localization.localize('Magnet-link is converting'))
+#try:
self.torrentHandle = self.session.add_torrent(magnetSettings)
+#except:
+#    self.torrentHandle = self.lt.add_magnet_uri(self.session, self.magnetLink, magnetSettings)
iterator = 0
while iterator < 100:
xbmc.sleep(500)
@@ -178,7 +181,11 @@ class Libtorrent:
progressBar.update(0)
progressBar.close()
if self.torrentHandle.status().has_metadata:
-return self.torrentHandle.torrent_file()
+try:
+    info = self.torrentHandle.torrent_file()
+except:
+    info = self.torrentHandle.get_torrent_info()
+return info
def magnetToTorrent(self, magnet):
self.magnetLink = magnet
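Both Libtorrent.py hunks deal with API differences between python-libtorrent builds: metadata comes from torrent_file() where it exists and from the older get_torrent_info() otherwise. A small helper that captures the fallback pattern repeated throughout this commit (a sketch, assuming only that one of the two methods is present; the commit itself uses a bare except):

def torrent_metadata(handle):
    # torrent_file() exists in newer python-libtorrent bindings,
    # older builds only expose get_torrent_info()
    try:
        return handle.torrent_file()
    except AttributeError:
        return handle.get_torrent_info()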

View File

@@ -21,7 +21,6 @@ import shutil
from cachebt import CacheBT
from common import AbstractFile, Hasher, BaseMonitor, BaseClient, Resolver
-from htclient import HTClient
logging.basicConfig()
logger = logging.getLogger()
@@ -74,7 +73,7 @@ class StreamServer(SocketServer.ThreadingMixIn, htserver.HTTPServer):
def set_file(self, f):
self.file = f
-def serve(self, w=0.1):
+def serve(self, w):
while self._running:
try:
self.handle_request()
@@ -83,8 +82,8 @@ class StreamServer(SocketServer.ThreadingMixIn, htserver.HTTPServer):
print >> sys.stderr, str(e)
def run(self):
-self.timeout = 0.1
+self.timeout = 0.5
-t = Thread(target=self.serve, args=[0.1], name='HTTP Server')
+t = Thread(target=self.serve, args=[self.timeout], name='HTTP Server')
t.daemon = True
t.start()
@@ -107,7 +106,6 @@ class BTFileHandler(htserver.BaseHTTPRequestHandler):
protocol_version = 'HTTP/1.1'
def do_GET(self):
if self.do_HEAD(only_header=False):
with self.server.file.create_cursor(self._offset) as f:
send_something = False
@@ -150,7 +148,7 @@ class BTFileHandler(htserver.BaseHTTPRequestHandler):
range = None # @ReservedAssignment
if self.server.allow_range:
range = parse_range(self.headers.get('Range', None)) # @ReservedAssignment
-if range:
+if range not in [None, False]:
self._offset = range
range = (range, size - 1, size) # @ReservedAssignment
logger.debug('Request range %s - (header is %s', range, self.headers.get('Range', None))
@@ -162,8 +160,8 @@ class BTFileHandler(htserver.BaseHTTPRequestHandler):
self.send_error(404, 'Not Found')
def send_resp_header(self, cont_type, cont_length, range=False, only_header=False): # @ReservedAssignment
-# logger.debug('range is %s'% str(range))
+logger.debug('range is %s'% str(range))
-if self.server.allow_range and range:
+if self.server.allow_range and range not in [None, False]:
self.send_response(206, 'Partial Content')
else:
self.send_response(200, 'OK')
@@ -175,13 +173,14 @@ class BTFileHandler(htserver.BaseHTTPRequestHandler):
self.send_header('Accept-Ranges', 'bytes')
else:
self.send_header('Accept-Ranges', 'none')
-if self.server.allow_range and range:
+if self.server.allow_range and range not in [None, False]:
if isinstance(range, (types.TupleType, types.ListType)) and len(range) == 3:
self.send_header('Content-Range', 'bytes %d-%d/%d' % range)
self.send_header('Content-Length', range[1] - range[0] + 1)
else:
raise ValueError('Invalid range value')
else:
+self.send_header('Content-Range', 'bytes %d-%d/%d' % (range, cont_length-1, cont_length))
self.send_header('Content-Length', cont_length)
self._finish_header(only_header)
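The switch from 'if range:' to 'if range not in [None, False]:' matters because parse_range() returns the integer start offset of the requested range, and an offset of 0 is falsy in Python, so a request for 'bytes=0-' used to fall through to the plain 200 response. A minimal illustration with hypothetical values (not code from the repository):

range_offset = 0                          # client sent 'Range: bytes=0-'
if range_offset:                          # old test: 0 is falsy, partial content is skipped
    print('206 Partial Content')
if range_offset not in [None, False]:     # new test: 0 is a valid start offset
    print('206 Partial Content from byte 0')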
@@ -203,7 +202,7 @@ class BTClient(BaseClient):
self.lt=lt
self._cache = CacheBT(path_to_store, self.lt)
self._torrent_params = {'save_path': path_to_store,
-#'storage_mode': self.lt.storage_mode_t.storage_mode_sparse
+'storage_mode': self.lt.storage_mode_t.storage_mode_sparse
}
if not state_file:
state_file=os.path.join(path_to_store,'.btclient_state')
@@ -232,9 +231,9 @@ class BTClient(BaseClient):
self._hash = None
self._url = None
-#if args and args.debug_log and args.trace:
+if args and args.debug_log and args.trace:
-# self.add_monitor_listener(self.debug_download_queue)
+    self.add_monitor_listener(self.debug_download_queue)
-# self.add_dispatcher_listener(self.debug_alerts)
+    self.add_dispatcher_listener(self.debug_alerts)
@property
def is_file_complete(self):
@@ -247,7 +246,10 @@ class BTClient(BaseClient):
def _check_ready(self, s, **kwargs):
if s.state in [3, 4, 5] and not self._file and s.progress > 0:
-self._meta_ready(self._th.torrent_file())
+try:
+    self._meta_ready(self._th.torrent_file())
+except:
+    self._meta_ready(self._th.get_torrent_info())
logger.debug('Got torrent metadata and start download')
self.hash = True
self.hash = Hasher(self._file, self._on_file_ready)
@@ -277,7 +279,11 @@ class BTClient(BaseClient):
(f.path, fmap.piece, fmap.start, fmap.length,
meta.num_pieces(), meta.piece_length()))
-self._cache.file_complete(self._th.torrent_file(),
+try:
+    meta = self._th.torrent_file()
+except:
+    meta=self._th.get_torrent_info()
+self._cache.file_complete(meta,
self._url if self._url and self._url.startswith('http') else None)
def prioritize_piece(self, pc, idx):
@@ -294,7 +300,10 @@ class BTClient(BaseClient):
self._th.reset_piece_deadline(i)
def prioritize_file(self):
-meta = self._th.torrent_file()
+try:
+    meta = self._th.torrent_file()
+except:
+    meta=self._th.get_torrent_info()
priorities = [1 if i >= self._file.first_piece and i <= self.file.last_piece else 0 \
for i in xrange(meta.num_pieces())]
self._th.prioritize_pieces(priorities)
@@ -316,7 +325,11 @@ class BTClient(BaseClient):
@property
def unique_file_id(self):
-return str(self._th.torrent_file().info_hash())
+try:
+    meta = self._th.torrent_file()
+except:
+    meta=self._th.get_torrent_info()
+return str(meta.info_hash())
@property
def pieces(self):
@@ -378,8 +391,6 @@ class BTClient(BaseClient):
self._th.set_sequential_download(True)
time.sleep(1)
self._th.force_dht_announce()
-# if tp.has_key('ti'):
-# self._meta_ready(self._th.torrent_file())
self._monitor.start()
self._dispatcher.do_start(self._th, self._ses)
@@ -652,7 +663,7 @@ def main(args=None):
SPEED_LIMIT = 300
THREADS = 2
-stream(args, HTClient, TestResolver)
+#stream(args, HTClient, TestResolver)
else:
#rclass = plugins.find_matching_plugin(args.url)
#if rclass:
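The BTClient changes also uncomment the sparse storage mode, so the target file is allocated lazily instead of being pre-filled before playback, and they re-enable the debug listeners when both debug_log and trace are set. A hedged sketch of the resulting add_torrent parameters, assuming a libtorrent 1.0-era Python binding; the save path and magnet link are placeholders, not values from the commit:

import libtorrent as lt

params = {
    'save_path': '/tmp/btclient',  # placeholder path
    # sparse allocation: pieces are written where they land,
    # the file is not zero-filled up front
    'storage_mode': lt.storage_mode_t.storage_mode_sparse,
}
session = lt.session()
handle = session.add_torrent(dict(params, url='magnet:?xt=urn:btih:...'))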

View File

@@ -6,7 +6,6 @@ Created on May 3, 2015
import os
from collections import deque
import logging
-from threading import Lock, Event, Thread
import copy
import threading
import traceback
@@ -18,7 +17,6 @@ from hachoir_metadata import extractMetadata
from hachoir_parser import guessParser
import hachoir_core.config as hachoir_config
from hachoir_core.stream.input import InputIOStream
-#from opensubtitle import OpenSubtitles
logger = logging.getLogger('common')
hachoir_config.quiet = True
@@ -52,9 +50,9 @@ def debug_fn(fn):
return _fn
-class Hasher(Thread):
+class Hasher(threading.Thread):
def __init__(self, btfile, hash_cb):
-Thread.__init__(self, name="Hasher")
+threading.Thread.__init__(self, name="Hasher")
if btfile is None:
raise ValueError('BTFile is None!')
self._btfile = btfile
@@ -64,7 +62,6 @@ class Hasher(Thread):
self.start()
def run(self):
-pass
with self._btfile.create_cursor() as c:
filehash = OpenSubtitles.hash_file(c, self._btfile.size)
self.hash = filehash
@@ -106,20 +103,17 @@ class OpenSubtitles(object):
returnedhash = "%016x" % hash
return returnedhash
-class BaseMonitor(Thread):
+class BaseMonitor(threading.Thread):
def __init__(self, client, name):
-Thread.__init__(self, name=name)
+threading.Thread.__init__(self, name=name)
self.daemon = True
self._listeners = []
-self._lock = Lock()
+self._lock = threading.Lock()
-self._wait_event = Event()
+self._wait_event = threading.Event()
self._running = True
self._client = client
self._ses = None
-def add_to_ctx(self, key, val):
-self._ctx[key] = val
def stop(self):
self._running = False
self._wait_event.set()
@@ -148,7 +142,6 @@ class BaseClient(object):
self._client = client
def run(self):
while (self._running):
s = self._client.status
with self._lock:
@@ -244,8 +237,8 @@ class PieceCache(object):
def __init__(self, btfile):
# self._btfile=btfile
self._cache = [None] * self.size
-self._lock = Lock()
+self._lock = threading.Lock()
-self._event = Event()
+self._event = threading.Event()
self._cache_first = btfile.first_piece
self._piece_size = btfile.piece_size
self._map_offset = btfile.map_piece
@@ -400,7 +393,7 @@ class AbstractFile(object):
self._full_path = os.path.join(base, path)
self._cursors = []
self._cursors_history = deque(maxlen=3)
-self._lock = Lock()
+self._lock = threading.Lock()
self.first_piece = 0
self.last_piece = self.first_piece + (max(size - 1, 0)) // piece_size

View File

@@ -1,503 +0,0 @@
'''
Created on May 3, 2015
@author: ivan
'''
import urllib2
import os.path
import pickle
import logging
from cookielib import CookieJar
from collections import namedtuple
import random
from httplib import BadStatusLine, IncompleteRead
import socket
import time
import re
import Queue
import collections
from threading import Lock, Thread
import sys
import threading
from urllib import urlencode
import zlib
from io import BytesIO
import gzip
import json
from collections import deque
from common import AbstractFile, BaseClient, Resolver, TerminalColor #Hasher
from bs4 import BeautifulSoup
logger = logging.getLogger('htclient')
Piece = namedtuple('Piece', ['piece', 'data', 'total_size', 'type'])
class HTTPLoader(object):
UA_STRING = ['Mozilla/5.0 (Windows NT 6.3; rv:36.0) Gecko/20100101 Firefox/36.0',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2227.0 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.75.14 (KHTML, like Gecko) Version/7.0.3 Safari/7046A194A']
PARSER = 'lxml' # 'html5lib'#'html.parser'#'lxml'
class Error(Exception):
pass
def __init__(self, url, id, resolver_class=None):
self.id = id
self.user_agent = self._choose_ua()
resolver_class = resolver_class or Resolver
self._client = urllib2.build_opener(urllib2.HTTPCookieProcessor(CookieJar()))
self.url = self.resolve_file_url(resolver_class, url)
if not self.url:
raise HTTPLoader.Error('Urlwas not resolved to file link')
def resolve_file_url(self, resolver_class, url):
r = resolver_class(self)
return r.resolve(url)
def _choose_ua(self):
return HTTPLoader.UA_STRING[random.randint(0, len(HTTPLoader.UA_STRING) - 1)]
RANGE_RE = re.compile(r'bytes\s+(\d+)-(\d+)/(\d+)')
def _parse_range(self, r):
m = HTTPLoader.RANGE_RE.match(r)
if not m:
raise HTTPLoader.Error('Invalid range header')
return int(m.group(1)), int(m.group(2)), int(m.group(3))
def open(self, url, data=None, headers={}, method='get'):
hdr = {'User-Agent': self.user_agent}
hdr.update(headers)
url, post_args = self._encode_data(url, data, method)
req = urllib2.Request(url, post_args, headers=headers)
res = None
retries = 5
while retries:
try:
res = self._client.open(req, timeout=10)
break
except (IOError, urllib2.HTTPError, BadStatusLine, IncompleteRead, socket.timeout) as e:
if isinstance(e, urllib2.HTTPError) and hasattr(e, 'code') and str(e.code) == '404':
raise HTTPLoader.Error('Url %s not found', url)
logging.warn('Retry on (%s) due to IO or HTTPError (%s) ', threading.current_thread().name, e)
retries -= 1
time.sleep(1)
if not res:
raise HTTPLoader.Error('Cannot open resource %s' % url)
return res
def load_piece(self, piece_no, piece_size):
start = piece_no * piece_size
headers = {'Range': 'bytes=%d-' % start}
res = self.open(self.url, headers=headers)
allow_range_header = res.info().getheader('Accept-Ranges')
if allow_range_header and allow_range_header.lower() == 'none':
raise HTTPLoader.Error('Ranges are not supported')
size_header = res.info().getheader('Content-Length')
total_size = int(size_header) if size_header else None
range_header = res.info().getheader('Content-Range')
if not range_header:
if piece_no and not total_size:
raise HTTPLoader.Error('Ranges are not supported')
else:
from_pos, to_pos, size = 0, total_size - 1, total_size
else:
from_pos, to_pos, size = self._parse_range(range_header)
type_header = res.info().getheader('Content-Type')
if not type_header:
raise HTTPLoader.Error('Content Type is missing')
data = res.read(piece_size)
res.close()
return Piece(piece_no, data, size, type_header)
@staticmethod
def decode_data(res):
header = res.info()
data = res.read()
if header.get('Content-Encoding') == 'gzip':
tmp_stream = gzip.GzipFile(fileobj=BytesIO(data))
data = tmp_stream.read()
elif header.get('Content-Encoding') == 'deflate':
data = zlib.decompress(data)
return data
def _encode_data(self, url, data, method='post'):
if not data:
return url, None
if method.lower() == 'post':
return url, urlencode(data)
else:
return url + '?' + urlencode(data), None
def load_page(self, url, data=None, method='get'):
res = self.open(url, data, headers={'Accept-Encoding': "gzip, deflate"}, method=method)
# Content-Type:"text/html; charset=utf-8"
type_header = res.info().getheader('Content-Type')
if not type_header.startswith('text/html'):
raise HTTPLoader.Error("%s is not HTML page" % url)
# Content-Encoding:"gzip"
data = HTTPLoader.decode_data(res)
pg = BeautifulSoup(data, HTTPLoader.PARSER)
return pg
def load_json(self, url, data, method='get'):
res = self.open(url, data, headers={'Accept-Encoding': "gzip, deflate", 'X-Requested-With': 'XMLHttpRequest'},
method=method)
type_header = res.info().getheader('Content-Type')
if not type_header.startswith('application/json'):
raise HTTPLoader.Error("%s is not JSON" % url)
# Content-Encoding:"gzip"
data = HTTPLoader.decode_data(res)
return json.loads(data, encoding='utf8')
def get_redirect(self, url, data, method='get'):
pass
class PriorityQueue(Queue.PriorityQueue):
NORMAL_PRIORITY = 99
NO_DOWNLOAD = 999
NO_PIECE = -1
def __init__(self):
Queue.PriorityQueue.__init__(self, maxsize=0)
self._wset = {}
self._lock = Lock()
def put_piece(self, piece, priority, remove_previous=False):
# do not put piece which is already in queue
with self._lock:
if piece in self._wset:
if self._wset[piece][0] == priority:
return
else:
self.remove_piece(piece)
if remove_previous:
for k in self._wset:
if k < piece:
self.remove_piece(k)
entry = [priority, piece]
self._wset[piece] = entry
self.put(entry, block=False)
def remove_piece(self, piece):
entry = self._wset.pop(piece)
entry[-1] = PriorityQueue.NO_PIECE
def get_piece(self, timeout=None):
while True:
_priority, piece = self.get(timeout=timeout)
with self._lock:
if piece is not PriorityQueue.NO_PIECE:
del self._wset[piece]
return piece
class Pool(object):
def __init__(self, piece_size, loaders, cb, speed_limit=None):
self.piece_size = piece_size
self._cb = cb
self.speed_limit = speed_limit
self._queue = PriorityQueue()
self._running = True
self._threads = [Thread(name="worker %d" % i, target=self.work, args=[l]) for i, l in enumerate(loaders)]
for t in self._threads:
t.daemon = True
t.start()
def add_worker_async(self, id, gen_fn, args=[], kwargs={}):
def resolve():
l = gen_fn(*args, **kwargs)
t = Thread(name="worker %d" % id, target=self.work, args=[l])
t.daemon = True
t.start()
self._threads.append(t)
adder = Thread(name="Adder", target=resolve)
adder.daemon = True
adder.start()
def work(self, loader):
while self._running:
pc = self._queue.get_piece()
if not self._running:
break
try:
start = time.time()
p = loader.load_piece(pc, self.piece_size)
self._cb(p.piece, p.data)
if self.speed_limit:
dur = time.time() - start
expected = self.piece_size / 1024.0 / self.speed_limit
wait_time = expected - dur
if wait_time > 0:
logger.debug('Waiting %f on %s', wait_time, threading.current_thread().name)
time.sleep(wait_time)
except Exception, e:
logger.exception('(%s) Error when loading piece %d: %s', threading.current_thread().name, pc, e)
def stop(self):
self._running = False
# push some dummy tasks to assure workers ends
for i in xrange(len(self._threads)):
self._queue.put_piece(i, PriorityQueue.NO_DOWNLOAD)
def add_piece(self, piece, priority=PriorityQueue.NORMAL_PRIORITY, remove_previous=False):
self._queue.put_piece(piece, priority, remove_previous)
class HTClient(BaseClient):
def __init__(self, path_to_store, args=None, piece_size=2 * 1024 * 1024, no_threads=2, resolver_class=None):
BaseClient.__init__(self, path_to_store, args=args)
self._pool = None
self.resolver_class = resolver_class
self._no_threads = self.resolver_class.THREADS if self.resolver_class and hasattr(self.resolver_class,
'THREADS') else no_threads
self.piece_size = piece_size
self._last_downloaded = deque(maxlen=60)
def update_piece(self, piece, data):
self._file.update_piece(piece, data)
if not self._ready and hasattr(self._file, 'filehash') and self._file.filehash \
and all(self._file.pieces[:5]):
self._set_ready(self._file.is_complete)
def request_piece(self, piece, priority):
if self._pool:
self._pool.add_piece(piece, priority)
else:
raise Exception('Pool not started')
def _on_file_ready(self, filehash):
self._file.filehash = filehash
if self.is_file_complete:
self._set_ready(True)
def _set_ready(self, complete):
self._ready = True
if self._on_ready_action:
self._on_ready_action(self._file, complete)
def start_url(self, uri):
self._monitor.start()
path = self.resolver_class.url_to_file(uri)
c0 = None
try:
self._file = HTFile(path, self._base_path, 0, self.piece_size, self.request_piece)
except HTFile.UnknownSize:
c0 = HTTPLoader(uri, 0, self.resolver_class)
p = c0.load_piece(0, self.piece_size)
self._file = HTFile(path, self._base_path, p.total_size, self.piece_size, self.request_piece)
self.update_piece(0, p.data)
self._file.mime = p.type
if not self._file.is_complete:
c0 = c0 or HTTPLoader(uri, 0, self.resolver_class)
self._pool = Pool(self.piece_size, [c0],
self.update_piece,
speed_limit=self.resolver_class.SPEED_LIMIT if hasattr(self.resolver_class,
'SPEED_LIMIT') else None)
def gen_loader(i):
return HTTPLoader(uri, i, self.resolver_class)
for i in xrange(1, self._no_threads):
self._pool.add_worker_async(i, gen_loader, (i,))
# get remaining pieces with normal priority
for i in xrange(1, self._file.last_piece + 1):
if not self._file.pieces[i]:
self._pool.add_piece(i)
#self.hash = Hasher(self._file, self._on_file_ready)
@property
def is_file_complete(self):
return self.file.is_complete if self.file else False
@property
def unique_file_id(self):
return self.file.filehash
Status = collections.namedtuple('Status',
['state', 'downloaded', 'download_rate', 'total_size', 'threads', 'desired_rate'])
@property
def status(self):
tick = time.time()
state = 'starting' if not self.file else 'finished' if self.is_file_complete else 'downloading'
downloaded = self.file.downloaded if self.file else 0
if self._last_downloaded:
prev_time, prev_down = self._last_downloaded[0]
download_rate = (downloaded - prev_down) / (tick - prev_time)
else:
download_rate = 0
total_size = self.file.size if self.file else 0
threads = self._no_threads
desired_rate = self._file.byte_rate if self._file else 0
if self.file:
self._last_downloaded.append((tick, downloaded))
return HTClient.Status(state, downloaded, download_rate, total_size, threads, desired_rate)
def get_normalized_status(self):
s = self.status
return {'source_type': 'http',
'state': s.state,
'downloaded': s.downloaded,
'total_size': s.total_size,
'download_rate': s.download_rate,
'desired_rate': s.desired_rate,
'piece_size': self._file.piece_size if self._file else 0,
'progress': s.downloaded / float(s.total_size) if s.total_size else 0,
# HTTP specific
'threads': s.threads
}
def close(self):
if self._file:
self._file.close()
BaseClient.close(self)
def print_status(self, s, client):
progress = s.downloaded / float(s.total_size) * 100 if s.total_size else 0
total_size = s.total_size / 1048576.0
color = ''
if progress >= 100.0 or not s.desired_rate or s.state in ('finished', 'starting'):
color = TerminalColor.default
elif s.desired_rate > s.download_rate:
color = TerminalColor.red
elif s.download_rate > s.desired_rate and s.download_rate < s.desired_rate * 1.2:
color = TerminalColor.yellow
else:
color = TerminalColor.green
print '\r%.2f%% (of %.1fMB) down: %s%.1f kB/s\033[39m(need %.1f) %s' % \
(progress, total_size,
color, s.download_rate / 1000.0, s.desired_rate / 1000.0 if s.desired_rate else 0.0,
s.state),
sys.stdout.write("\033[K")
sys.stdout.flush()
class HTFile(AbstractFile):
def __init__(self, path, base, size, piece_size=2097152, prioritize_fn=None):
self._full_path = os.path.join(base, path)
self._prioritize_fn = prioritize_fn
self.pieces = None
self.mime = None
size = self._load_cached(size)
AbstractFile.__init__(self, path, base, size, piece_size)
if not self.pieces or len(self.pieces) != self.last_piece + 1:
self.pieces = [False for _i in xrange(self.last_piece + 1)]
self._file = open(self.full_path, 'r+b')
class UnknownSize(Exception):
pass
def _load_cached(self, size):
if not os.path.exists(self.full_path) and size:
self._allocate_file(size)
return size
elif os.path.exists(self.full_path):
sz = os.stat(self.full_path).st_size
if size and size != sz:
logger.warn('Invalid cached file')
self._allocate_file(size)
return size
pcs_file = self.pieces_index_file
if os.access(pcs_file, os.R_OK):
with open(pcs_file, 'rb') as f:
pcs = pickle.load(f)
if isinstance(pcs, tuple):
self.pieces = pcs[1]
self.mime = pcs[0]
else:
logger.warn('Invalid pieces file %s', pcs_file)
return sz
else:
raise HTFile.UnknownSize()
@property
def pieces_index_file(self):
return self.full_path + '.pieces'
def _allocate_file(self, size):
path = os.path.split(self.full_path)[0]
if not os.path.exists(path):
os.makedirs(path, mode=0755)
# sparecelly allocate file
with open(self.full_path, 'ab') as f:
f.truncate(size)
def update_piece(self, n, data):
assert n != self.last_piece and len(data) == self.piece_size or \
n == self.last_piece and len(data) == (
self.size % self.piece_size) or self.piece_size, "Invalid size of piece %d - %d" % (n, len(data))
assert n >= self.first_piece and n <= self.last_piece, 'Invalid piece no %d' % n
with self._lock:
if not self.pieces[n]:
self._file.seek(n * self.piece_size)
self._file.write(data)
self._file.flush()
self.pieces[n] = True
logger.debug('Received piece %d in thread %s', n, threading.current_thread().name)
AbstractFile.update_piece(self, n, data)
def prioritize_piece(self, piece, idx):
assert piece >= self.first_piece and piece <= self.last_piece, 'Invalid piece no %d' % piece
data = None
with self._lock:
if self.pieces[piece]:
sz = self.piece_size if piece < self.last_piece else self.size % self.piece_size
self._file.seek(piece * self.piece_size)
data = self._file.read(sz)
assert len(data) == sz
if data:
AbstractFile.update_piece(self, piece, data)
elif self._prioritize_fn:
self._prioritize_fn(piece, idx)
else:
assert False, 'Missing prioritize fn'
@property
def is_complete(self):
return all(self.pieces)
@property
def downloaded(self):
sum = 0
for i, p in enumerate(self.pieces):
if p and i == self.last_piece:
sum += (self.size % self.piece_size) or self.piece_size
elif p:
sum += self.piece_size
return sum
def remove(self):
AbstractFile.remove(self)
if os.path.exists(self.pieces_index_file):
os.unlink(self.pieces_index_file)
def close(self):
self._file.close()
d = os.path.split(self.pieces_index_file)[0]
if d and os.path.isdir(d):
with open(self.pieces_index_file, 'wb') as f:
pickle.dump((self.mime, self.pieces), f)