https://github.com/Ofineo/coffee-shop
我尝试在 Auth.py 中使用 jose.jwt.decode 解码 JWT,但总是得到如下错误:
jose.exceptions.JWKError: Could not deserialize key data
经过大量调查,我认为问题出在 RS256 算法上。代码正是在下面这一处崩溃的:
# Excerpt of the failing call; the same statement appears inside
# verify_decode_jwt in the full listing further down.
payload = jwt.decode(
token,
rsa_key,
algorithms=ALGORITHMS,
audience=API_AUDIENCE,
issuer='https://' + AUTH0_DOMAIN + '/',
)
这是一个有效的令牌示例:
它在http://www.JWT.io
eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCIsImtpZCI6Ik9USXpNakl6UkVZMFJEQXdNRGN3UTBFNVF6TTBNekE0TURNMVF6bERRVGRFUVRNeE1VUXpNdyJ9.eyJpc3MiOiJodHRwczovL29maW5lby5ldS5hdXRoMC5jb20vIiwic3ViIjoiYXV0aDB8NWU3NGQ0OTAzODYwMWQwYmU5MWI4NDM3IiwiYXVkIjoiY29mZmVlIiwiaWF0IjoxNTg0NzE0OTY3LCJleHAiOjE1ODQ3MjIxNjcsImF6cCI6ImRWdlZ3N200MlpRUkdBWmptZkE0ekkxYWQ0dXZ0S0Q4Iiwic2NvcGUiOiIiLCJwZXJtaXNzaW9ucyI6WyJkZWxldGU6ZHJpbmtzIiwiZ2V0OmRyaW5rcy1kZXRhaWwiLCJwYXRjaDpkcmlua3MiLCJwb3N0OmRyaW5rcyJdfQ.xviLIr5euhsWlgQZiJZV9JOL0hA3Fz_jYwfhjPj-a8Zf7YySp1RMmzTVw8X7xKpkYZZlaaR47kYL42I_y172UX7_ABkQ8nYqJXqIa_g9ZFYhnAqk4PN0aIAjO8F6HLPaSh6c6DdAdMBeODS0p8JPPSzjWd5AKzjqzff5a7FEcnXZqgB-Ac01OECAdViodeKJk7dBnuKfD2UlmmEHM-2xQC2ZM0zO17qTJ-zia0lHy3Z6MK9-nbf4wxlFidTIWD9WyvqJxCg40YUMkn2YrILfdIGvxFCRyBisQp7Cu-UsOEa0irgNf5zUXmYLhp1DgV-fFxTfRB0nX6O5Sf29tfOMNQ
中解码没有问题
这是我的代码。我在带有 @app.route 的视图函数上使用 requires_auth 装饰器:
import json
from flask import request, _request_ctx_stack, abort
from functools import wraps
from jose import jwt
from urllib.request import urlopen
import sys
# Auth0 tenant domain; used to build the JWKS URL and the expected issuer.
AUTH0_DOMAIN = 'ofineo.eu.auth0.com'
# Signing algorithms accepted when decoding tokens.
ALGORITHMS = ['RS256']
# Expected audience value passed to jwt.decode.
API_AUDIENCE = 'coffee'
# AuthError Exception
'''
AuthError Exception
A standardized way to communicate auth failure modes
'''
class AuthError(Exception):
    """Exception describing an authentication or authorization failure.

    Attributes:
        error: Payload describing the failure. Within this file it is always
            a dict with 'code' and 'description' keys.
        status_code: HTTP status code associated with the failure.
    """

    def __init__(self, error, status_code):
        # Explicitly initialise the base class with the same arguments the
        # constructor received, so ``args`` mirrors (error, status_code).
        super().__init__(error, status_code)
        self.status_code = status_code
        self.error = error
# Auth Header
def get_token_auth_header():
    """Obtain the access token from the request's Authorization header.

    Returns:
        The token string, i.e. the second whitespace-separated part of a
        ``Bearer <token>`` header.

    Raises:
        AuthError: With status 401 when the header is missing or blank, does
            not start with "Bearer", carries no token, or has more than two
            parts.
    """
    auth = request.headers.get('Authorization', None)

    # A whitespace-only header is truthy but splits into an empty list, so
    # the split result is checked too; otherwise parts[0] raises IndexError.
    parts = auth.split() if auth else []
    if not parts:
        raise AuthError({
            'code': 'authorization_header_missing',
            'description': 'Authorization header is expected.'
        }, 401)

    if parts[0].lower() != 'bearer':
        raise AuthError({
            'code': 'invalid_header',
            'description': 'Authorization header must start with "Bearer".'
        }, 401)

    if len(parts) == 1:
        raise AuthError({
            'code': 'invalid_header',
            'description': 'Token not found.'
        }, 401)

    if len(parts) > 2:
        raise AuthError({
            'code': 'invalid_header',
            'description': 'Authorization header must be bearer token.'
        }, 401)

    return parts[1]
def check_permissions(permission, payload):
    """Check that a decoded JWT payload grants the requested permission.

    Args:
        permission: Permission string that must be present.
        payload: Decoded JWT payload; expected to contain a 'permissions'
            entry supporting the ``in`` operator.

    Returns:
        True when ``permission`` is listed in ``payload['permissions']``.

    Raises:
        AuthError: With status 400 when the payload has no 'permissions'
            entry, or status 403 when the permission is not listed.
    """
    if 'permissions' in payload:
        granted = payload['permissions']
        if permission in granted:
            return True
        raise AuthError({
            'code': 'unauthorized',
            'description': 'Permission not found.'
        }, 403)

    raise AuthError({
        'code': 'invalid_claims',
        'description': 'Permissions not included in JWT.'
    }, 400)
def verify_decode_jwt(token):
    """Verify an Auth0-issued JWT against the tenant's JWKS and decode it.

    The token's unverified header is read to find its key id ('kid'), the
    matching public key is looked up in the JWKS document published under
    AUTH0_DOMAIN, and the token is then decoded with signature, audience and
    issuer validation.

    Args:
        token: The encoded JWT string.

    Returns:
        The decoded payload returned by ``jwt.decode``.

    Raises:
        AuthError: 401 for a malformed header, an expired token, incorrect
            claims or an invalid signature; 400 when no matching key is found
            or decoding fails for any other reason.
    """
    # Inspect the (untrusted) header first so a malformed token is rejected
    # as an AuthError before any network round trip is made.
    try:
        unverified_header = jwt.get_unverified_header(token)
    except jwt.JWTError as err:
        raise AuthError({
            'code': 'invalid_header',
            'description': 'Authorization malformed.'
        }, 401) from err

    if 'kid' not in unverified_header:
        raise AuthError({
            'code': 'invalid_header',
            'description': 'Authorization malformed.'
        }, 401)

    # The context manager closes the HTTP response once the body is read.
    with urlopen(f'https://{AUTH0_DOMAIN}/.well-known/jwks.json') as response:
        jwks = json.loads(response.read())

    rsa_key = {}
    for key in jwks['keys']:
        if key['kid'] == unverified_header['kid']:
            rsa_key = {
                'kty': key['kty'],
                'kid': key['kid'],
                'use': key['use'],
                'n': key['n'],
                'e': key['e']
            }

    if not rsa_key:
        raise AuthError({
            'code': 'invalid_header',
            'description': 'Unable to find the appropriate key.'
        }, 400)

    # Handlers go from most to least specific. Each AuthError is chained to
    # the original exception so the root cause stays visible in tracebacks.
    try:
        return jwt.decode(
            token,
            rsa_key,
            algorithms=ALGORITHMS,
            audience=API_AUDIENCE,
            issuer='https://' + AUTH0_DOMAIN + '/',
        )
    except jwt.ExpiredSignatureError as err:
        raise AuthError({
            'code': 'token_expired',
            'description': 'Token expired.'
        }, 401) from err
    except jwt.JWTClaimsError as err:
        raise AuthError({
            'code': 'invalid_claims',
            'description': 'Incorrect claims. Please, check the audience and issuer.'
        }, 401) from err
    except jwt.JWTError as err:
        raise AuthError({
            'code': 'invalid signature',
            'description': 'the signature is invalid in some way'
        }, 401) from err
    except Exception as err:
        # Catch-all for failures that are not JWTError subclasses, e.g. a
        # key-deserialization error raised while constructing the key.
        raise AuthError({
            'code': 'invalid_header',
            'description': 'Unable to parse authentication token.'
        }, 400) from err
def requires_auth(permission=''):
    """Build a decorator that protects a view with JWT authentication.

    The wrapped function is called with the decoded JWT payload as its first
    positional argument, followed by the original arguments.

    Args:
        permission: Permission string that must be present in the token's
            'permissions' claim.

    Returns:
        A decorator to apply to the view function.
    """
    def requires_auth_decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            token = get_token_auth_header()
            try:
                payload = verify_decode_jwt(token)
                check_permissions(permission, payload)
            except AuthError as err:
                # Honour the status carried by the error (e.g. 403 for a
                # missing permission) instead of flattening it to 401.
                print(err)
                abort(err.status_code)
            except Exception as err:
                # Any other failure keeps the previous behaviour: log it and
                # answer 401.
                print(err)
                abort(401)
            return f(payload, *args, **kwargs)
        return wrapper
    return requires_auth_decorator
我是否正确地使用了这个库?