# coding=utf8 import logging from vk.exceptions import VkAuthError from vk.utils import raw_input, get_url_query, LoggingSession, get_form_action logger = logging.getLogger('vk') class AuthMixin(object): LOGIN_URL = 'https://m.vk.com' # REDIRECT_URI = 'https://oauth.vk.com/blank.html' AUTHORIZE_URL = 'https://oauth.vk.com/authorize' CAPTCHA_URI = 'https://m.vk.com/captcha.php' def __init__(self, app_id=None, user_login='', user_password='', scope='offline', **kwargs): logger.debug('AuthMixin.__init__(app_id=%(app_id)r, user_login=%(user_login)r, user_password=%(user_password)r, **kwargs=%(kwargs)s)', dict(app_id=app_id, user_login=user_login, user_password=user_password, kwargs=kwargs)) super(AuthMixin, self).__init__(**kwargs) self.app_id = app_id self.user_login = user_login self.user_password = user_password self.scope = scope # Some API methods get args (e.g. user id) from access token. # If we define user login, we need get access token now. if self.user_login: self.access_token = self.get_access_token() @property def user_login(self): if not self._user_login: self._user_login = self.get_user_login() return self._user_login @user_login.setter def user_login(self, value): self._user_login = value def get_user_login(self): return self._user_login @property def user_password(self): if not self._user_password: self._user_password = self.get_user_password() return self._user_password @user_password.setter def user_password(self, value): self._user_password = value def get_user_password(self): return self._user_password def get_access_token(self): """ Get access token using app id and user login and password. """ logger.debug('AuthMixin.get_access_token()') auth_session = LoggingSession() with auth_session as self.auth_session: self.auth_session = auth_session self.login() auth_response_url_query = self.oauth2_authorization() if 'access_token' in auth_response_url_query: return auth_response_url_query['access_token'] else: raise VkAuthError('OAuth2 authorization error') def login(self): """ Login """ response = self.auth_session.get(self.LOGIN_URL) login_form_action = get_form_action(response.text) if not login_form_action: raise VkAuthError('VK changed login flow') login_form_data = { 'email': self.user_login, 'pass': self.user_password, } response = self.auth_session.post(login_form_action, login_form_data) logger.debug('Cookies: %s', self.auth_session.cookies) response_url_query = get_url_query(response.url) if 'remixsid' in self.auth_session.cookies or 'remixsid6' in self.auth_session.cookies: return if 'sid' in response_url_query: self.auth_captcha_is_needed(response, login_form_data) elif response_url_query.get('act') == 'authcheck': self.auth_check_is_needed(response.text) elif 'security_check' in response_url_query: self.phone_number_is_needed(response.text) else: message = 'Authorization error (incorrect password)' logger.error(message) raise VkAuthError(message) def oauth2_authorization(self): """ OAuth2 """ auth_data = { 'client_id': self.app_id, 'display': 'mobile', 'response_type': 'token', 'scope': self.scope, 'v': '5.28', } response = self.auth_session.post(self.AUTHORIZE_URL, auth_data) response_url_query = get_url_query(response.url) if 'access_token' in response_url_query: return response_url_query # Permissions is needed logger.info('Getting permissions') # form_action = re.findall(r'