6

我正在尝试为 django 频道设置身份验证中间件。我希望这个中间件只对 websocket 请求有效。

似乎在这种情况下我没有获得完整的中间件功能。例如我无法response = self.get_response(scope)工作:

'TokenAuthMiddleware' object has no attribute 'get_response'

现在这个中间件一切正常(它只为 websocket 请求激活,而不是在 中注册settings.py),除了我需要一种方法来修改响应状态代码(阻止匿名用户并设置错误代码ExpiredSignatureError)。任何帮助表示赞赏。我使用 Django2.0.6和频道2.1.1。jwt 身份验证通过djangorestframework-jwt

中间件:

import jwt, re
import traceback
import logging

from channels.auth import AuthMiddlewareStack
from django.contrib.auth.models import AnonymousUser
from django.conf import LazySettings
from jwt import InvalidSignatureError, ExpiredSignatureError, DecodeError

from project.models import MyUser

settings = LazySettings()
logger = logging.getLogger(__name__)


class TokenAuthMiddleware:
    """
    Token authorization middleware for Django Channels 2

    """

    def __init__(self, inner):
        self.inner = inner

    def __call__(self, scope):
        headers = dict(scope['headers'])
        auth_header = None
        if b'authorization' in headers:
            auth_header = headers[b'authorization'].decode()
        else:
            try:
                auth_header = _str_to_dict(headers[b'cookie'].decode())['X-Authorization']
            except:
                pass

        logger.info(auth_header)

        if auth_header:
            try:
                user_jwt = jwt.decode(
                    auth_header,
                    settings.SECRET_KEY,
                )
                scope['user'] = MyUser.objects.get(
                    id=user_jwt['user_id']
                )
            except (InvalidSignatureError, KeyError, ExpiredSignatureError, DecodeError):
                traceback.print_exc()
                pass
            except Exception as e:  # NoQA
                logger.error(scope)
                traceback.print_exc()

        return self.inner(scope)


TokenAuthMiddlewareStack = lambda inner: TokenAuthMiddleware(AuthMiddlewareStack(inner))


def _str_to_dict(str):
    return {k: v.strip('"') for k, v in re.findall(r'(\S+)=(".*?"|\S+)', str)}

路由.py

application = ProtocolTypeRouter({
    # (http->django views is added by default)
    'websocket': TokenAuthMiddlewareStack(
        URLRouter(
            cmonitorserv.routing.websocket_urlpatterns
        )
    ),
})
4

1 回答 1

3

无法使用中间件找到解决方案。现在通过处理身份验证权限来解决consumers.py

def _is_authenticated(self):
    if hasattr(self.scope, 'auth_error'):
        return False
    if not self.scope['user'] or self.scope['user'] is AnonymousUser:
        return False
    return True

另一个似乎没有在任何地方记录的重要事情 - 要拒绝与自定义错误代码的连接,我们需要先接受它。

class WebConsumer(WebsocketConsumer):
    def connect(self):
        self.accept()
        if self._is_authenticated():
              ....
        else:
            logger.error("ws client auth error")
            self.close(code=4003)
于 2018-06-18T19:55:53.783 回答