4

在为我的聊天消费者编写测试的过程中,我遇到了无法使用 WebSocketCommunicator 在测试中进行身份验证的问题。我有自定义的 JwtTokenAuthMiddleware,它通过在请求查询中使用令牌来实现套接字中的身份验证,因为据我所知,使用授权标头进行体面的身份验证是不可能的。你们能给我建议或提供我在网上找不到的示例代码吗?顺便说一句,我的聊天没有问题。测试也应该很好,我从官方文档 Django Channels 2.x 测试中获取了指南。

--JwtTokenAuthMiddlewate--

class JwtTokenAuthMiddleware:
    def __init__(self, inner):
        self.inner = inner

    def __call__(self, scope):
        close_old_connections()
        try:
            raw_token = scope['query_string'].decode().split('=')[1]
            auth = JWTAuthentication()
            validated_token = auth.get_validated_token(raw_token)
            user = auth.get_user(validated_token)
            scope['user'] = user
        except (IndexError, InvalidToken, AuthenticationFailed):
            scope['user'] = AnonymousUser()
        return self.inner(scope)


JwtTokenAuthMiddlewareStack = lambda inner: JwtTokenAuthMiddleware(AuthMiddlewareStack(inner))

--示例测试--

@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_trainer_auth_success():
    room = await database_sync_to_async(RoomFactory.create)()
    trainer = room.trainer
    trainer_token = await sync_to_async(get_token_for_user)(trainer.user)
    room_url = f'ws/room/{room.id}/'

    trainer_communicator = WebsocketCommunicator(application, f'{room_url}?t={trainer_token}')
    connected, _ = await trainer_communicator.connect()
    assert connected

    trainer_connect_resp = await trainer_communicator.receive_json_from()
    assert_connection(trainer_connect_resp, [], room.max_round_time)
    await trainer_communicator.disconnect()

--错误追溯--

___________________________________________________________ test_trainer_auth_success ___________________________________________________________

self = <channels.testing.websocket.WebsocketCommunicator object at 0x7f6b9906f290>, timeout = 1

    async def receive_output(self, timeout=1):
        """
        Receives a single message from the application, with optional timeout.
        """
        # Make sure there's not an exception to raise from the task
        if self.future.done():
            self.future.result()
        # Wait and receive the message
        try:
            async with async_timeout(timeout):
>               return await self.output_queue.get()

/usr/local/lib/python3.7/site-packages/asgiref/testing.py:74: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <Queue at 0x7f6b98f76510 maxsize=0>

    async def get(self):
        """Remove and return an item from the queue.

        If queue is empty, wait until an item is available.
        """
        while self.empty():
            getter = self._loop.create_future()
            self._getters.append(getter)
            try:
>               await getter
E               concurrent.futures._base.CancelledError

/usr/local/lib/python3.7/asyncio/queues.py:159: CancelledError

During handling of the above exception, another exception occurred:

    @pytest.mark.django_db(transaction=True)
    @pytest.mark.asyncio
    async def test_trainer_auth_success():
        room = await database_sync_to_async(RoomFactory.create)()
        trainer = room.trainer
        trainer_token = await sync_to_async(get_token_for_user)(trainer.user)
        room_url = f'ws/room/{room.id}/'

        # trainer_communicator = await assert_get_connected_communicator(application, room_url, trainer_token)
        trainer_communicator = WebsocketCommunicator(application, f'{room_url}?t={trainer_token}')
>       connected, _ = await trainer_communicator.connect()

apps/chat/tests/test_consumers.py:39: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/usr/local/lib/python3.7/site-packages/channels/testing/websocket.py:36: in connect
    response = await self.receive_output(timeout)
/usr/local/lib/python3.7/site-packages/asgiref/testing.py:85: in receive_output
    raise e
/usr/local/lib/python3.7/site-packages/asgiref/testing.py:74: in receive_output
    return await self.output_queue.get()
/usr/local/lib/python3.7/site-packages/asgiref/timeout.py:68: in __aexit__
    self._do_exit(exc_type)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <asgiref.timeout.timeout object at 0x7f6b98f76d50>, exc_type = <class 'concurrent.futures._base.CancelledError'>

    def _do_exit(self, exc_type: Type[BaseException]) -> None:
        if exc_type is asyncio.CancelledError and self._cancelled:
            self._cancel_handler = None
            self._task = None
>           raise asyncio.TimeoutError
E           concurrent.futures._base.TimeoutError

/usr/local/lib/python3.7/site-packages/asgiref/timeout.py:105: TimeoutError
=============================================================== warnings summary ================================================================
/usr/local/lib/python3.7/site-packages/pytest_asyncio/plugin.py:39
  /usr/local/lib/python3.7/site-packages/pytest_asyncio/plugin.py:39: PytestDeprecationWarning: direct construction of Function has been deprecated, please use Function.from_parent
    item = pytest.Function(name, parent=collector)

/usr/local/lib/python3.7/site-packages/pytest_asyncio/plugin.py:45
  /usr/local/lib/python3.7/site-packages/pytest_asyncio/plugin.py:45: PytestDeprecationWarning: direct construction of Function has been deprecated, please use Function.from_parent
    item = pytest.Function(name, parent=collector)  # To reload keywords.
4

1 回答 1

4

我一直在尝试或多或少地做同样的事情,似乎在测试消费者和传播者时没有简单的方法来验证用户。一个关于这个主题的 GH 问题,其中给出了一些(工作!)解决方法,也许你觉得它有帮助。

于 2020-08-10T20:59:39.847 回答