我目前正在用 pytest 编写一些异步测试,发现自己遇到了以下情况。
考虑我们有一个asyncio.Queue
调用peer2_subscriber
,我们想检查它是否收到某个消息(在触发某些操作后,为简洁起见省略)
peer, cmd, msg = await asyncio.wait_for(
peer2_subscriber.get(),
timeout=1,
)
assert peer == peer2
assert isinstance(cmd, Transactions)
assert msg[0].hash == txs[0].hash
现在,考虑我想测试另一个asyncio.Queue
没有推动的东西。
我发现自己创建了这样一个辅助方法。
async def wait_with_fallback(fn, fallback):
try:
return await asyncio.wait_for(
fn(),
timeout=1
)
except asyncio.TimeoutError:
return fallback
然后在测试中我写了类似的东西:
val = await wait_with_fallback(
peer1_subscriber.get,
None
)
assert val == None
我想知道是否存在我缺少的现有模式?