3

我目前正在尝试编写一个 Python 脚本,该脚本将使用 Deviantart 的 API 自动随机播放我的收藏夹。为此,我需要先登录我的脚本。Deviantart 使用 OAuth2 身份验证,这需要一个 redirect_uri,据我了解,它应该是我的应用程序运行的服务器。

但是,我在我的计算机上本地运行脚本(而不是在服务器上),并且只是通过 Python 的Requests库发送 http 请求。然后,当 OAuth 过程将身份验证令牌所需的代码作为GET调用 redirect_uri 的参数发送时,我该如何进行身份验证,这对我来说特别无意义?不运行服务器就无法进行身份验证吗?

编辑

我的问题仍然是我正在运行一个简单的离线脚本,我不确定如何从中进行身份验证。

到目前为止,这是我的身份验证代码:

import binascii, os, requests

def auth():
    request = 'https://www.deviantart.com/oauth2/authorize'
    state = binascii.hexlify(os.urandom(160))
    params = {
        'response_type': 'token',
        'client_id': 1,
        'redirect_uri': 'https://localhost:8080',
        'state': state,
        'scope': 'user'
    }
    r = requests.get(request, params)
    print(r)

打印的响应只是一个 200 HTTP 代码,而不是访问令牌(显然,因为用户名和密码尚未在任何地方输入)。该请求被发送到 DA 的授权页面,但由于该页面本身实际上并未在我的脚本中打开,因此我无法在任何地方输入我的用户名和密码登录。而且我也不能直接在GET请求中发送用户名和密码以进行身份​​验证(同样显然,因为这样发送密码是一个糟糕的主意)。

最好我想要一种方法,让用户(我)在脚本正在运行的控制台中提示输入用户名和密码,然后在用户成功登录后让脚本继续执行。

或者,如果上述方法不可行,则脚本应在浏览器中打开授权网页,然后在用户登录后继续执行。

我将如何在 Python 中实现这两种解决方案中的任何一种?

4

3 回答 3

2

如果您的应用程序处于脱机状态,则不能使用授权代码或隐式授权类型:这两个流程都需要重定向 URI。

由于无法从 Internet 访问您的 python 脚本,并且 Deviantart 不允许使用其他授权类型(客户端凭据除外,但与您的情况无关),因此您将无法发布任何访问令牌。

您的应用程序必须可以从 Internet 访问。

于 2016-10-05T04:50:25.800 回答
1

您应该使用收到的代码获取授权令牌。该令牌将用于之后访问 DeviantArt。

请参阅https://www.deviantart.com/developers/authentication(“使用授权码授予”部分)。

于 2016-10-04T17:29:50.537 回答
0

根据请求,我将使用我最终用于脚本身份验证的代码来更新这个问题,希望它对某人有所帮助。

import webbrowser
import requests
import urllib.parse
import binascii
import os
import time
from http.server import HTTPServer, BaseHTTPRequestHandler

AUTH = 'https://www.deviantart.com/oauth2/authorize'
TOKEN = 'https://www.deviantart.com/oauth2/token'

code = ''
state = binascii.hexlify(os.urandom(20)).decode('utf-8')


class Communicator:
    def __init__(self):
        self.client_id = '<insert-actual-id>'  # You get these two from the DA developer API page
        self.client_secret = '<insert-actual-secret>'  # but it's safer if you store them in a separate file
        self.server, self.port = 'localhost', 8080
        self._redirect_uri = f'http://{self.server}:{self.port}'
        self._last_request_time = 0

    def auth(self, *args):
        scope = ' '.join(args)

        params = {
            'response_type': 'code',
            'client_id': self.client_id,
            'redirect_uri': self._redirect_uri,
            'scope': scope,
            'state': state
        }
        request = requests.Request('GET', AUTH, params).prepare()
        request.prepare_url(AUTH, params)
        webbrowser.open(request.url)
        server = HTTPServer((self.server, self.port), RequestHandler)
        server.handle_request()

        params = {
            'client_id': self.client_id,
            'client_secret': self.client_secret,
            'grant_type': 'authorization_code',
            'code': code,
            'redirect_uri': self._redirect_uri
        }
        self._get_token(params)

    def _get_token(self, params):
        r = requests.get(TOKEN, params).json()
        self.token = r['access_token']
        self.refresh_token = r['refresh_token']

    def _refresh_token(self):
        params = {
            'client_id': self.client_id,
            'client_secret': self.client_secret,
            'grant_type': 'refresh_token',
            'refresh_token': self.refresh_token
        }
        self._get_token(params)

    def _request(self, func, url, params, sleep=5, cooldown=600):
        t = time.time()
        if t - self._last_request_time < sleep:
            time.sleep(sleep - t + self._last_request_time)
        self._last_request_time = t

        max_sleep = 16 * sleep

        params['access_token'] = self.token
        while True:
            try:
                r = func(url, params).json()
                if 'error_code' in r and r['error_code'] == 429:
                    sleep *= 2
                    time.sleep(sleep)
                    if sleep > max_sleep:
                        raise ConnectionError("Request timed out - server is busy.")
                elif 'error' in r and r['error'] == 'user_api_threshold':
                    raise ConnectionError("Too many requests")
                elif 'error' in r and r['error'] == 'invalid_token':
                    print("Refreshing token.")
                    self._refresh_token()
                    params['access_token'] = self.token
                else:
                    return r
            except ConnectionError:
                print(f"Request limit reached - waiting {cooldown // 60} minutes before retrying...")
                time.sleep(cooldown)

    def get(self, url, params):
        return self._request(requests.get, url, params)

    def post(self, url, params):
        return self._request(requests.post, url, params)


class RequestHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        global code
        self.close_connection = True
        query = urllib.parse.parse_qs(urllib.parse.urlparse(self.path).query)
        if not query['state'] or query['state'][0] != state:
            raise RuntimeError("state argument missing or invalid")
        code = query['code']


BROWSE = 'browse'
BROWSE_MORE_LIKE_THIS = 'browse.mlt'
COLLECTION = 'collection'
COMMENT = 'comment.post'
FEED = 'feed'
GALLERY = 'gallery'
MESSAGE = 'message'
NOTE = 'note'
PUBLISH = 'publish'
STASH = 'stash'
USER = 'user'
USER_MANAGE = 'user.manage'

if __name__ == '__main__':
    com = Communicator()
    com.auth(BROWSE, COLLECTION)  # request specific permissions
    ...  # do stuff with com.get() and com.post() requests
于 2022-02-19T14:10:48.250 回答