# script.module.gorrent2http/lib/pyrrent2http/util.py
# (web file-viewer header: 96 lines, 2.7 KiB, Python)

import sys
import socket
# 2016-03-09 13:29:58 +03:00 (revision timestamp left over from the web blame view)
import chardet
import os
from . import MediaType
import mimetypes
import urlparse, urllib
# File extensions (lower-case, with the leading dot) that detect_media_type
# classifies as subtitles.
SUBTITLES_FORMATS = [
    '.aqt', '.gsub', '.jss', '.sub',
    '.ttxt', '.pjs', '.psb', '.rt',
    '.smi', '.stl', '.ssf', '.srt',
    '.ssa', '.ass', '.usf', '.idx',
]
class _MissingStructAttribute(AttributeError, KeyError):
    """Raised by Struct when an attribute has no matching key.

    Being an AttributeError lets getattr() with a default, hasattr() and
    the copy module treat the attribute as absent; also being a KeyError
    keeps existing ``except KeyError`` handlers around attribute access
    working.
    """


class Struct(dict):
    """Dictionary whose items can also be read and written as attributes."""

    def __getattr__(self, attr):
        # Python calls __getattr__ only after normal attribute lookup has
        # failed, so dict methods are never shadowed by keys.
        try:
            return self[attr]
        except KeyError:
            raise _MissingStructAttribute(attr)

    def __setattr__(self, attr, value):
        # Every attribute assignment is stored as a dictionary item.
        self[attr] = value
def uri2path(uri):
    """Convert a ``file:`` URI, or a plain local path, to a localized path.

    On Windows a bare drive path (second character is ``:``) is first
    wrapped into a ``file:///`` URI. For ``file:`` URIs the path component
    is percent-decoded; any other input is used verbatim as a filesystem
    path. The result is made absolute and passed through localize_path.

    :param uri: File URI or local path
    :return: Whatever localize_path returns for the absolute path
    """
    on_windows = sys.platform.startswith('win')
    # Slicing instead of indexing keeps inputs shorter than two characters
    # from raising IndexError.
    if on_windows and uri[1:2] == ':':
        uri = 'file:///' + uri
    parsed = urlparse.urlparse(uri)
    if parsed.scheme == 'file':
        uri_path = parsed.path
        # On Windows the parsed path keeps a leading separator in front of
        # the drive letter ("/C:/dir"); drop it.
        if on_windows and uri_path[:1] in (os.path.sep, '/'):
            uri_path = uri_path[1:]
        uri_path = urllib.unquote(uri_path)
    else:
        # Not a file URI: treat the input as an ordinary path instead of
        # failing on an unassigned variable.
        uri_path = uri
    return localize_path(os.path.abspath(uri_path))
def detect_media_type(name):
    """Classify a file name as subtitles, audio, video or unknown.

    The extension is checked against SUBTITLES_FORMATS first (ignoring
    case); otherwise the major part of the MIME type guessed by the
    ``mimetypes`` module decides.

    :param name: File name or path
    :return: One of the MediaType constants
    """
    # SUBTITLES_FORMATS holds lower-case extensions, so normalise the case
    # to recognise names such as "MOVIE.SRT" too.
    ext = os.path.splitext(name)[1].lower()
    if ext in SUBTITLES_FORMATS:
        return MediaType.SUBTITLES
    mime_type = mimetypes.guess_type(name)[0]
    if not mime_type:
        return MediaType.UNKNOWN
    major = mime_type.split("/")[0]
    if major == 'audio':
        return MediaType.AUDIO
    if major == 'video':
        return MediaType.VIDEO
    return MediaType.UNKNOWN
# 2016-03-13 17:28:01 +03:00 (revision timestamp left over from the web blame view)
def _to_text(value):
    """Decode a byte string to text; return any other value unchanged."""
    if not isinstance(value, bytes):
        return value
    # An empty string has nothing to detect, and chardet reports an
    # encoding of None when it cannot decide; fall back to UTF-8 in both
    # cases instead of calling decode(None).
    encoding = (chardet.detect(value)['encoding'] if value else None) or 'utf-8'
    return value.decode(encoding)


def normalize_msg(tmpl, args):
    """Format ``tmpl % args`` after converting byte strings to text.

    The template and every byte-string argument are decoded using the
    encoding detected by chardet. Arguments that are not byte strings
    (text, numbers, ...) are passed to the format operator unchanged.

    :param tmpl: %-style template, text or byte string
    :param args: Iterable of format arguments
    :return: The formatted text message
    """
    return _to_text(tmpl) % tuple(_to_text(arg) for arg in args)
# 2016-03-09 13:29:58 +03:00 (revision timestamp left over from the web blame view)
def localize_path(path):
    """Return ``path`` in the form this module uses on the current platform.

    Byte strings are first decoded to text using the encoding detected by
    chardet. On Windows the text path is returned as is; on every other
    platform it is encoded with the filesystem encoding (UTF-8 when the
    interpreter reports none).

    :param path: Path as text or byte string
    :return: Text path on Windows, encoded byte string elsewhere
    """
    if isinstance(path, bytes):
        # An empty path has nothing to detect, and chardet reports an
        # encoding of None when it cannot decide; fall back to UTF-8
        # instead of calling decode(None).
        encoding = (chardet.detect(path)['encoding'] if path else None) or 'utf-8'
        path = path.decode(encoding)
    if not sys.platform.startswith('win'):
        path = path.encode(sys.getfilesystemencoding() or 'utf-8')
    return path
def can_bind(host, port):
    """
    Checks we can bind to specified host and port

    A temporary TCP socket is created for the check and is always closed
    again, whether or not the bind succeeds.

    :param host: Host
    :param port: Port
    :return: True if bind succeed, False on any socket error
    """
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    except socket.error:
        return False
    try:
        s.bind((host, port))
    except socket.error:
        return False
    finally:
        # Release the descriptor on the failure path as well.
        s.close()
    return True
def find_free_port(host):
    """
    Finds free TCP port that can be used for binding

    Binds a temporary socket to port 0 so the operating system picks a
    port, reads the chosen port back and closes the socket again, also
    when binding fails.

    :param host: Host
    :return: Free port, or False on any socket error
    """
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    except socket.error:
        return False
    try:
        s.bind((host, 0))
        return s.getsockname()[1]
    except socket.error:
        return False
    finally:
        # Release the descriptor on the failure path as well.
        s.close()
def ensure_fs_encoding(string):
    """Return ``string`` as a byte string in the filesystem encoding.

    Byte-string input is assumed to be UTF-8 and is decoded first; text
    input is encoded directly. UTF-8 is used when the interpreter reports
    no filesystem encoding.

    :param string: Text or UTF-8 encoded byte string
    :return: Byte string encoded with the filesystem encoding
    """
    # ``bytes`` is the same type as ``str`` on Python 2, and stays the
    # byte-string type on Python 3 where ``str`` has no decode().
    if isinstance(string, bytes):
        string = string.decode('utf-8')
    return string.encode(sys.getfilesystemencoding() or 'utf-8')