'''
Created on May 3, 2015

@author: ivan
'''
import urllib2
import os.path
import pickle
import logging
from cookielib import CookieJar
from collections import namedtuple, deque
import random
from httplib import BadStatusLine, IncompleteRead
import socket
import time
import re
import Queue
import collections
from threading import Lock, Thread
import sys
import threading
from urllib import urlencode
import zlib
from io import BytesIO
import gzip
import json

from common import AbstractFile, BaseClient, Resolver, TerminalColor  # Hasher
from bs4 import BeautifulSoup

logger = logging.getLogger('htclient')

Piece = namedtuple('Piece', ['piece', 'data', 'total_size', 'type'])


class HTTPLoader(object):

    UA_STRING = ['Mozilla/5.0 (Windows NT 6.3; rv:36.0) Gecko/20100101 Firefox/36.0',
                 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2227.0 Safari/537.36',
                 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.75.14 (KHTML, like Gecko) Version/7.0.3 Safari/7046A194A']

    PARSER = 'lxml'  # alternatives: 'html5lib', 'html.parser'

    class Error(Exception):
        pass

    def __init__(self, url, id, resolver_class=None):
        self.id = id
        self.user_agent = self._choose_ua()
        resolver_class = resolver_class or Resolver
        self._client = urllib2.build_opener(
            urllib2.HTTPCookieProcessor(CookieJar()))
        self.url = self.resolve_file_url(resolver_class, url)
        if not self.url:
            raise HTTPLoader.Error('URL was not resolved to a file link')

    def resolve_file_url(self, resolver_class, url):
        r = resolver_class(self)
        return r.resolve(url)

    def _choose_ua(self):
        return HTTPLoader.UA_STRING[random.randint(0, len(HTTPLoader.UA_STRING) - 1)]

    RANGE_RE = re.compile(r'bytes\s+(\d+)-(\d+)/(\d+)')

    def _parse_range(self, r):
        m = HTTPLoader.RANGE_RE.match(r)
        if not m:
            raise HTTPLoader.Error('Invalid range header')
        return int(m.group(1)), int(m.group(2)), int(m.group(3))

    def open(self, url, data=None, headers={}, method='get'):
        hdr = {'User-Agent': self.user_agent}
        hdr.update(headers)
        url, post_args = self._encode_data(url, data, method)
        req = urllib2.Request(url, post_args, headers=hdr)
        res = None
        retries = 5
        while retries:
            try:
                res = self._client.open(req, timeout=10)
                break
            except (IOError, urllib2.HTTPError, BadStatusLine, IncompleteRead, socket.timeout) as e:
                if isinstance(e, urllib2.HTTPError) and hasattr(e, 'code') and str(e.code) == '404':
                    raise HTTPLoader.Error('Url %s not found' % url)
                logger.warn('Retry on (%s) due to IO or HTTPError (%s)',
                            threading.current_thread().name, e)
                retries -= 1
                time.sleep(1)
        if not res:
            raise HTTPLoader.Error('Cannot open resource %s' % url)
        return res

    def load_piece(self, piece_no, piece_size):
        start = piece_no * piece_size
        headers = {'Range': 'bytes=%d-' % start}
        res = self.open(self.url, headers=headers)
        allow_range_header = res.info().getheader('Accept-Ranges')
        if allow_range_header and allow_range_header.lower() == 'none':
            raise HTTPLoader.Error('Ranges are not supported')
        size_header = res.info().getheader('Content-Length')
        total_size = int(size_header) if size_header else None
        range_header = res.info().getheader('Content-Range')
        if not range_header:
            if piece_no and not total_size:
                raise HTTPLoader.Error('Ranges are not supported')
            else:
                # server ignored the Range header and sent the whole file
                from_pos, to_pos, size = 0, total_size - 1, total_size
        else:
            from_pos, to_pos, size = self._parse_range(range_header)
        type_header = res.info().getheader('Content-Type')
        if not type_header:
            raise HTTPLoader.Error('Content Type is missing')
        data = res.read(piece_size)
        res.close()
        return Piece(piece_no, data, size, type_header)

    @staticmethod
    def decode_data(res):
        header = res.info()
        data = res.read()
        if header.get('Content-Encoding') == 'gzip':
            tmp_stream = gzip.GzipFile(fileobj=BytesIO(data))
            data = tmp_stream.read()
        elif header.get('Content-Encoding') == 'deflate':
            data = zlib.decompress(data)
        return data

    def _encode_data(self, url, data, method='post'):
        if not data:
            return url, None
        if method.lower() == 'post':
            return url, urlencode(data)
        else:
            return url + '?' + urlencode(data), None

    def load_page(self, url, data=None, method='get'):
        res = self.open(url, data,
                        headers={'Accept-Encoding': 'gzip, deflate'},
                        method=method)
        # expects Content-Type like "text/html; charset=utf-8"
        type_header = res.info().getheader('Content-Type')
        if not type_header.startswith('text/html'):
            raise HTTPLoader.Error('%s is not HTML page' % url)
        # body may arrive with Content-Encoding: gzip or deflate
        data = HTTPLoader.decode_data(res)
        pg = BeautifulSoup(data, HTTPLoader.PARSER)
        return pg

    def load_json(self, url, data, method='get'):
        res = self.open(url, data,
                        headers={'Accept-Encoding': 'gzip, deflate',
                                 'X-Requested-With': 'XMLHttpRequest'},
                        method=method)
        type_header = res.info().getheader('Content-Type')
        if not type_header.startswith('application/json'):
            raise HTTPLoader.Error('%s is not JSON' % url)
        data = HTTPLoader.decode_data(res)
        return json.loads(data, encoding='utf8')

    def get_redirect(self, url, data, method='get'):
        pass
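

# Illustrative sketch, not part of the original code path: how a bare
# urllib2 Range request maps onto the Piece tuple above. The URL is a
# placeholder; HTTPLoader.load_piece performs the same steps with retries,
# cookie handling and Accept-Ranges / Content-Range validation on top.
def _demo_range_request(url='http://example.com/file.bin',
                        piece_no=1, piece_size=2 * 1024 * 1024):
    req = urllib2.Request(
        url, headers={'Range': 'bytes=%d-' % (piece_no * piece_size)})
    res = urllib2.urlopen(req, timeout=10)
    # 'Content-Range: bytes <from>-<to>/<total>' carries the total file size
    m = HTTPLoader.RANGE_RE.match(res.info().getheader('Content-Range') or '')
    total = int(m.group(3)) if m else None
    return Piece(piece_no, res.read(piece_size), total,
                 res.info().getheader('Content-Type'))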

class PriorityQueue(Queue.PriorityQueue):
    NORMAL_PRIORITY = 99
    NO_DOWNLOAD = 999
    NO_PIECE = -1

    def __init__(self):
        Queue.PriorityQueue.__init__(self, maxsize=0)
        self._wset = {}
        self._lock = Lock()

    def put_piece(self, piece, priority, remove_previous=False):
        # do not put piece which is already in queue
        with self._lock:
            if piece in self._wset:
                if self._wset[piece][0] == priority:
                    return
                else:
                    self.remove_piece(piece)
            if remove_previous:
                # copy keys - remove_piece mutates _wset during iteration
                for k in list(self._wset):
                    if k < piece:
                        self.remove_piece(k)
            entry = [priority, piece]
            self._wset[piece] = entry
            self.put(entry, block=False)

    def remove_piece(self, piece):
        # mark the queued entry as a tombstone; get_piece skips it
        entry = self._wset.pop(piece)
        entry[-1] = PriorityQueue.NO_PIECE

    def get_piece(self, timeout=None):
        while True:
            _priority, piece = self.get(timeout=timeout)
            with self._lock:
                if piece != PriorityQueue.NO_PIECE:
                    del self._wset[piece]
                    return piece


class Pool(object):

    def __init__(self, piece_size, loaders, cb, speed_limit=None):
        self.piece_size = piece_size
        self._cb = cb
        self.speed_limit = speed_limit
        self._queue = PriorityQueue()
        self._running = True
        self._threads = [Thread(name='worker %d' % i, target=self.work, args=[l])
                         for i, l in enumerate(loaders)]
        for t in self._threads:
            t.daemon = True
            t.start()

    def add_worker_async(self, id, gen_fn, args=[], kwargs={}):
        def resolve():
            l = gen_fn(*args, **kwargs)
            t = Thread(name='worker %d' % id, target=self.work, args=[l])
            t.daemon = True
            t.start()
            self._threads.append(t)
        adder = Thread(name='Adder', target=resolve)
        adder.daemon = True
        adder.start()

    def work(self, loader):
        while self._running:
            pc = self._queue.get_piece()
            if not self._running:
                break
            try:
                start = time.time()
                p = loader.load_piece(pc, self.piece_size)
                self._cb(p.piece, p.data)
                if self.speed_limit:
                    # speed_limit is per-thread in kB/s; sleep off the
                    # difference between expected and actual duration
                    dur = time.time() - start
                    expected = self.piece_size / 1024.0 / self.speed_limit
                    wait_time = expected - dur
                    if wait_time > 0:
                        logger.debug('Waiting %f on %s', wait_time,
                                     threading.current_thread().name)
                        time.sleep(wait_time)
            except Exception as e:
                logger.exception('(%s) Error when loading piece %d: %s',
                                 threading.current_thread().name, pc, e)

    def stop(self):
        self._running = False
        # push some dummy tasks to assure workers end
        for i in xrange(len(self._threads)):
            self._queue.put_piece(i, PriorityQueue.NO_DOWNLOAD)

    def add_piece(self, piece, priority=PriorityQueue.NORMAL_PRIORITY,
                  remove_previous=False):
        self._queue.put_piece(piece, priority, remove_previous)
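

# Illustrative sketch with assumed names (not in the original code): Pool can
# be driven by any loader exposing load_piece(piece_no, piece_size). Lower
# numbers mean higher priority, so a streaming player would re-request the
# piece under the playhead with a small priority value.
def _demo_pool(loaders, on_piece, piece_count, piece_size=2 * 1024 * 1024):
    pool = Pool(piece_size, loaders, on_piece, speed_limit=None)
    for i in xrange(piece_count):
        pool.add_piece(i)              # queued with NORMAL_PRIORITY (99)
    pool.add_piece(0, priority=1)      # re-queues piece 0 ahead of the rest
    return pool                        # caller invokes pool.stop() when done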

class HTClient(BaseClient):

    def __init__(self, path_to_store, args=None, piece_size=2 * 1024 * 1024,
                 no_threads=2, resolver_class=None):
        BaseClient.__init__(self, path_to_store, args=args)
        self._pool = None
        self.resolver_class = resolver_class
        self._no_threads = self.resolver_class.THREADS \
            if self.resolver_class and hasattr(self.resolver_class, 'THREADS') \
            else no_threads
        self.piece_size = piece_size
        self._last_downloaded = deque(maxlen=60)

    def update_piece(self, piece, data):
        self._file.update_piece(piece, data)
        if not self._ready and hasattr(self._file, 'filehash') and self._file.filehash \
                and all(self._file.pieces[:5]):
            self._set_ready(self._file.is_complete)

    def request_piece(self, piece, priority):
        if self._pool:
            self._pool.add_piece(piece, priority)
        else:
            raise Exception('Pool not started')

    def _on_file_ready(self, filehash):
        self._file.filehash = filehash
        if self.is_file_complete:
            self._set_ready(True)

    def _set_ready(self, complete):
        self._ready = True
        if self._on_ready_action:
            self._on_ready_action(self._file, complete)

    def start_url(self, uri):
        self._monitor.start()
        path = self.resolver_class.url_to_file(uri)
        c0 = None
        try:
            self._file = HTFile(path, self._base_path, 0, self.piece_size,
                                self.request_piece)
        except HTFile.UnknownSize:
            c0 = HTTPLoader(uri, 0, self.resolver_class)
            p = c0.load_piece(0, self.piece_size)
            self._file = HTFile(path, self._base_path, p.total_size,
                                self.piece_size, self.request_piece)
            self.update_piece(0, p.data)
            self._file.mime = p.type

        if not self._file.is_complete:
            c0 = c0 or HTTPLoader(uri, 0, self.resolver_class)
            self._pool = Pool(self.piece_size, [c0], self.update_piece,
                              speed_limit=self.resolver_class.SPEED_LIMIT
                              if hasattr(self.resolver_class, 'SPEED_LIMIT') else None)

            def gen_loader(i):
                return HTTPLoader(uri, i, self.resolver_class)
            for i in xrange(1, self._no_threads):
                self._pool.add_worker_async(i, gen_loader, (i,))
            # get remaining pieces with normal priority
            for i in xrange(1, self._file.last_piece + 1):
                if not self._file.pieces[i]:
                    self._pool.add_piece(i)
        # self.hash = Hasher(self._file, self._on_file_ready)

    @property
    def is_file_complete(self):
        return self.file.is_complete if self.file else False

    @property
    def unique_file_id(self):
        return self.file.filehash

    Status = collections.namedtuple(
        'Status', ['state', 'downloaded', 'download_rate', 'total_size',
                   'threads', 'desired_rate'])

    @property
    def status(self):
        tick = time.time()
        state = 'starting' if not self.file else \
            'finished' if self.is_file_complete else 'downloading'
        downloaded = self.file.downloaded if self.file else 0
        if self._last_downloaded:
            prev_time, prev_down = self._last_downloaded[0]
            download_rate = (downloaded - prev_down) / (tick - prev_time)
        else:
            download_rate = 0
        total_size = self.file.size if self.file else 0
        threads = self._no_threads
        desired_rate = self._file.byte_rate if self._file else 0
        if self.file:
            self._last_downloaded.append((tick, downloaded))
        return HTClient.Status(state, downloaded, download_rate, total_size,
                               threads, desired_rate)

    def get_normalized_status(self):
        s = self.status
        return {'source_type': 'http',
                'state': s.state,
                'downloaded': s.downloaded,
                'total_size': s.total_size,
                'download_rate': s.download_rate,
                'desired_rate': s.desired_rate,
                'piece_size': self._file.piece_size if self._file else 0,
                'progress': s.downloaded / float(s.total_size) if s.total_size else 0,
                # HTTP specific
                'threads': s.threads
                }

    def close(self):
        if self._file:
            self._file.close()
        BaseClient.close(self)

    def print_status(self, s, client):
        progress = s.downloaded / float(s.total_size) * 100 if s.total_size else 0
        total_size = s.total_size / 1048576.0
        color = ''
        if progress >= 100.0 or not s.desired_rate or s.state in ('finished', 'starting'):
            color = TerminalColor.default
        elif s.desired_rate > s.download_rate:
            color = TerminalColor.red
        elif s.download_rate > s.desired_rate and s.download_rate < s.desired_rate * 1.2:
            color = TerminalColor.yellow
        else:
            color = TerminalColor.green
        print '\r%.2f%% (of %.1fMB) down: %s%.1f kB/s\033[39m (need %.1f) %s' % \
            (progress, total_size, color, s.download_rate / 1000.0,
             s.desired_rate / 1000.0 if s.desired_rate else 0.0, s.state),
        sys.stdout.write('\033[K')
        sys.stdout.flush()
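

# Illustrative sketch with assumed sample values: the download rate in
# HTClient.status is a sliding-window average - the oldest of up to 60
# (time, bytes) samples is compared with the current reading, so a single
# slow piece does not make the reported rate jump around.
def _demo_rate_window():
    samples = deque(maxlen=60)
    samples.append((0.0, 0))            # t=0s, 0 bytes downloaded
    samples.append((1.0, 500 * 1024))   # t=1s, 500 kB downloaded
    tick, downloaded = 2.0, 1536 * 1024
    prev_time, prev_down = samples[0]
    return (downloaded - prev_down) / (tick - prev_time)  # 786432 B/s (~768 kB/s)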

class HTFile(AbstractFile):

    def __init__(self, path, base, size, piece_size=2097152, prioritize_fn=None):
        self._full_path = os.path.join(base, path)
        self._prioritize_fn = prioritize_fn
        self.pieces = None
        self.mime = None
        size = self._load_cached(size)
        AbstractFile.__init__(self, path, base, size, piece_size)
        if not self.pieces or len(self.pieces) != self.last_piece + 1:
            self.pieces = [False for _i in xrange(self.last_piece + 1)]
        self._file = open(self.full_path, 'r+b')

    class UnknownSize(Exception):
        pass

    def _load_cached(self, size):
        if not os.path.exists(self.full_path) and size:
            self._allocate_file(size)
            return size
        elif os.path.exists(self.full_path):
            sz = os.stat(self.full_path).st_size
            if size and size != sz:
                logger.warn('Invalid cached file')
                self._allocate_file(size)
                return size
            pcs_file = self.pieces_index_file
            if os.access(pcs_file, os.R_OK):
                with open(pcs_file, 'rb') as f:
                    pcs = pickle.load(f)
                if isinstance(pcs, tuple):
                    self.pieces = pcs[1]
                    self.mime = pcs[0]
                else:
                    logger.warn('Invalid pieces file %s', pcs_file)
            return sz
        else:
            raise HTFile.UnknownSize()

    @property
    def pieces_index_file(self):
        return self.full_path + '.pieces'

    def _allocate_file(self, size):
        path = os.path.split(self.full_path)[0]
        if not os.path.exists(path):
            os.makedirs(path, mode=0755)
        # sparsely allocate file
        with open(self.full_path, 'ab') as f:
            f.truncate(size)

    def update_piece(self, n, data):
        expected = self.piece_size if n != self.last_piece \
            else (self.size % self.piece_size) or self.piece_size
        assert len(data) == expected, \
            'Invalid size of piece %d - %d' % (n, len(data))
        assert self.first_piece <= n <= self.last_piece, \
            'Invalid piece no %d' % n

        with self._lock:
            if not self.pieces[n]:
                self._file.seek(n * self.piece_size)
                self._file.write(data)
                self._file.flush()
                self.pieces[n] = True
                logger.debug('Received piece %d in thread %s', n,
                             threading.current_thread().name)
        AbstractFile.update_piece(self, n, data)

    def prioritize_piece(self, piece, idx):
        assert self.first_piece <= piece <= self.last_piece, \
            'Invalid piece no %d' % piece
        data = None
        with self._lock:
            if self.pieces[piece]:
                sz = self.piece_size if piece < self.last_piece \
                    else (self.size % self.piece_size) or self.piece_size
                self._file.seek(piece * self.piece_size)
                data = self._file.read(sz)
                assert len(data) == sz
        if data:
            AbstractFile.update_piece(self, piece, data)
        elif self._prioritize_fn:
            self._prioritize_fn(piece, idx)
        else:
            assert False, 'Missing prioritize fn'

    @property
    def is_complete(self):
        return all(self.pieces)
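
    # Note on the piece-size arithmetic used above and in the downloaded
    # property below: every piece is piece_size bytes except possibly the
    # last one, whose length is size % piece_size - unless size is an exact
    # multiple of piece_size, in which case the remainder is 0 and the last
    # piece is a full piece_size (hence the "or self.piece_size" fallback).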
    @property
    def downloaded(self):
        sum = 0
        for i, p in enumerate(self.pieces):
            if p and i == self.last_piece:
                sum += (self.size % self.piece_size) or self.piece_size
            elif p:
                sum += self.piece_size
        return sum

    def remove(self):
        AbstractFile.remove(self)
        if os.path.exists(self.pieces_index_file):
            os.unlink(self.pieces_index_file)

    def close(self):
        self._file.close()
        d = os.path.split(self.pieces_index_file)[0]
        if d and os.path.isdir(d):
            with open(self.pieces_index_file, 'wb') as f:
                pickle.dump((self.mime, self.pieces), f)
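

# Illustrative usage sketch under stated assumptions: the URL is a
# placeholder, and it assumes the base Resolver from common supports
# url_to_file() plus whatever BaseClient needs at construction time.
# HTClient drives the whole pipeline: resolve the URL, open or allocate
# the cached HTFile, then fetch pieces concurrently through the Pool.
if __name__ == '__main__':
    import tempfile
    client = HTClient(tempfile.gettempdir(), piece_size=2 * 1024 * 1024,
                      no_threads=2, resolver_class=Resolver)
    client.start_url('http://example.com/video.mp4')  # placeholder URL
    try:
        while not client.is_file_complete:
            client.print_status(client.status, client)
            time.sleep(1)
    finally:
        client.close()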