我正在尝试使用Authlib实现 OAuth2 提供者和客户端。我按照文档编写提供程序,似乎是正确的,但是客户端在被重定向到身份验证页面并成功确认访问资源后,我收到以下错误:{“error”:“invalid_grant”} . 不幸的是,我找不到原因。
这是我的代码:
模型.py
from pony.orm import *
from authlib.oauth2.rfc6749 import ClientMixin
db = Database()
class User(db.Entity):
id = PrimaryKey(int, auto=True)
name = Required(str)
username = Required(str)
password = Required(str)
clients = Set("Client")
tokens = Set("Token")
def get_user_id(self):
return self.id
def get_username(self):
return self.name
class Client(db.Entity, ClientMixin):
id = PrimaryKey(int, auto=True)
name = Required(str)
client_id = Required(str)
client_secret = Required(str)
redirect_url = Required(str)
users = Set("User")
tokens = Set("Token")
def check_redirect_uri(self, redirect_uri):
print "Client::check_redirect_uri:", redirect_uri
return True
def check_response_type(self, response_type):
print "Client::check_response_type: ", response_type
return response_type in ["authorization_code", "code", "password"]
#Authorization token
class Token(db.Entity):
id = PrimaryKey(int, auto=True)
value = Required(str)
user = Required("User")
client = Required("Client")
scope = Optional(str)
@db_session
def create_new_user(name, username, password):
user = User(name=name, username=username, password=password)
commit()
@db_session
def create_new_client(name, client_id, client_secret, redirect_url):
client = Client(name=name, client_id=client_id, client_secret=client_secret, redirect_url=redirect_url)
commit()
from authlib.oauth2.rfc6749 import grants
from authlib.common.security import generate_token
class AuthorizationCodeGrant(grants.AuthorizationCodeGrant):
@db_session
def create_authorization_code(self, client, grant_user, request):
code = generate_token(48)
token = Token(value=token, user=User[int(grant_user.get_user_id)],
client=Client[int(client.client_id)], scope=request.scope)
commit()
print "AuthorizationCodeGrant::create_authorization_code: token:", token
return code
def parse_authorization_code(self, code, client):
print "AuthorizationCodeGrant::parse_authorization_code"
token = Token.select(lambda t: t.value == code and t.client_id == client.client_id).first()
return token
def delete_authorization_code(self, authorization_code):
print "AuthorizationCodeGrant::delete autorization code: ", authorization_code.user_id
def authenticate_user(self, authorization_code):
print "authenticate_user"
return User[int(authorization_code.user_id)]
db.bind(provider='sqlite', filename='database.sqlite', create_db=True)
db.generate_mapping(create_tables=True)
set_sql_debug(True)
服务器.py
app = Flask(__name__, template_folder='templates')
@db_session
def query_client(client_id):
print "query_client id", client_id
client = Client.select(lambda c: c.client_id == client_id).first()
return client
@db_session
def save_token(token_data, request):
if request.user:
user_id = request.user.get_user_id()
print "user_id", user_id
else:
user_id = request.client.user_id
print "user_id", user_id
client = Client[int(request.client.client_id)]
user = User[int(user_id)]
token = Token(user=user, client=client)
commit()
server = AuthorizationServer(
query_client=query_client,
save_token=save_token,
)
#AuthorizationCodeGrant defindo em models.py
server.register_grant(AuthorizationCodeGrant)
from authlib.flask.oauth2 import ResourceProtector
require_oauth = ResourceProtector()
def current_user():
if 'id' in session:
uid = session['id']
try:
return User[int(uid)]
except:
return None
return None
@app.route('/', methods=['GET', 'POST'])
@db_session
def index():
if session['id']:
user = current_user()
return render_template('user.html', user=user)
return render_template('index.html')
@app.route('/login', methods=['GET', 'POST'])
@db_session
def login():
if request.method == 'GET':
if session['id']:
user = current_user()
return render_template('user.html', user=user)
return render_template('login.html')
else:
username = request.form['username']
user = User.select(lambda c: c.username == username).first()
if user:
session['id'] = user.id
return render_template('user.html', user=user)
else:
return render_template('error.html', msg="Usuario nao encontrado")
@app.route('/newlogin', methods=['GET', 'POST'])
def create_login():
if request.method == 'GET':
return render_template('newlogin.html')
else:
name = request.form['name']
username = request.form['username']
password = request.form['password']
create_new_user(name, username, password)
return redirect("/")
@app.route('/logout', methods=['GET', 'POST'])
def logout():
session['id'] = None
return redirect('/')
@app.route('/clients', methods=['GET', 'POST'])
@db_session
def clients():
if request.method == 'POST':
name = request.form['name']
client_id = gen_salt(24)
client_secret = gen_salt(48)
redirect_url = request.form['redirecturl']
create_new_client(name, client_id, client_secret, redirect_url=redirect_url)
clients = Client.select()
return render_template('clients.html', clients=clients)
@app.route('/oauth/authorize', methods=['GET', 'POST'])
@db_session
def authorize():
user = current_user()
if request.method == 'GET':
try:
grant = server.validate_consent_request(end_user=user)
print "Server: /authorize: grant:"
pprint(vars(grant))
except OAuth2Error as error:
return error.error
return render_template('authorize.html', user=user, grant=grant)
if not user and 'username' in request.form:
username = request.form.get('username')
user = User.select(lambda u: u.username == username).first()
if request.form['confirm']:
grant_user = user
else:
grant_user = None
print "Server: /authorize: grant_user:", grant_user
return server.create_authorization_response(grant_user=grant_user)
@app.route('/oauth/token', methods=['GET', 'POST'])
def issue_token():
return server.create_token_response()
@app.route('/user/profile')
@require_oauth("profile")
def get_profile():
return jsonify(current_user())
if __name__ == '__main__':
import os
os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = 'true'
os.environ['AUTHLIB_INSECURE_TRANSPORT'] ='1'
app.debug = True
app.secret_key = 'development'
server.init_app(app)
app.run()
客户端.py
app = Flask(__name__)
oauth = OAuth(app)
client = oauth.register(
name='ecl-app1',
client_id='wKbpjHNvSSeirBO9SzYdWOg4',
client_secret='1SNDG8hxpNjP2rrlapVAPy8d9CHdVQ8Kzd4NqHydV7VdIrv7',
access_token_url='http://localhost:5000/oauth/access_token',
authorize_url='http://localhost:5000/oauth/authorize',
api_base_url='http://localhost:5000/',
grant_type='code',
client_kwargs={'scope': 'profile'},
)
@app.route('/')
def index():
redirect_uri = url_for('authorize', _external=True)
print "App Client: index: redirect_uri:", redirect_uri
resp = client.authorize_redirect(redirect_uri)
print "App Client: index: authorize_redirect: ", resp
#print(resp.status)
#print(resp.headers)
#print(resp.get_data())
return client.authorize_redirect(redirect_uri)
@app.route('/authorize')
def authorize():
print "App Client: authorize:"
token = client.authorize_access_token()
# you can save the token into database
profile = client.get('/user')
return "ok"
if __name__ == '__main__':
import os
os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = 'true'
os.environ['AUTHLIB_INSECURE_TRANSPORT'] ='1'
app.debug = True
app.secret_key = 'development'
app.run(port=9000)