anyio
是 FastAPI 的一部分starlette
,因此也是 FastAPI 的一部分。我发现使用它的任务组对我的一个 API 服务器之外的外部服务执行并发请求非常方便。
另外,我想在结果准备好后立即发布。fastapi.StreamingResponse可以解决问题,但我仍然需要能够在返回后保持任务组正常运行StreamingResponse
,但这听起来与结构化并发的想法背道而驰。
使用异步生成器可能看起来是一个明显的解决方案,但yield
通常不能在任务组上下文中使用,根据此:https ://trio.readthedocs.io/en/stable/reference-core.html#cancel-范围和托儿所
有一个 FastAPI 服务器的示例似乎可以工作,尽管它会在返回之前聚合响应:
import anyio
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
@app.get("/")
async def root():
# What to put below?
result = await main()
return StreamingResponse(iter(result))
async def main():
send_stream, receive_stream = anyio.create_memory_object_stream()
result = []
async with anyio.create_task_group() as tg:
async with send_stream:
for num in range(5):
tg.start_soon(sometask, num, send_stream.clone())
async with receive_stream:
async for entry in receive_stream:
# What to do here???
result.append(entry)
return result
async def sometask(num, send_stream):
await anyio.sleep(1)
async with send_stream:
await send_stream.send(f'number {num}\n')
if __name__ == "__main__":
import uvicorn
# Debug-only configuration
uvicorn.run(app)
所以,问题是,是否有类似于@trio_util.trio_async_generator
in 的东西anyio
,或者是否可以@trio_util.trio_async_generator
直接与 FastAPI 一起使用?
也许还有其他解决方案?