Initial commit

sandbox1
Anton Argirov 2015-01-11 11:47:17 +06:00
commit 74f247223f
9 changed files with 1144 additions and 0 deletions

3
.gitignore vendored 100644
View File

@ -0,0 +1,3 @@
.idea/
*.py[cod]
bin/

16
addon.xml 100644
View File

@ -0,0 +1,16 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<addon id="script.module.torrent2http" name="torrent2http" version="0.0.4" provider-name="anteo">
<requires>
<import addon="xbmc.python" version="2.14.0"/>
</requires>
<extension point="xbmc.python.module" library="lib"/>
<extension point="xbmc.addon.metadata">
<platform>all</platform>
<language>en</language>
<summary lang="en">Downloads torrents and share it over HTTP</summary>
<summary lang="ru">Загружает торренты и раздает их по HTTP</summary>
<description lang="ru">Обеспечивает последовательную (sequential) загрузку торрентов для потокового онлайн просмотра через HTTP. Основан на библиотеке LibTorrent.</description>
<description lang="en">Provides sequential torrent downloading for online streaming video and other media over HTTP.</description>
<email>anteo@academ.org</email>
</extension>
</addon>

8
changelog.txt 100644
View File

@ -0,0 +1,8 @@
[B]Version 0.0.3[/B]
+ Detect media types
[B]Version 0.0.2[/B]
+ Added some configurable libtorrent options
[B]Version 0.0.1[/B]
+ Initial release

View File

@ -0,0 +1,44 @@
# -*- coding: utf-8 -*-
from collections import namedtuple
# noinspection PyClassHasNoInit
class State:
QUEUED_FOR_CHECKING = 0
CHECKING_FILES = 1
DOWNLOADING_METADATA = 2
DOWNLOADING = 3
FINISHED = 4
SEEDING = 5
ALLOCATING = 6
CHECKING_RESUME_DATA = 7
# noinspection PyClassHasNoInit
class MediaType:
UNKNOWN = None
AUDIO = 'audio'
VIDEO = 'video'
SUBTITLES = 'subtitles'
# noinspection PyClassHasNoInit
class Encryption:
FORCED = 0
ENABLED = 1
DISABLED = 2
SessionStatus = namedtuple('SessionStatus', "name, state, state_str, error, progress, download_rate, upload_rate, "
"total_download, total_upload, num_peers, num_seeds, total_seeds, "
"total_peers")
FileStatus = namedtuple('FileStatus', "name, save_path, url, size, offset, download, progress, index, media_type")
PeerInfo = namedtuple('PeerInfo', "ip, flags, source, up_speed, down_speed, total_upload, total_download, "
"country, client")
from engine import Engine
from platform import Platform
from error import Error

View File

@ -0,0 +1,368 @@
# -*- coding: utf-8 -*-
import json
import os
import socket
import stat
import subprocess
import sys
import time
import urllib2
import logpipe
import mimetypes
import xbmc
from error import Error
from platform import Platform
from . import SessionStatus, FileStatus, PeerInfo, MediaType, Encryption
from os.path import dirname
class Engine:
SUBTITLES_FORMATS = ['.aqt', '.gsub', '.jss', '.sub', '.ttxt', '.pjs', '.psb', '.rt', '.smi', '.stl',
'.ssf', '.srt', '.ssa', '.ass', '.usf', '.idx']
def _ensure_binary_executable(self, path):
st = os.stat(path)
if not st.st_mode & stat.S_IEXEC:
self._log("%s is not executable, trying to change its mode..." % path)
os.chmod(path, st.st_mode | stat.S_IEXEC)
st = os.stat(path)
if st.st_mode & stat.S_IEXEC:
self._log("Succeeded")
return True
else:
self._log("Failed")
return False
return True
def _log(self, message):
if self.logger:
self.logger(message)
else:
xbmc.log("[torrent2http] %s" % message)
def _get_binary_path(self, binaries_path):
binary = "torrent2http" + (".exe" if self.platform.system == 'windows' else "")
binary_dir = os.path.join(binaries_path, "%s_%s" % (self.platform.system, self.platform.arch))
binary_path = os.path.join(binary_dir, binary)
if not os.path.isfile(binary_path):
raise Error("Can't find torrent2http binary for %s" % self.platform,
Error.UNKNOWN_PLATFORM, platform=str(self.platform))
if not self._ensure_binary_executable(binary_path):
if self.platform.system == "android":
self._log("Trying to copy torrent2http to ext4, since the sdcard is noexec...")
xbmc_home = os.environ.get('XBMC_HOME') or os.environ.get('KODI_HOME')
if not xbmc_home:
raise Error("Suppose we are running XBMC, but environment variable "
"XBMC_HOME or KODI_HOME is not found", Error.XBMC_HOME_NOT_DEFINED)
base_xbmc_path = dirname(dirname(dirname(xbmc_home)))
android_binary_dir = os.path.join(base_xbmc_path, "files")
if not os.path.exists(android_binary_dir):
os.makedirs(android_binary_dir)
android_binary_path = os.path.join(android_binary_dir, binary)
if not os.path.exists(android_binary_path) or \
int(os.path.getmtime(android_binary_path)) < int(os.path.getmtime(binary_path)):
import shutil
shutil.copy2(binary_path, android_binary_path)
if not self._ensure_binary_executable(android_binary_path):
raise Error("Can't make %s executable" % android_binary_path, Error.NOEXEC_FILESYSTEM)
binary_path = android_binary_path
else:
raise Error("Can't make %s executable, ensure it's placed on exec partition and "
"partition is in read/write mode" % binary_path, Error.NOEXEC_FILESYSTEM)
self._log("Selected %s as torrent2http binary" % binary_path)
return binary_path
@staticmethod
def _can_bind(host, port):
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((host, port))
s.close()
except socket.error:
return False
return True
@staticmethod
def _find_free_port(host):
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((host, 0))
port = s.getsockname()[1]
s.close()
except socket.error:
return False
return port
def __init__(self, uri=None, binaries_path=None, platform=None, download_path=".",
bind_host='127.0.0.1', bind_port=5001, connections_limit=None, download_kbps=None, upload_kbps=None,
enable_dht=True, enable_lsd=True, enable_natpmp=True, enable_upnp=True, enable_scrape=False,
log_stats=False, encryption=Encryption.ENABLED, keep_complete=False, keep_incomplete=False,
keep_files=False, log_files_progress=False, log_overall_progress=False, log_pieces_progress=False,
listen_port=6881, use_random_port=False, max_idle_timeout=None, no_sparse=False, resume_file=None,
user_agent=None, startup_timeout=5, state_file=None, enable_utp=True, enable_tcp=True,
debug_alerts=False, logger=None, torrent_connect_boost=50, connection_speed=50,
peer_connect_timeout=15, request_timeout=20, min_reconnect_time=60, max_failcount=3,
dht_routers=None, trackers=None):
self.dht_routers = dht_routers or []
self.trackers = trackers or []
self.max_failcount = max_failcount
self.min_reconnect_time = min_reconnect_time
self.request_timeout = request_timeout
self.peer_connect_timeout = peer_connect_timeout
self.connection_speed = connection_speed
self.torrent_connect_boost = torrent_connect_boost
self.platform = platform
self.bind_host = bind_host
self.bind_port = bind_port
self.binaries_path = binaries_path or os.path.join(dirname(dirname(dirname(os.path.abspath(__file__)))), 'bin')
self.download_path = download_path
self.connections_limit = connections_limit
self.download_kbps = download_kbps
self.upload_kbps = upload_kbps
self.enable_dht = enable_dht
self.enable_lsd = enable_lsd
self.enable_natpmp = enable_natpmp
self.enable_upnp = enable_upnp
self.enable_scrape = enable_scrape
self.log_stats = log_stats
self.encryption = encryption
self.keep_complete = keep_complete
self.keep_incomplete = keep_incomplete
self.keep_files = keep_files
self.log_files_progress = log_files_progress
self.log_overall_progress = log_overall_progress
self.log_pieces_progress = log_pieces_progress
self.listen_port = listen_port
self.use_random_port = use_random_port
self.max_idle_timeout = max_idle_timeout
self.no_sparse = no_sparse
self.resume_file = resume_file
self.user_agent = user_agent
self.startup_timeout = startup_timeout
self.state_file = state_file
self.wait_on_close_timeout = None
self.enable_utp = enable_utp
self.enable_tcp = enable_tcp
self.debug_alerts = debug_alerts
self.logger = logger
self.uri = uri
self.logpipe = None
self.process = None
@staticmethod
def _validate_save_path(path):
if "://" in path:
if sys.platform.startswith('win') and path.lower().startswith("smb://"):
path = path.replace("smb:", "").replace("/", "\\")
else:
raise Error("Downloading to an unmounted network share is not supported", Error.INVALID_DOWNLOAD_PATH)
if not os.path.isdir(path):
raise Error("Download path doesn't exist (%s)" % path, Error.INVALID_DOWNLOAD_PATH)
return path
def start(self, start_index=None):
self.platform = self.platform or Platform()
binary_path = self._get_binary_path(self.binaries_path)
download_path = self._validate_save_path(self.download_path)
if not self._can_bind(self.bind_host, self.bind_port):
port = self._find_free_port(self.bind_host)
if port is False:
raise Error("Can't find port to bind torrent2http", Error.BIND_ERROR)
self._log("Can't bind to %s:%s, so we found another port: %d" % (self.bind_host, self.bind_port, port))
self.bind_port = port
kwargs = {
'--bind': "%s:%s" % (self.bind_host, self.bind_port),
'--uri': self.uri,
'--file-index': start_index,
'--dl-path': download_path,
'--connections-limit': self.connections_limit,
'--dl-rate': self.download_kbps,
'--ul-rate': self.upload_kbps,
'--enable-dht': self.enable_dht,
'--enable-lsd': self.enable_lsd,
'--enable-natpmp': self.enable_natpmp,
'--enable-upnp': self.enable_upnp,
'--enable-scrape': self.enable_scrape,
'--encryption': self.encryption,
'--show-stats': self.log_stats,
'--files-progress': self.log_files_progress,
'--overall-progress': self.log_overall_progress,
'--pieces-progress': self.log_pieces_progress,
'--listen-port': self.listen_port,
'--random-port': self.use_random_port,
'--keep-complete': self.keep_complete,
'--keep-incomplete': self.keep_incomplete,
'--keep-files': self.keep_files,
'--max-idle': self.max_idle_timeout,
'--no-sparse': self.no_sparse,
'--resume-file': self.resume_file,
'--user-agent': self.user_agent,
'--state-file': self.state_file,
'--enable-utp': self.enable_utp,
'--enable-tcp': self.enable_tcp,
'--debug-alerts': self.debug_alerts,
'--torrent-connect-boost': self.torrent_connect_boost,
'--connection-speed': self.connection_speed,
'--peer-connect-timeout': self.peer_connect_timeout,
'--request-timeout': self.request_timeout,
'--min-reconnect-time': self.min_reconnect_time,
'--max-failcount': self.max_failcount,
'--dht-routers': ",".join(self.dht_routers),
'--trackers': ",".join(self.trackers),
}
args = [binary_path]
for k, v in kwargs.iteritems():
if v is not None:
if isinstance(v, bool):
if v:
args.append(k)
else:
args.append("%s=false" % k)
else:
args.append(k)
if isinstance(v, str):
v = v.decode('utf-8')
if isinstance(v, unicode):
v = v.encode(sys.getfilesystemencoding() or 'utf-8')
else:
v = str(v)
args.append(v)
self._log("Invoking %s" % " ".join(args))
startupinfo = None
if self.platform.system == "windows":
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= 1
startupinfo.wShowWindow = 0
self.logpipe = logpipe.LogPipe(self._log)
try:
self.process = subprocess.Popen(args, stderr=self.logpipe, stdout=self.logpipe, startupinfo=startupinfo)
except OSError, e:
raise Error("Can't start torrent2http: %r" % e, Error.POPEN_ERROR)
start = time.time()
initialized = False
while (time.time() - start) < self.startup_timeout:
time.sleep(0.1)
if not self.is_alive():
raise Error("Can't start torrent2http, see log for details", Error.PROCESS_ERROR)
try:
self.status(1)
initialized = True
break
except Error:
pass
if not initialized:
raise Error("Can't start torrent2http, time is out", Error.TIMEOUT)
self._log("torrent2http successfully started.")
def check_torrent_error(self, status=None):
if not status:
status = self.status()
if status.error:
raise Error("Torrent error: %s" % status.error, Error.TORRENT_ERROR, reason=status.error)
def status(self, timeout=10):
status = self._decode(self._request('status', timeout))
status = SessionStatus(**status)
return status
def _detect_media_type(self, name):
ext = os.path.splitext(name)[1]
if ext in self.SUBTITLES_FORMATS:
return MediaType.SUBTITLES
else:
mime_type = mimetypes.guess_type(name)[0]
if not mime_type:
return MediaType.UNKNOWN
mime_type = mime_type.split("/")[0]
if mime_type == 'audio':
return MediaType.AUDIO
elif mime_type == 'video':
return MediaType.VIDEO
else:
return MediaType.UNKNOWN
def list(self, media_types=None, timeout=10):
files = self._decode(self._request('ls', timeout))['files']
if files:
res = [FileStatus(index=index, media_type=self._detect_media_type(f['name']), **f)
for index, f in enumerate(files)]
if media_types is not None:
res = filter(lambda fs: fs.media_type in media_types, res)
return res
def file_status(self, file_index, timeout=10):
res = self.list(timeout=timeout)
if res:
try:
return next((f for f in res if f.index == file_index))
except StopIteration:
raise Error("Requested file index (%d) is invalid" % file_index, Error.INVALID_FILE_INDEX,
file_index=file_index)
def peers(self, timeout=10):
peers = self._decode(self._request('peers', timeout))['peers']
if peers:
return [PeerInfo(**p) for p in peers]
def is_alive(self):
return self.process and self.process.poll() is None
@staticmethod
def _decode(response):
try:
return json.loads(response)
except (KeyError, ValueError), e:
raise Error("Can't decode response from torrent2http: %r" % e, Error.REQUEST_ERROR)
def _request(self, cmd, timeout=None):
if not self.is_alive():
raise Error("torrent2http is not started", Error.REQUEST_ERROR)
try:
url = "http://%s:%s/%s" % (self.bind_host, self.bind_port, cmd)
kwargs = {}
if timeout is not None:
kwargs['timeout'] = timeout
return urllib2.urlopen(url, **kwargs).read()
except urllib2.URLError as e:
if isinstance(e.reason, socket.timeout):
raise Error("Timeout occurred while sending command '%s' to torrent2http" % cmd, Error.TIMEOUT)
else:
raise Error("Can't send command '%s' to torrent2http: %r" % (cmd, e), Error.REQUEST_ERROR)
except socket.error as e:
reason = e[1] if isinstance(e, tuple) else e
raise Error("Can't read from torrent2http: %s" % reason, Error.REQUEST_ERROR)
def wait_on_close(self, wait_timeout=10):
self.wait_on_close_timeout = wait_timeout
def close(self):
if self.logpipe and self.wait_on_close_timeout is None:
self.logpipe.close()
if self.is_alive():
self._log("Shutting down torrent2http...")
self._request('shutdown')
finished = False
if self.wait_on_close_timeout is not None:
start = time.time()
os.close(self.logpipe.write_fd)
while (time.time() - start) < self.wait_on_close_timeout:
time.sleep(0.5)
if not self.is_alive():
finished = True
break
if not finished:
self._log("Timeout occurred while shutting down torrent2http, killing it")
self.process.kill()
else:
self._log("torrent2http successfully shut down.")
self.wait_on_close_timeout = None
self.logpipe = None
self.process = None

View File

@ -0,0 +1,21 @@
class Error(Exception):
TORRENT_ERROR = 12
UNKNOWN_PLATFORM = 1
XBMC_HOME_NOT_DEFINED = 2
NOEXEC_FILESYSTEM = 3
REQUEST_ERROR = 5
INVALID_DOWNLOAD_PATH = 6
BIND_ERROR = 7
POPEN_ERROR = 8
PROCESS_ERROR = 9
TIMEOUT = 10
INVALID_FILE_INDEX = 11
def __init__(self, message, code=0, **kwargs):
self.message = message
self.code = code
self.kwargs = kwargs
def __str__(self):
return self.message

View File

@ -0,0 +1,33 @@
import os
import threading
import re
class LogPipe(threading.Thread):
def __init__(self, logger):
threading.Thread.__init__(self)
self.daemon = False
self.logger = logger
self.read_fd, self.write_fd = os.pipe()
self.stop = threading.Event()
self.start()
def fileno(self):
return self.write_fd
def run(self):
self.logger("Logging thread started.")
with os.fdopen(self.read_fd) as f:
for line in iter(f.readline, ""):
line = re.sub(r'^\d+/\d+/\d+ \d+:\d+:\d+ ', '', line)
self.logger(line.strip())
if self.stop.is_set():
break
self.logger("Logging thread finished.")
def close(self):
self.stop.set()
f = os.fdopen(self.write_fd, "w")
f.write("Stopping logging thread...\n")
f.close()

View File

@ -0,0 +1,613 @@
"""Guess the MIME type of a file.
This module defines two useful functions:
guess_type(url, strict=1) -- guess the MIME type and encoding of a URL.
guess_extension(type, strict=1) -- guess the extension for a given MIME type.
It also contains the following, for tuning the behavior:
Data:
knownfiles -- list of files to parse
inited -- flag set when init() has been called
suffix_map -- dictionary mapping suffixes to suffixes
encodings_map -- dictionary mapping suffixes to encodings
types_map -- dictionary mapping suffixes to types
Functions:
init([files]) -- parse a list of files, default knownfiles (on Windows, the
default values are taken from the registry)
read_mime_types(file) -- parse one file, return a dictionary or None
"""
import os
import sys
import posixpath
import urllib
try:
import _winreg
except ImportError:
_winreg = None
__all__ = [
"guess_type","guess_extension","guess_all_extensions",
"add_type","read_mime_types","init"
]
knownfiles = [
"/etc/mime.types",
"/etc/httpd/mime.types", # Mac OS X
"/etc/httpd/conf/mime.types", # Apache
"/etc/apache/mime.types", # Apache 1
"/etc/apache2/mime.types", # Apache 2
"/usr/local/etc/httpd/conf/mime.types",
"/usr/local/lib/netscape/mime.types",
"/usr/local/etc/httpd/conf/mime.types", # Apache 1.2
"/usr/local/etc/mime.types", # Apache 1.3
]
inited = False
_db = None
class MimeTypes:
"""MIME-types datastore.
This datastore can handle information from mime.types-style files
and supports basic determination of MIME type from a filename or
URL, and can guess a reasonable extension given a MIME type.
"""
def __init__(self, filenames=(), strict=True):
if not inited:
init()
self.encodings_map = encodings_map.copy()
self.suffix_map = suffix_map.copy()
self.types_map = ({}, {}) # dict for (non-strict, strict)
self.types_map_inv = ({}, {})
for (ext, type) in types_map.items():
self.add_type(type, ext, True)
for (ext, type) in common_types.items():
self.add_type(type, ext, False)
for name in filenames:
self.read(name, strict)
def add_type(self, type, ext, strict=True):
"""Add a mapping between a type and an extension.
When the extension is already known, the new
type will replace the old one. When the type
is already known the extension will be added
to the list of known extensions.
If strict is true, information will be added to
list of standard types, else to the list of non-standard
types.
"""
self.types_map[strict][ext] = type
exts = self.types_map_inv[strict].setdefault(type, [])
if ext not in exts:
exts.append(ext)
def guess_type(self, url, strict=True):
"""Guess the type of a file based on its URL.
Return value is a tuple (type, encoding) where type is None if
the type can't be guessed (no or unknown suffix) or a string
of the form type/subtype, usable for a MIME Content-type
header; and encoding is None for no encoding or the name of
the program used to encode (e.g. compress or gzip). The
mappings are table driven. Encoding suffixes are case
sensitive; type suffixes are first tried case sensitive, then
case insensitive.
The suffixes .tgz, .taz and .tz (case sensitive!) are all
mapped to '.tar.gz'. (This is table-driven too, using the
dictionary suffix_map.)
Optional `strict' argument when False adds a bunch of commonly found,
but non-standard types.
"""
scheme, url = urllib.splittype(url)
if scheme == 'data':
# syntax of data URLs:
# dataurl := "data:" [ mediatype ] [ ";base64" ] "," data
# mediatype := [ type "/" subtype ] *( ";" parameter )
# data := *urlchar
# parameter := attribute "=" value
# type/subtype defaults to "text/plain"
comma = url.find(',')
if comma < 0:
# bad data URL
return None, None
semi = url.find(';', 0, comma)
if semi >= 0:
type = url[:semi]
else:
type = url[:comma]
if '=' in type or '/' not in type:
type = 'text/plain'
return type, None # never compressed, so encoding is None
base, ext = posixpath.splitext(url)
while ext in self.suffix_map:
base, ext = posixpath.splitext(base + self.suffix_map[ext])
if ext in self.encodings_map:
encoding = self.encodings_map[ext]
base, ext = posixpath.splitext(base)
else:
encoding = None
types_map = self.types_map[True]
if ext in types_map:
return types_map[ext], encoding
elif ext.lower() in types_map:
return types_map[ext.lower()], encoding
elif strict:
return None, encoding
types_map = self.types_map[False]
if ext in types_map:
return types_map[ext], encoding
elif ext.lower() in types_map:
return types_map[ext.lower()], encoding
else:
return None, encoding
def guess_all_extensions(self, type, strict=True):
"""Guess the extensions for a file based on its MIME type.
Return value is a list of strings giving the possible filename
extensions, including the leading dot ('.'). The extension is not
guaranteed to have been associated with any particular data stream,
but would be mapped to the MIME type `type' by guess_type().
Optional `strict' argument when false adds a bunch of commonly found,
but non-standard types.
"""
type = type.lower()
extensions = self.types_map_inv[True].get(type, [])
if not strict:
for ext in self.types_map_inv[False].get(type, []):
if ext not in extensions:
extensions.append(ext)
return extensions
def guess_extension(self, type, strict=True):
"""Guess the extension for a file based on its MIME type.
Return value is a string giving a filename extension,
including the leading dot ('.'). The extension is not
guaranteed to have been associated with any particular data
stream, but would be mapped to the MIME type `type' by
guess_type(). If no extension can be guessed for `type', None
is returned.
Optional `strict' argument when false adds a bunch of commonly found,
but non-standard types.
"""
extensions = self.guess_all_extensions(type, strict)
if not extensions:
return None
return extensions[0]
def read(self, filename, strict=True):
"""
Read a single mime.types-format file, specified by pathname.
If strict is true, information will be added to
list of standard types, else to the list of non-standard
types.
"""
with open(filename) as fp:
self.readfp(fp, strict)
def readfp(self, fp, strict=True):
"""
Read a single mime.types-format file.
If strict is true, information will be added to
list of standard types, else to the list of non-standard
types.
"""
while 1:
line = fp.readline()
if not line:
break
words = line.split()
for i in range(len(words)):
if words[i][0] == '#':
del words[i:]
break
if not words:
continue
type, suffixes = words[0], words[1:]
for suff in suffixes:
self.add_type(type, '.' + suff, strict)
def read_windows_registry(self, strict=True):
"""
Load the MIME types database from Windows registry.
If strict is true, information will be added to
list of standard types, else to the list of non-standard
types.
"""
# Windows only
if not _winreg:
return
def enum_types(mimedb):
i = 0
while True:
try:
ctype = _winreg.EnumKey(mimedb, i)
except EnvironmentError:
break
try:
ctype = ctype.encode(default_encoding) # omit in 3.x!
except UnicodeEncodeError:
pass
else:
yield ctype
i += 1
default_encoding = sys.getdefaultencoding()
with _winreg.OpenKey(_winreg.HKEY_CLASSES_ROOT,
r'MIME\Database\Content Type') as mimedb:
for ctype in enum_types(mimedb):
try:
with _winreg.OpenKey(mimedb, ctype) as key:
suffix, datatype = _winreg.QueryValueEx(key,
'Extension')
except EnvironmentError:
continue
if datatype != _winreg.REG_SZ:
continue
try:
suffix = suffix.encode(default_encoding) # omit in 3.x!
except UnicodeEncodeError:
continue
self.add_type(ctype, suffix, strict)
def guess_type(url, strict=True):
"""Guess the type of a file based on its URL.
Return value is a tuple (type, encoding) where type is None if the
type can't be guessed (no or unknown suffix) or a string of the
form type/subtype, usable for a MIME Content-type header; and
encoding is None for no encoding or the name of the program used
to encode (e.g. compress or gzip). The mappings are table
driven. Encoding suffixes are case sensitive; type suffixes are
first tried case sensitive, then case insensitive.
The suffixes .tgz, .taz and .tz (case sensitive!) are all mapped
to ".tar.gz". (This is table-driven too, using the dictionary
suffix_map).
Optional `strict' argument when false adds a bunch of commonly found, but
non-standard types.
"""
if _db is None:
init()
return _db.guess_type(url, strict)
def guess_all_extensions(type, strict=True):
"""Guess the extensions for a file based on its MIME type.
Return value is a list of strings giving the possible filename
extensions, including the leading dot ('.'). The extension is not
guaranteed to have been associated with any particular data
stream, but would be mapped to the MIME type `type' by
guess_type(). If no extension can be guessed for `type', None
is returned.
Optional `strict' argument when false adds a bunch of commonly found,
but non-standard types.
"""
if _db is None:
init()
return _db.guess_all_extensions(type, strict)
def guess_extension(type, strict=True):
"""Guess the extension for a file based on its MIME type.
Return value is a string giving a filename extension, including the
leading dot ('.'). The extension is not guaranteed to have been
associated with any particular data stream, but would be mapped to the
MIME type `type' by guess_type(). If no extension can be guessed for
`type', None is returned.
Optional `strict' argument when false adds a bunch of commonly found,
but non-standard types.
"""
if _db is None:
init()
return _db.guess_extension(type, strict)
def add_type(type, ext, strict=True):
"""Add a mapping between a type and an extension.
When the extension is already known, the new
type will replace the old one. When the type
is already known the extension will be added
to the list of known extensions.
If strict is true, information will be added to
list of standard types, else to the list of non-standard
types.
"""
if _db is None:
init()
return _db.add_type(type, ext, strict)
def init(files=None):
global suffix_map, types_map, encodings_map, common_types
global inited, _db
inited = True # so that MimeTypes.__init__() doesn't call us again
db = MimeTypes()
if files is None:
if _winreg:
db.read_windows_registry()
files = knownfiles
for file in files:
if os.path.isfile(file):
db.read(file)
encodings_map = db.encodings_map
suffix_map = db.suffix_map
types_map = db.types_map[True]
common_types = db.types_map[False]
# Make the DB a global variable now that it is fully initialized
_db = db
def read_mime_types(file):
try:
f = open(file)
except IOError:
return None
db = MimeTypes()
db.readfp(f, True)
return db.types_map[True]
def _default_mime_types():
global suffix_map
global encodings_map
global types_map
global common_types
suffix_map = {
'.tgz': '.tar.gz',
'.taz': '.tar.gz',
'.tz': '.tar.gz',
'.tbz2': '.tar.bz2',
}
encodings_map = {
'.gz': 'gzip',
'.Z': 'compress',
'.bz2': 'bzip2',
}
# Before adding new types, make sure they are either registered with IANA,
# at http://www.isi.edu/in-notes/iana/assignments/media-types
# or extensions, i.e. using the x- prefix
# If you add to these, please keep them sorted!
types_map = {
'.3gp' : 'video/3gp',
'.a' : 'application/octet-stream',
'.ai' : 'application/postscript',
'.aif' : 'audio/x-aiff',
'.aifc' : 'audio/x-aiff',
'.aiff' : 'audio/x-aiff',
'.asf' : 'video/x-ms-asf',
'.asx' : 'video/x-ms-asf',
'.au' : 'audio/basic',
'.avi' : 'video/x-msvideo',
'.axv' : 'video/annodex',
'.bat' : 'text/plain',
'.bcpio' : 'application/x-bcpio',
'.bin' : 'application/octet-stream',
'.bmp' : 'image/x-ms-bmp',
'.c' : 'text/plain',
# Duplicates :(
'.cdf' : 'application/x-cdf',
'.cdf' : 'application/x-netcdf',
'.cpio' : 'application/x-cpio',
'.csh' : 'application/x-csh',
'.css' : 'text/css',
'.dif' : 'video/dv',
'.dl' : 'video/dl',
'.dll' : 'application/octet-stream',
'.dv' : 'video/dv',
'.doc' : 'application/msword',
'.dot' : 'application/msword',
'.dvi' : 'application/x-dvi',
'.eml' : 'message/rfc822',
'.eps' : 'application/postscript',
'.etx' : 'text/x-setext',
'.exe' : 'application/octet-stream',
'.fli' : 'video/fli',
'.flv' : 'video/x-flv',
'.gif' : 'image/gif',
'.gl' : 'video/gl',
'.gtar' : 'application/x-gtar',
'.h' : 'text/plain',
'.hdf' : 'application/x-hdf',
'.htm' : 'text/html',
'.html' : 'text/html',
'.ief' : 'image/ief',
'.jpe' : 'image/jpeg',
'.jpeg' : 'image/jpeg',
'.jpg' : 'image/jpeg',
'.js' : 'application/x-javascript',
'.ksh' : 'text/plain',
'.latex' : 'application/x-latex',
'.lsf' : 'video/x-la-lsf',
'.lsx' : 'video/x-la-lsf',
'.m1v' : 'video/mpeg',
'.man' : 'application/x-troff-man',
'.me' : 'application/x-troff-me',
'.mht' : 'message/rfc822',
'.mhtml' : 'message/rfc822',
'.mif' : 'application/x-mif',
'.mng' : 'video/x-mng',
'.movie' : 'video/x-sgi-movie',
'.mp2' : 'audio/mpeg',
'.mp3' : 'audio/mpeg',
'.mp4' : 'video/mp4',
'.mpa' : 'video/mpeg',
'.mpe' : 'video/mpeg',
'.mpeg' : 'video/mpeg',
'.mpg' : 'video/mpeg',
'.mpv' : 'video/matroska',
'.mkv' : 'video/matroska',
'.mov' : 'video/quicktime',
'.ms' : 'application/x-troff-ms',
'.nc' : 'application/x-netcdf',
'.nws' : 'message/rfc822',
'.o' : 'application/octet-stream',
'.obj' : 'application/octet-stream',
'.oda' : 'application/oda',
'.ogv' : 'video/ogg',
'.p12' : 'application/x-pkcs12',
'.p7c' : 'application/pkcs7-mime',
'.pbm' : 'image/x-portable-bitmap',
'.pdf' : 'application/pdf',
'.pfx' : 'application/x-pkcs12',
'.pgm' : 'image/x-portable-graymap',
'.pl' : 'text/plain',
'.png' : 'image/png',
'.pnm' : 'image/x-portable-anymap',
'.pot' : 'application/vnd.ms-powerpoint',
'.ppa' : 'application/vnd.ms-powerpoint',
'.ppm' : 'image/x-portable-pixmap',
'.pps' : 'application/vnd.ms-powerpoint',
'.ppt' : 'application/vnd.ms-powerpoint',
'.ps' : 'application/postscript',
'.pwz' : 'application/vnd.ms-powerpoint',
'.py' : 'text/x-python',
'.pyc' : 'application/x-python-code',
'.pyo' : 'application/x-python-code',
'.qt' : 'video/quicktime',
'.ra' : 'audio/x-pn-realaudio',
'.ram' : 'application/x-pn-realaudio',
'.ras' : 'image/x-cmu-raster',
'.rdf' : 'application/xml',
'.rgb' : 'image/x-rgb',
'.roff' : 'application/x-troff',
'.rtx' : 'text/richtext',
'.sgm' : 'text/x-sgml',
'.sgml' : 'text/x-sgml',
'.sh' : 'application/x-sh',
'.shar' : 'application/x-shar',
'.snd' : 'audio/basic',
'.so' : 'application/octet-stream',
'.src' : 'application/x-wais-source',
'.sv4cpio': 'application/x-sv4cpio',
'.sv4crc' : 'application/x-sv4crc',
'.swf' : 'application/x-shockwave-flash',
'.t' : 'application/x-troff',
'.tar' : 'application/x-tar',
'.tcl' : 'application/x-tcl',
'.tex' : 'application/x-tex',
'.texi' : 'application/x-texinfo',
'.texinfo': 'application/x-texinfo',
'.tif' : 'image/tiff',
'.tiff' : 'image/tiff',
'.tr' : 'application/x-troff',
'.ts' : 'video/MP2T',
'.tsv' : 'text/tab-separated-values',
'.txt' : 'text/plain',
'.ustar' : 'application/x-ustar',
'.vcf' : 'text/x-vcard',
'.wav' : 'audio/x-wav',
'.webm' : 'video/webm',
'.wiz' : 'application/msword',
'.wm' : 'video/x-ms-wm',
'.wmv' : 'video/x-ms-wmv',
'.wmx' : 'video/x-ms-wmx',
'.wvx' : 'video/x-ms-wvx',
'.wsdl' : 'application/xml',
'.xbm' : 'image/x-xbitmap',
'.xlb' : 'application/vnd.ms-excel',
# Duplicates :(
'.xls' : 'application/excel',
'.xls' : 'application/vnd.ms-excel',
'.xml' : 'text/xml',
'.xpdl' : 'application/xml',
'.xpm' : 'image/x-xpixmap',
'.xsl' : 'application/xml',
'.xwd' : 'image/x-xwindowdump',
'.zip' : 'application/zip',
}
# These are non-standard types, commonly found in the wild. They will
# only match if strict=0 flag is given to the API methods.
# Please sort these too
common_types = {
'.jpg' : 'image/jpg',
'.mid' : 'audio/midi',
'.midi': 'audio/midi',
'.pct' : 'image/pict',
'.pic' : 'image/pict',
'.pict': 'image/pict',
'.rtf' : 'application/rtf',
'.xul' : 'text/xul'
}
_default_mime_types()
if __name__ == '__main__':
import getopt
USAGE = """\
Usage: mimetypes.py [options] type
Options:
--help / -h -- print this message and exit
--lenient / -l -- additionally search of some common, but non-standard
types.
--extension / -e -- guess extension instead of type
More than one type argument may be given.
"""
def usage(code, msg=''):
print USAGE
if msg: print msg
sys.exit(code)
try:
opts, args = getopt.getopt(sys.argv[1:], 'hle',
['help', 'lenient', 'extension'])
except getopt.error, msg:
usage(1, msg)
strict = 1
extension = 0
for opt, arg in opts:
if opt in ('-h', '--help'):
usage(0)
elif opt in ('-l', '--lenient'):
strict = 0
elif opt in ('-e', '--extension'):
extension = 1
for gtype in args:
if extension:
guess = guess_extension(gtype, strict)
if not guess: print "I don't know anything about type", gtype
else: print guess
else:
guess, encoding = guess_type(gtype, strict)
if not guess: print "I don't know anything about type", gtype
else: print 'type:', guess, 'encoding:', encoding

View File

@ -0,0 +1,38 @@
from __future__ import absolute_import
from platform import uname
from .error import Error
import sys
import os
class Platform:
def __init__(self):
self.arch = self.arch()
self.system = self.system()
def __str__(self):
return "%s/%s" % (self.system, self.arch)
@staticmethod
def arch():
if uname()[4].startswith('arm'):
return 'arm'
elif sys.maxsize > 2**32:
return 'x64'
else:
return 'x86'
@staticmethod
def system():
if sys.platform.startswith('linux'):
if 'ANDROID_DATA' in os.environ:
return 'android'
else:
return 'linux'
elif sys.platform.startswith('win'):
return 'windows'
elif sys.platform.startswith('darwin'):
return 'darwin'
else:
raise Error("Platform %s is unknown" % sys.platform, Error.UNKNOWN_PLATFORM,
platform=sys.platform)