1

我目前正在用 pytest 编写一些异步测试,发现自己遇到了以下情况。

考虑我们有一个asyncio.Queue调用peer2_subscriber,我们想检查它是否收到某个消息(在触发某些操作后,为简洁起见省略)

peer, cmd, msg = await asyncio.wait_for(
    peer2_subscriber.get(),
    timeout=1,
)

assert peer == peer2
assert isinstance(cmd, Transactions)
assert msg[0].hash == txs[0].hash

现在,考虑我想测试另一个asyncio.Queue没有推动的东西。

我发现自己创建了这样一个辅助方法。

async def wait_with_fallback(fn, fallback):
    try:
        return await asyncio.wait_for(
            fn(),
            timeout=1
        )
    except asyncio.TimeoutError:
        return fallback

然后在测试中我写了类似的东西:

val = await wait_with_fallback(
    peer1_subscriber.get,
    None
)

assert val == None

我想知道是否存在我缺少的现有模式?

4

1 回答 1

2

你的模式有效,所以我会说它是“正确的”,对于正确的某些值......这里主要是风格观点。我会写要么

await asyncio.sleep(1)
assert peer1_subscriber.empty()

或者

await asyncio.sleep(1)
val = peer1_subscriber.get_nowait()
assert val is None
于 2018-06-18T16:21:58.397 回答