4

我有一个在 Python 3.7 + Tornado 5 上运行的 REST API,使用 postgresql 作为数据库,使用带有 SQLAlchemy 核心的 aiopg(通过 aiopg.sa 绑定)。对于单元测试,我使用 py.test 和 pytest-tornado。

只要不涉及对数据库的查询,所有测试都会正常进行,我会得到这个:

运行时错误:任务 cb=[IOLoop.add_future..() at venv/lib/python3.7/site-packages/tornado/ioloop.py:719]> 将 Future 附加到另一个循环

相同的代码在测试中运行良好,到目前为止我能够处理 100 个请求。

这是 @auth 装饰器的一部分,它将检查 JWT 令牌的 Authorization 标头,对其进行解码并获取用户的数据并将其附加到请求中;这是查询的一部分:

                partner_id = payload['partner_id']
                provided_scopes = payload.get("scope", [])
                for scope in scopes:
                    if scope not in provided_scopes:
                        logger.error(
                            'Authentication failed, scopes are not compliant - '
                            'required: {} - '
                            'provided: {}'.format(scopes, provided_scopes)
                        )
                        raise ForbiddenException(
                            "insufficient permissions or wrong user."
                        )
                db = self.settings['db']
                partner = await Partner.get(db, username=partner_id)
                # The user is authenticated at this stage, let's add
                # the user info to the request so it can be used
                if not partner:
                    raise UnauthorizedException('Unknown user from token')
                p = Partner(**partner)
                setattr(self.request, "partner_id", p.uuid)
                setattr(self.request, "partner", p)

Partner 的 .get() 异步方法来自应用程序中所有模型的基类。这是 .get 方法的实现:

@classmethod
async def get(cls, db, order=None, limit=None, offset=None, **kwargs):
    """
    Get one instance that will match the criteria
    :param db:
    :param order:
    :param limit:
    :param offset:
    :param kwargs:
    :return:
    """
    if len(kwargs) == 0:
        return None
    if not hasattr(cls, '__tablename__'):
        raise InvalidModelException()
    tbl = cls.__table__
    instance = None
    clause = cls.get_clause(**kwargs)
    query = (tbl.select().where(text(clause)))
    if order:
        query = query.order_by(text(order))
    if limit:
        query = query.limit(limit)
    if offset:
        query = query.offset(offset)
    logger.info(f'GET query executing:\n{query}')
    try:
        async with db.acquire() as conn:
            async with conn.execute(query) as rows:
                instance = await rows.first()
    except DataError as de:
        [...]
    return instance

上面的 .get() 方法将返回模型实例(行表示)或无。

它使用 db.acquire() 上下文管理器,如 aiopg 的文档中所述:https ://aiopg.readthedocs.io/en/stable/sa.html 。

如同一文档中所述, sa.create_engine() 方法返回一个连接池,因此 db.acquire() 仅使用池中的一个连接。我将这个池共享给 Tornado 中的每个请求,他们在需要时使用它来执行查询。

这是我在 conftest.py 中设置的夹具:

@pytest.fixture
async def db():
    dbe = await setup_db()
    return dbe


@pytest.fixture
def app(db, event_loop):
    """
    Returns a valid testing Tornado Application instance.
    :return:
    """
    app = make_app(db)
    settings.JWT_SECRET = 'its_secret_one'
    return app

我无法解释为什么会发生这种情况;Tornado 的文档和源代码清楚地表明默认使用 asyncIO 事件循环,通过调试它我可以看到事件循环确实是同一个,但由于某种原因它似乎突然关闭或停止。

这是一项失败的测试:

@pytest.mark.gen_test(timeout=2)
def test_score_returns_204_empty(app, http_server, http_client, base_url):
    score_url = '/'.join([base_url, URL_PREFIX, 'score'])
    token = create_token('test', scopes=['score:get'])
    headers = {
        'Authorization': f'Bearer {token}',
        'Accept': 'application/json',
    }
    response = yield http_client.fetch(score_url, headers=headers, raise_error=False)
    assert response.code == 204

此测试失败,因为它返回 401 而不是 204,因为对 auth 装饰器的查询由于 RuntimeError 而失败,然后返回未经授权的响应。

这里的异步专家的任何想法都将不胜感激,我对此非常迷茫!!!

4

1 回答 1

5

好吧,经过大量的挖掘、测试,当然还有很多关于 asyncio 的学习,我自己实现了它。感谢您迄今为止的建议。

问题是来自 asyncio 的 event_loop 没有运行;正如@hoefling 提到的,pytest 本身不支持协程,但是 pytest-asyncio 为您的测试带来了如此有用的功能。这在这里得到了很好的解释:https ://medium.com/ideas-at-igenius/testing-asyncio-python-code-with-pytest-a2f3628f82bc

因此,如果没有 pytest-asyncio,您需要测试的异步代码将如下所示:

def test_this_is_an_async_test():
   loop = asyncio.get_event_loop()
   result = loop.run_until_complete(my_async_function(param1, param2, param3)
   assert result == 'expected'

我们使用 loop.run_until_complete() 作为,否则,循环将永远不会运行,因为这是 asyncio 默认的工作方式(并且 pytest 没有让它以不同的方式工作)。

使用 pytest-asyncio,您的测试可以使用众所周知的 async / await 部分:

async def test_this_is_an_async_test(event_loop):
   result = await my_async_function(param1, param2, param3)
   assert result == 'expected'

在这种情况下,pytest-asyncio 包装了上面的 run_until_complete() 调用,对其进行了大量总结,因此事件循环将运行并可供您的异步代码使用。

请注意:第二种情况下的 event_loop 参数在这里甚至不是必需的,pytest-asyncio 提供了一个可用于您的测试。

另一方面,当您测试 Tornado 应用程序时,您通常需要在测试期间启动并运行 http 服务器、侦听已知端口等,因此通常的方法是编写夹具来获取http 服务器,base_url(通常是http://localhost :,带有未使用的端口等)。

pytest-tornado 是一个非常有用的工具,因为它为您提供了以下几种固定装置:http_server、http_client、unused_port、base_url 等。

另外值得一提的是,它获得了 pytest 标记的 gen_test() 功能,该功能将任何标准测试转换为通过 yield 使用协程,甚至断言它将在给定的超时下运行,如下所示:

    @pytest.mark.gen_test(timeout=3)
    def test_fetch_my_data(http_client, base_url):
       result = yield http_client.fetch('/'.join([base_url, 'result']))
       assert len(result) == 1000

但是,这种方式不支持 async / await,实际上只有 Tornado 的 ioloop 可以通过 io_loop 夹具使用(尽管 Tornado 的 ioloop 默认使用 Tornado 5.0 下面的 asyncio),所以你需要结合 pytest.mark。 gen_test 和 pytest.mark.asyncio,但顺序正确!(我确实失败了)。

一旦我更好地了解可能是什么问题,这就是下一个方法:

    @pytest.mark.gen_test(timeout=2)
    @pytest.mark.asyncio
    async def test_score_returns_204_empty(http_client, base_url):
        score_url = '/'.join([base_url, URL_PREFIX, 'score'])
        token = create_token('test', scopes=['score:get'])
        headers = {
            'Authorization': f'Bearer {token}',
            'Accept': 'application/json',
        }
        response = await http_client.fetch(score_url, headers=headers, raise_error=False)
        assert response.code == 204

但这是完全错误的,如果您了解 Python 的装饰器包装器是如何工作的。使用上面的代码,pytest-asyncio 的协程然后被包装在一个 pytest-tornado yield gen.coroutine 中,它不会让事件循环运行......所以我的测试仍然失败并出现同样的问题。对数据库的任何查询都返回一个 Future 等待事件循环运行。

一旦我犯了这个愚蠢的错误,我更新的代码:

    @pytest.mark.asyncio
    @pytest.mark.gen_test(timeout=2)
    async def test_score_returns_204_empty(http_client, base_url):
        score_url = '/'.join([base_url, URL_PREFIX, 'score'])
        token = create_token('test', scopes=['score:get'])
        headers = {
            'Authorization': f'Bearer {token}',
            'Accept': 'application/json',
        }
        response = await http_client.fetch(score_url, headers=headers, raise_error=False)
        assert response.code == 204

在这种情况下, gen.coroutine 被包裹在 pytest-asyncio 协程中,并且 event_loop 按预期运行协程!

但是还有一个小问题,我也花了一点时间才意识到;pytest-asyncio 的 event_loop 夹具为每个测试创建一个新的事件循环,而 pytest-tornado 也创建一个新的 IOloop。并且测试仍然失败,但这次出现了不同的错误。

conftest.py 文件现在看起来像这样;请注意,我重新声明了 event_loop 夹具以使用来自 pytest-tornado io_loop 夹具本身的 event_loop(请记住 pytest-tornado 在每个测试函数上创建一个新的 io_loop):

@pytest.fixture(scope='function')
def event_loop(io_loop):
    loop = io_loop.current().asyncio_loop
    yield loop
    loop.stop()


@pytest.fixture(scope='function')
async def db():
    dbe = await setup_db()
    yield dbe


@pytest.fixture
def app(db):
    """
    Returns a valid testing Tornado Application instance.
    :return:
    """
    app = make_app(db)
    settings.JWT_SECRET = 'its_secret_one'
    yield app

现在我所有的测试都成功了,我重新成为一个快乐的人,并且为我现在对 asyncio 生活方式的更好理解感到非常自豪。凉爽的!

于 2019-01-04T07:42:58.940 回答