我一直在尝试集成Authlib Flask Client以通过 Google OIDC 实现 OIDC。我正在使用 Flask SQLAlchemy,一切正常。唯一的问题是我无法在过期时刷新令牌。
问题:
- 如何验证令牌,以及令牌过期时我应该在 fetch_token 函数中返回什么。
- 如何使用 refresh_token 更新令牌?我从谷歌获取刷新令牌,并将其存储在数据库中,但没有调用 update_token。
- 调用注销时可以从数据库中删除令牌条目吗?再次登录时会重新创建。
我认为这是自动完成的,我不需要自己验证令牌。下面是我的代码,带有数据库模型,还有 fetch_token 和 update_token。
几点注意事项:
- 我正在验证
expires_at
fetch_token 方法的内部。 - 如果我没有在 fetch_token 中添加 expires_at,当我调用
oauth.google.token
- 我尝试添加
sender
update_token 方法,但效果不佳。
有人可以帮我理解我在这里做错了什么。下面是我的代码。
import time
from authlib.integrations.flask_client import OAuth
from authlib.oidc.core.errors import LoginRequiredError
from flask import Flask
from flask import current_app as app, redirect, url_for, session
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import Column, Integer, String
class OAuth2Token(app.db.Model):
id = Column(Integer, primary_key=True)
user_id = Column(Integer, nullable=False)
name = Column(String(20), nullable=False)
access_token = Column(String(255), nullable=False)
expires_in = Column(Integer, default=0)
scope = Column(String, default=0)
token_type = Column(String(20))
refresh_token = Column(String(255))
expires_at = Column(Integer, default=0)
def to_token(self):
return dict(
access_token=self.access_token,
expires_in=self.expires_in,
scope=self.scope,
token_type=self.token_type,
refresh_token=self.refresh_token,
expires_at=self.expires_at,
)
@property
def is_active(self):
return self.expires_at > round(time.time())
@staticmethod
def save(**kwargs):
item = OAuth2Token(**kwargs)
app.db.session.add(item)
app.db.session.commit()
@staticmethod
def get(**kwargs):
return OAuth2Token.query.filter_by(**kwargs).first()
@staticmethod
def delete(**kwargs):
OAuth2Token.query.filter_by(**kwargs).delete()
app.db.session.commit()
@staticmethod
def get_active(name, user_id, int_time):
return OAuth2Token.query.filter(OAuth2Token.name == name,
OAuth2Token.user_id == user_id,
OAuth2Token.expires_at >= int_time
).first()
@staticmethod
def all():
return OAuth2Token.query.all()
@staticmethod
def update_tokens(name, token, refresh_token=None, access_token=None):
if refresh_token:
item = OAuth2Token.get(name=name, refresh_token=refresh_token)
elif access_token:
item = OAuth2Token.get(name=name, access_token=access_token)
else:
return
item.access_token = token['access_token']
item.refresh_token = token.get('refresh_token')
item.expires_at = token['expires_at']
app.db.session.commit()
def _update_token(name, token, refresh_token=None, access_token=None):
try:
OAuth2Token.update_tokens(name, token, refresh_token=refresh_token, access_token=access_token)
except Exception as ex:
print("exception")
def _fetch_token(name):
try:
_current_time = round(time.time())
token = OAuth2Token.get_active(name=name,
user_id=session["user"]["id"],
int_time=_current_time)
if not token:
return None
return token.to_token()
except Exception as ex:
raise LoginRequiredError
class CustomApp(Flask):
def __init__(self, *args, **kwargs):
super(CustomApp, self).__init__(*args, **kwargs)
self.db = SQLAlchemy(self)
with self.app_context():
self.db.create_all()
oauth = OAuth(
self,
fetch_token=_fetch_token,
update_token=_update_token
)
self.auth_client = oauth.register(
name='google',
server_metadata_url='https://accounts.google.com/.well-known/openid-configuration',
client_kwargs={
'scope': 'openid email profile',
},
authorize_params={
'access_type': 'offline',
'prompt': 'consent'
}
)
@self.route('/login')
def login():
redirect_uri = url_for('auth', _external=True)
return self.auth_client.authorize_redirect(redirect_uri)
@self.route('/auth')
def auth():
try:
token = self.auth_client.authorize_access_token()
user = self.auth_client.parse_id_token(token)
user_id = user.user_id
# Since we are not storing the id_token in the model
token.pop("id_token")
OAuth2Token.save(name='google', user_id=user_id, **token)
session["user"] = user
return redirect('/')
except Exception as ex:
raise ex
@self.route('/logout')
def logout():
if session.get("user"):
OAuth2Token.delete(name='google', user_id=session["user"]["id"])
session.pop('user', None)
return redirect(url_for('login'))