经过一番排查,我发现 Djoser 使用 simple-jwt 模块来生成 JWT 令牌,因此该错误消息来自 simple-jwt 模块。
在几位 simple-jwt 贡献者的帮助下,我按如下所示修改了代码,使端点能够检查用户是否处于未激活状态,并返回相应的错误。
# custom_serializer.py
from django.contrib.auth.models import update_last_login
from rest_framework_simplejwt.serializers import TokenObtainSerializer
from rest_framework_simplejwt.exceptions import AuthenticationFailed
from rest_framework import status
from rest_framework_simplejwt.settings import api_settings
from rest_framework_simplejwt.tokens import RefreshToken
class InActiveUser(AuthenticationFailed):
    """Authentication error raised for an account that is not active.

    Subclasses ``AuthenticationFailed`` so code that catches the parent type
    still sees this error, while overriding the status code, message and
    error code sent back to the client.
    """

    # Status code, message and error code reported for this exception.
    status_code = status.HTTP_406_NOT_ACCEPTABLE
    default_detail = "User is not active, please confirm your email"
    default_code = 'user_is_inactive'
# noinspection PyAbstractClass
class CustomTokenObtainPairSerializer(TokenObtainSerializer):
    """Token-pair serializer that reports inactive users with ``InActiveUser``.

    After the parent serializer has validated the credentials, the user's
    ``is_active`` flag is checked here so an inactive account gets a dedicated
    error. On success the validated data carries a refresh/access token pair.
    """

    @classmethod
    def get_token(cls, user):
        """Return a refresh token issued for ``user``."""
        return RefreshToken.for_user(user)

    def validate(self, attrs):
        """Validate credentials and return the token pair.

        :param attrs: the incoming serializer attributes (login credentials)
        :return: the parent's validated data extended with ``'refresh'`` and
            ``'access'`` token strings
        :raises InActiveUser: if the authenticated user is not active
        """
        # Relies on the parent ``validate`` having populated ``self.user``.
        data = super().validate(attrs)

        if not self.user.is_active:
            raise InActiveUser()

        refresh = self.get_token(self.user)
        data['refresh'] = str(refresh)
        data['access'] = str(refresh.access_token)

        # Only touch ``last_login`` when the Simple JWT setting asks for it.
        if api_settings.UPDATE_LAST_LOGIN:
            update_last_login(None, self.user)

        return data
# custom_authentication.py
def custom_user_authentication_rule(user):
    """
    Override the default user authentication rule for Simple JWT Token to return true if there is a user and let
    serializer check whether user is active or not to return an appropriate error.
    Add 'USER_AUTHENTICATION_RULE': 'path_to_custom_user_authentication_rule' to simplejwt settings to override the default.

    :param user: user to be authenticated
    :return: True if user is not None, otherwise False
    """
    # The identity comparison already yields a bool; no conditional needed.
    return user is not None
# views.py
from rest_framework import status
from rest_framework.response import Response
from rest_framework_simplejwt.exceptions import AuthenticationFailed, InvalidToken, TokenError
from rest_framework_simplejwt.views import TokenViewBase

from .custom_serializer import CustomTokenObtainPairSerializer, InActiveUser
class CustomTokenObtainPairView(TokenViewBase):
    """
    Takes a set of user credentials and returns an access and refresh JSON web
    token pair to prove the authentication of those credentials.

    Returns HTTP 406 when user is inactive and HTTP 401 when login credentials are invalid.

    NOTE(review): the 406 path depends on the serializer actually receiving
    the inactive user; an authentication backend that rejects inactive users
    outright would presumably surface as a plain authentication failure
    instead -- confirm against the project's AUTHENTICATION_BACKENDS.
    """

    serializer_class = CustomTokenObtainPairSerializer

    def post(self, request, *args, **kwargs):
        """Validate the posted credentials and respond with the token pair.

        :param request: the incoming request; ``request.data`` holds the credentials
        :return: a 200 response containing the serializer's validated data
        :raises InvalidToken: if validation raised a ``TokenError``
        """
        serializer = self.get_serializer(data=request.data)

        # AuthenticationFailed is deliberately NOT caught here: converting
        # every authentication failure into InActiveUser would report wrong
        # credentials as "user is not active". Letting it propagate keeps
        # the serializer's own error (InActiveUser or a plain
        # AuthenticationFailed) intact.
        try:
            serializer.is_valid(raise_exception=True)
        except TokenError as exc:
            raise InvalidToken() from exc

        return Response(serializer.validated_data, status=status.HTTP_200_OK)
# urls.py
# Route entries for the token endpoints (excerpt; presumably these belong
# inside the ``urlpatterns`` list of urls.py -- the enclosing list is not shown).
# The token-obtain route uses the custom view; refresh and verify use
# TokenRefreshView / TokenVerifyView.
# NOTE(review): imports for path, CustomTokenObtainPairView, TokenRefreshView
# and TokenVerifyView are not shown in this excerpt -- confirm they exist.
path('api/token/', CustomTokenObtainPairView.as_view(),
name='token_obtain_pair'),
path('api/token/refresh/', TokenRefreshView.as_view(), name='token_refresh'),
path('api/token/verify/', TokenVerifyView.as_view(), name='token_verify'),