应大家的要求，我在此更新这个问题，贴出我最终用于脚本身份验证的代码，希望能对其他人有所帮助。
import webbrowser
import requests
import urllib.parse
import binascii
import os
import time
from http.server import HTTPServer, BaseHTTPRequestHandler
# DeviantArt OAuth2 endpoints: the user-facing authorization page and the
# token-exchange endpoint.
AUTH = 'https://www.deviantart.com/oauth2/authorize'
TOKEN = 'https://www.deviantart.com/oauth2/token'

# 40 hex characters built from 20 OS-random bytes. Sent as the OAuth2 `state`
# parameter and compared with the value echoed back on the redirect.
state = os.urandom(20).hex()

# Authorization code captured from the redirect; RequestHandler.do_GET fills it in.
code = ''
class Communicator:
    """Small DeviantArt API client that authenticates with the OAuth2
    authorization-code grant and throttles its own requests.

    The class relies on module-level names defined alongside it: the ``AUTH``
    and ``TOKEN`` endpoint URLs, the random ``state`` string, the ``code``
    global and the ``RequestHandler`` class that captures the redirect.
    """

    def __init__(self, client_id='<insert-actual-id>',
                 client_secret='<insert-actual-secret>',
                 server='localhost', port=8080):
        """Store the client credentials and the local redirect endpoint.

        Args:
            client_id: Application id. You get these two from the DA
                developer API page, but it's safer if you store them in a
                separate file.
            client_secret: Application secret (see ``client_id``).
            server: Host name the one-shot callback server binds to.
            port: Port the one-shot callback server binds to.
        """
        self.client_id = client_id
        self.client_secret = client_secret
        self.server, self.port = server, port
        self._redirect_uri = f'http://{self.server}:{self.port}'
        self._last_request_time = 0
        # Defined up front so that using the client before auth() fails with a
        # clear value (None) instead of an AttributeError.
        self.token = None
        self.refresh_token = None

    def auth(self, *args):
        """Run the interactive authorization flow and fetch the tokens.

        Opens the authorization page in the web browser, serves exactly one
        HTTP request on the redirect URI to capture the authorization code,
        then exchanges that code for an access/refresh token pair.

        Args:
            *args: Scope names; they are joined with spaces into ``scope``.

        Raises:
            RuntimeError: If no authorization code was captured.
        """
        global code
        params = {
            'response_type': 'code',
            'client_id': self.client_id,
            'redirect_uri': self._redirect_uri,
            'scope': ' '.join(args),
            'state': state,
        }
        # `params` must be passed by keyword: the third positional parameter
        # of requests.Request is `headers`, not `params`.
        url = requests.Request('GET', AUTH, params=params).prepare().url

        # Forget any code left over from an earlier auth() call so a failed
        # callback cannot silently reuse it.
        code = ''
        # Bind the callback server *before* opening the browser so the
        # redirect cannot arrive while nothing is listening; the context
        # manager closes the listening socket afterwards.
        with HTTPServer((self.server, self.port), RequestHandler) as server:
            webbrowser.open(url)
            server.handle_request()

        if not code:
            raise RuntimeError(
                "No authorization code was received on the redirect URI.")

        self._get_token({
            'client_id': self.client_id,
            'client_secret': self.client_secret,
            'grant_type': 'authorization_code',
            'code': code,
            'redirect_uri': self._redirect_uri,
        })

    def _get_token(self, params):
        """Request tokens from ``TOKEN`` and store them on the instance.

        Raises:
            KeyError: If the JSON response has no ``access_token`` or
                ``refresh_token`` entry.
        """
        r = requests.get(TOKEN, params).json()
        self.token = r['access_token']
        self.refresh_token = r['refresh_token']

    def _refresh_token(self):
        """Obtain a new token pair using the stored refresh token."""
        self._get_token({
            'client_id': self.client_id,
            'client_secret': self.client_secret,
            'grant_type': 'refresh_token',
            'refresh_token': self.refresh_token,
        })

    def _request(self, func, url, params, sleep=5, cooldown=600):
        """Call ``func(url, params)`` with pacing, back-off and token refresh.

        Args:
            func: Callable such as ``requests.get``; its result must offer
                ``.json()``.
            url: Endpoint URL.
            params: Request parameters (may be ``None``). The mapping is
                copied, so the caller's object is never modified.
            sleep: Minimum number of seconds between two requests; also the
                base of the exponential back-off.
            cooldown: Seconds to wait after the request limit was reached.

        Returns:
            The decoded JSON response.
        """
        # Keep at least `sleep` seconds between consecutive requests and
        # remember the moment this request is actually sent (after waiting).
        wait = sleep - (time.time() - self._last_request_time)
        if wait > 0:
            time.sleep(wait)
        self._last_request_time = time.time()

        payload = dict(params or {})
        payload['access_token'] = self.token
        backoff = sleep
        max_backoff = 16 * sleep
        refreshed = False
        while True:
            try:
                r = func(url, payload).json()
                is_object = isinstance(r, dict)
                error = r.get('error') if is_object else None
                if is_object and r.get('error_code') == 429:
                    backoff *= 2
                    if backoff > max_backoff:
                        raise ConnectionError("Request timed out - server is busy.")
                    time.sleep(backoff)
                elif error == 'user_api_threshold':
                    raise ConnectionError("Too many requests")
                elif error == 'invalid_token' and not refreshed:
                    print("Refreshing token.")
                    self._refresh_token()
                    payload['access_token'] = self.token
                    # Refresh only once per request; if the token is still
                    # rejected the error payload is returned to the caller
                    # instead of looping forever.
                    refreshed = True
                else:
                    return r
            except ConnectionError:
                print(f"Request limit reached - waiting {cooldown // 60} minutes before retrying...")
                time.sleep(cooldown)
                # Start the back-off from scratch after the long pause.
                backoff = sleep

    def get(self, url, params=None):
        """Send a throttled GET request and return the decoded JSON."""
        return self._request(requests.get, url, params)

    def post(self, url, params=None):
        """Send a throttled POST request and return the decoded JSON."""
        return self._request(requests.post, url, params)
class RequestHandler(BaseHTTPRequestHandler):
    """Handle the single OAuth2 redirect and store the authorization code
    in the module-level ``code`` variable."""

    @staticmethod
    def _parse_callback(path, expected_state):
        """Extract the authorization code from a redirect request path.

        Args:
            path: Request path including the query string.
            expected_state: The ``state`` value that must be echoed back.

        Returns:
            ``(code, None)`` on success, or ``(None, message)`` when the
            state is missing/mismatched or no code is present.
        """
        query = urllib.parse.parse_qs(urllib.parse.urlparse(path).query)
        # parse_qs omits absent (and blank) parameters, so use .get() rather
        # than indexing, which would raise KeyError before the check runs.
        received_state = query.get('state')
        if not received_state or received_state[0] != expected_state:
            return None, "state argument missing or invalid"
        received_code = query.get('code')
        if not received_code:
            return None, "code argument missing"
        # parse_qs maps every key to a list; keep only the first value.
        return received_code[0], None

    def do_GET(self):
        """Validate the redirect, answer the browser and record the code.

        Raises:
            RuntimeError: If the state is missing/invalid or no code was
                supplied (after a 400 response has been sent).
        """
        global code
        self.close_connection = True
        received_code, problem = self._parse_callback(self.path, state)
        if problem is not None:
            # Tell the browser what went wrong before signalling the failure.
            self.send_error(400, explain=problem)
            raise RuntimeError(problem)
        code = received_code
        # Send a proper response so the browser does not show a connection error.
        body = b"Authorization received - you can close this tab and return to the script."
        self.send_response(200)
        self.send_header('Content-Type', 'text/plain; charset=utf-8')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)
# Scope names to pass to Communicator.auth(); it joins them with spaces into
# the `scope` parameter of the authorization request.

# Plain scopes.
BROWSE = 'browse'
COLLECTION = 'collection'
FEED = 'feed'
GALLERY = 'gallery'
MESSAGE = 'message'
NOTE = 'note'
PUBLISH = 'publish'
STASH = 'stash'
USER = 'user'

# Dotted scopes.
BROWSE_MORE_LIKE_THIS = 'browse.mlt'
COMMENT = 'comment.post'
USER_MANAGE = 'user.manage'
# Script entry point: authenticate once, then use the client.
if __name__ == '__main__':
    requested_scopes = (BROWSE, COLLECTION)  # request specific permissions
    com = Communicator()
    com.auth(*requested_scopes)
    ...  # do stuff with com.get() and com.post() requests