Организовал дополнение

master 1.0.0
inpos 2017-01-30 23:13:52 +03:00
parent 445268562c
commit 6496fc3e1b
15 changed files with 657 additions and 0 deletions

View File

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<addon id="script.module.vk"
name="vk"
version="1.0.0"
provider-name="inpos">
<requires>
<import addon="xbmc.python" version="1.0.0"/>
</requires>
<extension point="xbmc.python.module"
library="lib" />
<extension point="xbmc.addon.metadata">
<platform>all</platform>
<language></language>
<summary lang="en">vk.com API Python wrapper</summary>
<description lang="en">vk.com API Python wrapper</description>
<disclaimer lang="en">Code taken from https://pypi.python.org/pypi/vk/</disclaimer>
<license>MIT License</license>
<website>https://pypi.python.org/pypi/vk/</website>
<source>https://pypi.python.org/pypi/vk/</source>
</extension>
</addon>

View File

@ -0,0 +1,9 @@
from vk.api import logger
from vk.api import Session, AuthSession, InteractiveSession, InteractiveAuthSession
from vk.api import VERSION
from vk.api import API
__version__ = version = VERSION
# API = OAuthAPI

Binary file not shown.

View File

@ -0,0 +1,185 @@
# coding=utf8
import logging
import logging.config
from vk.logs import LOGGING_CONFIG
from vk.utils import stringify_values, json_iter_parse, LoggingSession, str_type
from vk.exceptions import VkAuthError, VkAPIError
from vk.mixins import AuthMixin, InteractiveMixin
VERSION = '2.0.2'
logging.config.dictConfig(LOGGING_CONFIG)
logger = logging.getLogger('vk')
class Session(object):
API_URL = 'https://api.vk.com/method/'
def __init__(self, access_token=None):
logger.debug('API.__init__(access_token=%(access_token)r)', {'access_token': access_token})
self.access_token = access_token
self.access_token_is_needed = False
self.requests_session = LoggingSession()
self.requests_session.headers['Accept'] = 'application/json'
self.requests_session.headers['Content-Type'] = 'application/x-www-form-urlencoded'
@property
def access_token(self):
logger.debug('Check that we need new access token')
if self.access_token_is_needed:
logger.debug('We need new access token. Try to get it.')
self.access_token = self.get_access_token()
else:
logger.debug('Use old access token')
return self._access_token
@access_token.setter
def access_token(self, value):
self._access_token = value
if isinstance(value, str_type) and len(value) >= 12:
self.censored_access_token = '{}***{}'.format(value[:4], value[-4:])
else:
self.censored_access_token = value
logger.debug('access_token = %r', self.censored_access_token)
self.access_token_is_needed = not self._access_token
def get_user_login(self):
logger.debug('Do nothing to get user login')
def get_access_token(self):
"""
Dummy method
"""
logger.debug('API.get_access_token()')
return self._access_token
def make_request(self, method_request, captcha_response=None):
logger.debug('Prepare API Method request')
response = self.send_api_request(method_request, captcha_response=captcha_response)
# todo Replace with something less exceptional
response.raise_for_status()
# there are may be 2 dicts in one JSON
# for example: "{'error': ...}{'response': ...}"
for response_or_error in json_iter_parse(response.text):
if 'response' in response_or_error:
# todo Can we have error and response simultaneously
# for error in errors:
# logger.warning(str(error))
return response_or_error['response']
elif 'error' in response_or_error:
error_data = response_or_error['error']
error = VkAPIError(error_data)
if error.is_captcha_needed():
captcha_key = self.get_captcha_key(error.captcha_img)
if not captcha_key:
raise error
captcha_response = {
'sid': error.captcha_sid,
'key': captcha_key,
}
return self.make_request(method_request, captcha_response=captcha_response)
elif error.is_access_token_incorrect():
logger.info('Authorization failed. Access token will be dropped')
self.access_token = None
return self.make_request(method_request)
else:
raise error
def send_api_request(self, request, captcha_response=None):
url = self.API_URL + request._method_name
method_args = request._api._method_default_args.copy()
method_args.update(stringify_values(request._method_args))
access_token = self.access_token
if access_token:
method_args['access_token'] = access_token
if captcha_response:
method_args['captcha_sid'] = captcha_response['sid']
method_args['captcha_key'] = captcha_response['key']
timeout = request._api._timeout
response = self.requests_session.post(url, method_args, timeout=timeout)
return response
def get_captcha_key(self, captcha_image_url):
"""
Default behavior on CAPTCHA is to raise exception
Reload this in child
"""
return None
def auth_code_is_needed(self, content, session):
"""
Default behavior on 2-AUTH CODE is to raise exception
Reload this in child
"""
raise VkAuthError('Authorization error (2-factor code is needed)')
def auth_captcha_is_needed(self, content, session):
"""
Default behavior on CAPTCHA is to raise exception
Reload this in child
"""
raise VkAuthError('Authorization error (captcha)')
def phone_number_is_needed(self, content, session):
"""
Default behavior on PHONE NUMBER is to raise exception
Reload this in child
"""
logger.error('Authorization error (phone number is needed)')
raise VkAuthError('Authorization error (phone number is needed)')
class API(object):
def __init__(self, session, timeout=10, **method_default_args):
self._session = session
self._timeout = timeout
self._method_default_args = method_default_args
def __getattr__(self, method_name):
return Request(self, method_name)
def __call__(self, method_name, **method_kwargs):
return getattr(self, method_name)(**method_kwargs)
class Request(object):
__slots__ = ('_api', '_method_name', '_method_args')
def __init__(self, api, method_name):
self._api = api
self._method_name = method_name
def __getattr__(self, method_name):
return Request(self._api, self._method_name + '.' + method_name)
def __call__(self, **method_args):
self._method_args = method_args
return self._api._session.make_request(self)
class AuthSession(AuthMixin, Session):
pass
class InteractiveSession(InteractiveMixin, Session):
pass
class InteractiveAuthSession(InteractiveMixin, AuthSession):
pass

Binary file not shown.

View File

@ -0,0 +1,57 @@
# API Error Codes
AUTHORIZATION_FAILED = 5 # Invalid access token
PERMISSION_IS_DENIED = 7
CAPTCHA_IS_NEEDED = 14
ACCESS_DENIED = 15 # No access to call this method
# User deactivated
INVALID_USER_ID = 113
class VkException(Exception):
pass
class VkAuthError(VkException):
pass
class VkAPIError(VkException):
__slots__ = ['error', 'code', 'message', 'request_params', 'redirect_uri']
CAPTCHA_NEEDED = 14
ACCESS_DENIED = 15
def __init__(self, error_data):
super(VkAPIError, self).__init__()
self.error_data = error_data
self.code = error_data.get('error_code')
self.message = error_data.get('error_msg')
self.request_params = self.get_pretty_request_params(error_data)
self.redirect_uri = error_data.get('redirect_uri')
@staticmethod
def get_pretty_request_params(error_data):
request_params = error_data.get('request_params', ())
request_params = {param['key']: param['value'] for param in request_params}
return request_params
def is_access_token_incorrect(self):
return self.code == self.ACCESS_DENIED and 'access_token' in self.message
def is_captcha_needed(self):
return self.code == self.CAPTCHA_NEEDED
@property
def captcha_sid(self):
return self.error_data.get('captcha_sid')
@property
def captcha_img(self):
return self.error_data.get('captcha_img')
def __str__(self):
error_message = '{self.code}. {self.message}. request_params = {self.request_params}'.format(self=self)
if self.redirect_uri:
error_message += ',\nredirect_uri = "{self.redirect_uri}"'.format(self=self)
return error_message

Binary file not shown.

View File

@ -0,0 +1,27 @@
import sys
LOGGING_CONFIG = {
'version': 1,
'disable_existing_loggers': False,
'loggers': {
'vk': {
'level': 'INFO',
'handlers': ['vk-stdout'],
'propagate': False,
},
},
'handlers': {
'vk-stdout': {
'class': 'logging.StreamHandler',
'stream': sys.stdout,
'formatter': 'vk-verbose',
},
},
'formatters': {
'vk-verbose': {
'format': '%(asctime)s %(name) -5s %(module)s:%(lineno)d %(levelname)s: %(message)s',
},
},
}

Binary file not shown.

View File

@ -0,0 +1,218 @@
# coding=utf8
import logging
from vk.exceptions import VkAuthError
from vk.utils import raw_input, get_url_query, LoggingSession, get_form_action
logger = logging.getLogger('vk')
class AuthMixin(object):
LOGIN_URL = 'https://m.vk.com'
# REDIRECT_URI = 'https://oauth.vk.com/blank.html'
AUTHORIZE_URL = 'https://oauth.vk.com/authorize'
CAPTCHA_URI = 'https://m.vk.com/captcha.php'
def __init__(self, app_id=None, user_login='', user_password='', scope='offline', **kwargs):
logger.debug('AuthMixin.__init__(app_id=%(app_id)r, user_login=%(user_login)r, user_password=%(user_password)r, **kwargs=%(kwargs)s)',
dict(app_id=app_id, user_login=user_login, user_password=user_password, kwargs=kwargs))
super(AuthMixin, self).__init__(**kwargs)
self.app_id = app_id
self.user_login = user_login
self.user_password = user_password
self.scope = scope
# Some API methods get args (e.g. user id) from access token.
# If we define user login, we need get access token now.
if self.user_login:
self.access_token = self.get_access_token()
@property
def user_login(self):
if not self._user_login:
self._user_login = self.get_user_login()
return self._user_login
@user_login.setter
def user_login(self, value):
self._user_login = value
def get_user_login(self):
return self._user_login
@property
def user_password(self):
if not self._user_password:
self._user_password = self.get_user_password()
return self._user_password
@user_password.setter
def user_password(self, value):
self._user_password = value
def get_user_password(self):
return self._user_password
def get_access_token(self):
"""
Get access token using app id and user login and password.
"""
logger.debug('AuthMixin.get_access_token()')
auth_session = LoggingSession()
with auth_session as self.auth_session:
self.auth_session = auth_session
self.login()
auth_response_url_query = self.oauth2_authorization()
if 'access_token' in auth_response_url_query:
return auth_response_url_query['access_token']
else:
raise VkAuthError('OAuth2 authorization error')
def login(self):
"""
Login
"""
response = self.auth_session.get(self.LOGIN_URL)
login_form_action = get_form_action(response.text)
if not login_form_action:
raise VkAuthError('VK changed login flow')
login_form_data = {
'email': self.user_login,
'pass': self.user_password,
}
response = self.auth_session.post(login_form_action, login_form_data)
logger.debug('Cookies: %s', self.auth_session.cookies)
response_url_query = get_url_query(response.url)
if 'remixsid' in self.auth_session.cookies or 'remixsid6' in self.auth_session.cookies:
return
if 'sid' in response_url_query:
self.auth_captcha_is_needed(response, login_form_data)
elif response_url_query.get('act') == 'authcheck':
self.auth_check_is_needed(response.text)
elif 'security_check' in response_url_query:
self.phone_number_is_needed(response.text)
else:
message = 'Authorization error (incorrect password)'
logger.error(message)
raise VkAuthError(message)
def oauth2_authorization(self):
"""
OAuth2
"""
auth_data = {
'client_id': self.app_id,
'display': 'mobile',
'response_type': 'token',
'scope': self.scope,
'v': '5.28',
}
response = self.auth_session.post(self.AUTHORIZE_URL, auth_data)
response_url_query = get_url_query(response.url)
if 'access_token' in response_url_query:
return response_url_query
# Permissions is needed
logger.info('Getting permissions')
# form_action = re.findall(r'<form method="post" action="(.+?)">', auth_response.text)[0]
form_action = get_form_action(response.text)
logger.debug('Response form action: %s', form_action)
if form_action:
response = self.auth_session.get(form_action)
response_url_query = get_url_query(response.url)
return response_url_query
try:
response_json = response.json()
except ValueError: # not JSON in response
error_message = 'OAuth2 grant access error'
else:
error_message = 'VK error: [{}] {}'.format(response_json['error'], response_json['error_description'])
logger.error('Permissions obtained')
raise VkAuthError(error_message)
def auth_check_is_needed(self, html):
logger.info('User enabled 2 factors authorization. Auth check code is needed')
auth_check_form_action = get_form_action(html)
auth_check_code = self.get_auth_check_code()
auth_check_data = {
'code': auth_check_code,
'_ajax': '1',
'remember': '1'
}
response = self.auth_session.post(auth_check_form_action, data=auth_check_data)
def auth_captcha_is_needed(self, response, login_form_data):
logger.info('Captcha is needed')
response_url_dict = get_url_query(response.url)
# form_url = re.findall(r'<form method="post" action="(.+)" novalidate>', response.text)
captcha_form_action = get_form_action(response.text)
logger.debug('form_url %s', captcha_form_action)
if not captcha_form_action:
raise VkAuthError('Cannot find form url')
# todo Are we sure that `response_url_dict` doesn't contain CAPTCHA image url?
captcha_url = '%s?s=%s&sid=%s' % (self.CAPTCHA_URI, response_url_dict['s'], response_url_dict['sid'])
# logger.debug('Captcha url %s', captcha_url)
login_form_data['captcha_sid'] = response_url_dict['sid']
login_form_data['captcha_key'] = self.get_captcha_key(captcha_url)
response = self.auth_session.post(captcha_form_action, login_form_data)
# logger.debug('Cookies %s', self.auth_session.cookies)
# if 'remixsid' not in self.auth_session.cookies and 'remixsid6' not in self.auth_session.cookies:
# raise VkAuthError('Authorization error (Bad password or captcha key)')
def phone_number_is_needed(self, text):
raise VkAuthError('Phone number is needed')
def get_auth_check_code(self):
raise VkAuthError('Auth check code is needed')
class InteractiveMixin(object):
def get_user_login(self):
user_login = raw_input('VK user login: ')
return user_login.strip()
def get_user_password(self):
import getpass
user_password = getpass.getpass('VK user password: ')
return user_password
def get_access_token(self):
logger.debug('InteractiveMixin.get_access_token()')
access_token = super(InteractiveMixin, self).get_access_token()
if not access_token:
access_token = raw_input('VK API access token: ')
return access_token
def get_captcha_key(self, captcha_image_url):
"""
Read CAPTCHA key from shell
"""
print('Open CAPTCHA image url: ', captcha_image_url)
captcha_key = raw_input('Enter CAPTCHA key: ')
return captcha_key
def get_auth_check_code(self):
"""
Read Auth code from shell
"""
auth_check_code = raw_input('Auth check code: ')
return auth_check_code.strip()

Binary file not shown.

View File

@ -0,0 +1,60 @@
# coding=utf8
import os
import sys
import time
import unittest
import vk
import utils
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
# copy to test_props.py and fill it
USER_LOGIN = '' # user email or phone number
USER_PASSWORD = '' # user password
APP_ID = '' # aka API/Client ID
from test_props import USER_LOGIN, USER_PASSWORD, APP_ID
class UtilsTestCase(unittest.TestCase):
def test_stringify(self):
self.assertEqual({1: 'str,str2'}, utils.stringify_values({1: ['str', 'str2']}))
def test_stringify_2(self):
self.assertEqual({1: u'str,стр2'}, utils.stringify_values({1: ['str', u'стр2']}))
def test_stringify_3(self):
self.assertEqual({1: u'стр,стр2'}, utils.stringify_values({1: [u'стр', u'стр2']}))
class VkTestCase(unittest.TestCase):
def setUp(self):
auth_session = vk.AuthSession(app_id=APP_ID, user_login=USER_LOGIN, user_password=USER_PASSWORD)
access_token, _ = auth_session.get_access_token()
session = vk.Session(access_token=access_token)
self.vk_api = vk.API(session, lang='ru')
def test_get_server_time(self):
time_1 = time.time() - 1
time_2 = time_1 + 10
server_time = self.vk_api.getServerTime()
self.assertTrue(time_1 <= server_time <= time_2)
def test_get_server_time_via_token_api(self):
time_1 = time.time() - 1
time_2 = time_1 + 10
server_time = self.vk_api.getServerTime()
self.assertTrue(time_1 <= server_time <= time_2)
def test_get_profiles_via_token(self):
profiles = self.vk_api.users.get(user_id=1)
self.assertEqual(profiles[0]['last_name'], u'Дуров')
if __name__ == '__main__':
unittest.main()

Binary file not shown.

View File

@ -0,0 +1,80 @@
import re
import logging
from collections import Iterable
import requests
logger = logging.getLogger('vk')
try:
# Python 2
str_type = unicode
except NameError:
# Python 3
str_type = str
STRING_TYPES = (str_type, bytes, bytearray)
try:
# Python 2
from urllib import urlencode
from urlparse import urlparse, parse_qsl
except ImportError:
# Python 3
from urllib.parse import urlparse, parse_qsl, urlencode
try:
import simplejson as json
except ImportError:
import json
try:
# Python 2
raw_input = raw_input
except NameError:
# Python 3
raw_input = input
def json_iter_parse(response_text):
decoder = json.JSONDecoder(strict=False)
idx = 0
while idx < len(response_text):
obj, idx = decoder.raw_decode(response_text, idx)
yield obj
def stringify_values(dictionary):
stringified_values_dict = {}
for key, value in dictionary.items():
if isinstance(value, Iterable) and not isinstance(value, STRING_TYPES):
value = u','.join(map(str_type, value))
stringified_values_dict[key] = value
return stringified_values_dict
def get_url_query(url):
parsed_url = urlparse(url)
url_query = parse_qsl(parsed_url.fragment)
# login_response_url_query can have multiple key
url_query = dict(url_query)
return url_query
def get_form_action(html):
form_action = re.findall(r'<form(?= ).* action="(.+)"', html)
if form_action:
return form_action[0]
class LoggingSession(requests.Session):
def request(self, method, url, **kwargs):
logger.debug('Request: %s %s, params=%r, data=%r', method, url, kwargs.get('params'), kwargs.get('data'))
response = super(LoggingSession, self).request(method, url, **kwargs)
logger.debug('Response: %s %s', response.status_code, response.url)
return response

Binary file not shown.