script.video.iptv.itv/itv.py

163 lines
5.5 KiB
Python

# coding=utf-8
#
# Copyright (C) 2018 Dmitry Vinogradov
# https://github.com/kodi-iptv-addons
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Library General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Library General Public License for more details.
#
# You should have received a copy of the GNU Library General Public
# License along with this library; if not, write to the
# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
# Boston, MA 02110-1301, USA.
#
import json
from urllib.parse import quote
from iptvlib.api import Api, ApiException
from iptvlib.models import *
class Itv(Api):
PROTECTED_GROUP = "Взрослый"
TEXT_ERROR_WRONG_KEY_ID = 30005 # type: int
key = None # type: str
adult = None # type: bool
sort_channels = None # type: bool
hostname = None # type: str
_player_info = None # type: list
def __init__(self, hostname, key, adult, **kwargs):
super(Itv, self).__init__(**kwargs)
self.hostname = hostname
self.key = self.username = key
self.adult = adult
self.auth_status = self.AUTH_STATUS_NONE
Model.API = self
@property
def base_api_url(self):
return "http://%s/%%s" % self.hostname
@property
def base_icon_url(self):
return "http://%s/icon/%%s" % self.hostname
@property
def host(self):
return self.hostname
@property
def diff_live_archive(self):
return TENSECS
@property
def archive_ttl(self):
return TREEDAYS
def get_cookie(self):
return ""
def is_login_request(self, uri, payload=None, method=None, headers=None):
return uri == "" and payload.get("action") == "playerInfo"
def login(self):
self._player_info = []
response = self.make_request("", {"action": "playerInfo", "ukey": self.key})
if isinstance(response, list) and len(response) and response[0].get("response") == "No Token":
raise ApiException(
addon.getLocalizedString(self.TEXT_ERROR_WRONG_KEY_ID),
Api.E_API_ERROR
)
else:
is_error, error = Api.is_error_response(response)
if is_error:
raise ApiException(error.get("message"), error.get("code"))
self._player_info = response
self.auth_status = self.AUTH_STATUS_OK
return response
def get_groups(self):
if self._player_info is None:
self.login()
number = 1
groups = OrderedDict()
channels = OrderedDict()
for channel_data in self._player_info:
if (channel_data["cat_id"] in groups) is False:
gid = str(channel_data["cat_id"])
groups[gid] = Group(
gid=gid,
name=channel_data["cat_name"],
channels=OrderedDict(),
number=number
)
number += 1
group = groups[channel_data["cat_id"]]
if self.adult is False and group.name == self.PROTECTED_GROUP:
continue
cid = str(channel_data["ch_id"])
channel = Channel(
cid=cid,
gid=group.gid,
name=channel_data["channel_name"],
icon=self.base_icon_url % channel_data["logo"],
epg=True,
archive=bool(channel_data.get("rec", 0)),
protected=group.name == self.PROTECTED_GROUP
)
channel.data.update({"server": channel_data["server_cdn"], "token": channel_data["token"], "port": channel_data["port_cdn"]})
group.channels[cid] = channels[cid] = channel
return groups
def get_stream_url(self, cid, ut_start=None):
channel = self.channels[cid]
url = "http://%s:%s/%s/" % \
(channel.data["server"], channel.data["port"] if channel.data["port"] != "" else "80", cid)
if ut_start is None:
url = "%smono.m3u8?token=%s" % (url, channel.data["token"])
else:
url = "%sindex-%s-%s.m3u8?token=%s" % (url, int(ut_start), int(time_now() - ut_start), channel.data["token"])
return self.resolve_url(url)
def get_epg(self, cid):
# type: (str) -> OrderedDict[int, Program]
programs = self.get_epg_gh(self.channels[cid])
if len(programs):
return programs
obj = quote(json.dumps({"action": "epg", "chid": cid}))
response = self.make_request("epg.php?obj=%s" % obj) # type: dict[str, list[dict]]
is_error, error = Api.is_error_response(response)
if is_error:
raise ApiException(error.get("message"), error.get("code"))
programs = OrderedDict()
prev = None
for v in response["res"]:
program = Program(
cid,
self.channels[cid].gid,
int(v["startTime"]),
int(v["stopTime"]),
v["title"],
v["desc"],
self.channels[cid].archive
)
if prev is not None:
program.prev_program = prev
prev.next_program = program
programs[program.ut_start] = prev = program
return programs