我有一个在 Python 3.7 + Tornado 5 上运行的 REST API,使用 postgresql 作为数据库,使用带有 SQLAlchemy 核心的 aiopg(通过 aiopg.sa 绑定)。对于单元测试,我使用 py.test 和 pytest-tornado。
只要不涉及对数据库的查询,所有测试都会正常进行,我会得到这个:
运行时错误:任务 cb=[IOLoop.add_future..() at venv/lib/python3.7/site-packages/tornado/ioloop.py:719]> 将 Future 附加到另一个循环
相同的代码在测试中运行良好,到目前为止我能够处理 100 个请求。
这是 @auth 装饰器的一部分,它将检查 JWT 令牌的 Authorization 标头,对其进行解码并获取用户的数据并将其附加到请求中;这是查询的一部分:
partner_id = payload['partner_id']
provided_scopes = payload.get("scope", [])
for scope in scopes:
if scope not in provided_scopes:
logger.error(
'Authentication failed, scopes are not compliant - '
'required: {} - '
'provided: {}'.format(scopes, provided_scopes)
)
raise ForbiddenException(
"insufficient permissions or wrong user."
)
db = self.settings['db']
partner = await Partner.get(db, username=partner_id)
# The user is authenticated at this stage, let's add
# the user info to the request so it can be used
if not partner:
raise UnauthorizedException('Unknown user from token')
p = Partner(**partner)
setattr(self.request, "partner_id", p.uuid)
setattr(self.request, "partner", p)
Partner 的 .get() 异步方法来自应用程序中所有模型的基类。这是 .get 方法的实现:
@classmethod
async def get(cls, db, order=None, limit=None, offset=None, **kwargs):
"""
Get one instance that will match the criteria
:param db:
:param order:
:param limit:
:param offset:
:param kwargs:
:return:
"""
if len(kwargs) == 0:
return None
if not hasattr(cls, '__tablename__'):
raise InvalidModelException()
tbl = cls.__table__
instance = None
clause = cls.get_clause(**kwargs)
query = (tbl.select().where(text(clause)))
if order:
query = query.order_by(text(order))
if limit:
query = query.limit(limit)
if offset:
query = query.offset(offset)
logger.info(f'GET query executing:\n{query}')
try:
async with db.acquire() as conn:
async with conn.execute(query) as rows:
instance = await rows.first()
except DataError as de:
[...]
return instance
上面的 .get() 方法将返回模型实例(行表示)或无。
它使用 db.acquire() 上下文管理器,如 aiopg 的文档中所述:https ://aiopg.readthedocs.io/en/stable/sa.html 。
如同一文档中所述, sa.create_engine() 方法返回一个连接池,因此 db.acquire() 仅使用池中的一个连接。我将这个池共享给 Tornado 中的每个请求,他们在需要时使用它来执行查询。
这是我在 conftest.py 中设置的夹具:
@pytest.fixture
async def db():
dbe = await setup_db()
return dbe
@pytest.fixture
def app(db, event_loop):
"""
Returns a valid testing Tornado Application instance.
:return:
"""
app = make_app(db)
settings.JWT_SECRET = 'its_secret_one'
return app
我无法解释为什么会发生这种情况;Tornado 的文档和源代码清楚地表明默认使用 asyncIO 事件循环,通过调试它我可以看到事件循环确实是同一个,但由于某种原因它似乎突然关闭或停止。
这是一项失败的测试:
@pytest.mark.gen_test(timeout=2)
def test_score_returns_204_empty(app, http_server, http_client, base_url):
score_url = '/'.join([base_url, URL_PREFIX, 'score'])
token = create_token('test', scopes=['score:get'])
headers = {
'Authorization': f'Bearer {token}',
'Accept': 'application/json',
}
response = yield http_client.fetch(score_url, headers=headers, raise_error=False)
assert response.code == 204
此测试失败,因为它返回 401 而不是 204,因为对 auth 装饰器的查询由于 RuntimeError 而失败,然后返回未经授权的响应。
这里的异步专家的任何想法都将不胜感激,我对此非常迷茫!!!