我被告知下面的代码是不安全的,因为它不允许有一个从托儿所内部产生的异步生成器,除非它是一个异步上下文管理器。
T = TypeVar('T')
async def delay(interval: float, source: AsyncIterable[T]) -> AsyncIterable[T]:
"""Delays each item in source by an interval.
Received items are temporarily stored in an unbounded queue, along with a timestamp, using
a background task. The foreground task takes items from the queue, and waits until the
item is older than the given interval and then yields it."""
send_channel, receive_channel = trio.open_memory_channel(math.inf)
async def pull_task():
async with aclosing(source) as agen:
async for item in agen:
send_channel.send_nowait((item, trio.current_time() + interval))
async with trio.open_nursery() as nursery:
nursery.start_soon(pull_task)
async with receive_channel:
async for item, timestamp in receive_channel:
now = trio.current_time()
if timestamp > now:
await trio.sleep(timestamp - now)
yield item
我很难理解这怎么可能打破。如果有人可以提供一个使用这个确切的生成器函数的示例代码,这证明了不安全性,将不胜感激和奖励。
上述代码的目标是延迟异步序列的处理,而不施加任何背压。如果你能证明这段代码不像我期望的那样工作,那也将不胜感激。
谢谢你。