2

anyio是 FastAPI 的一部分starlette,因此也是 FastAPI 的一部分。我发现使用它的任务组对我的一个 API 服务器之外的外部服务执行并发请求非常方便。

另外,我想在结果准备好后立即发布。fastapi.StreamingResponse可以解决问题,但我仍然需要能够在返回后保持任务组正常运行StreamingResponse,但这听起来与结构化并发的想法背道而驰。

使用异步生成器可能看起来是一个明显的解决方案,但yield通常不能在任务组上下文中使用,根据此:https ://trio.readthedocs.io/en/stable/reference-core.html#cancel-范围和托儿所

有一个 FastAPI 服务器的示例似乎可以工作,尽管它会在返回之前聚合响应:

import anyio
from fastapi import FastAPI
from fastapi.responses import StreamingResponse


app = FastAPI()


@app.get("/")
async def root():
    # What to put below?
    result = await main()
    return StreamingResponse(iter(result))


async def main():
    send_stream, receive_stream = anyio.create_memory_object_stream()

    result = []
    async with anyio.create_task_group() as tg:
        async with send_stream:
            for num in range(5):
                tg.start_soon(sometask, num, send_stream.clone())

        async with receive_stream:
            async for entry in receive_stream:
                # What to do here???
                result.append(entry)

    return result


async def sometask(num, send_stream):
    await anyio.sleep(1)
    async with send_stream:
        await send_stream.send(f'number {num}\n')



if __name__ == "__main__":
    import uvicorn
    # Debug-only configuration
    uvicorn.run(app)

所以,问题是,是否有类似于@trio_util.trio_async_generatorin 的东西anyio,或者是否可以@trio_util.trio_async_generator直接与 FastAPI 一起使用?

也许还有其他解决方案?

4

1 回答 1

1
import anyio
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()


@app.get("/")
async def root():
    return StreamingResponse(main())


async def main():
    send_stream, receive_stream = anyio.create_memory_object_stream()

    async with anyio.create_task_group() as tg:
        async with send_stream:
            for num in range(5):
                tg.start_soon(sometask, num, send_stream.clone())

        async with receive_stream:
            async for entry in receive_stream:
                yield entry


async def sometask(num, send_stream):
    async with send_stream:
        for i in range(1000):
            await anyio.sleep(1)
            await send_stream.send(f"number {num}\n")


if __name__ == "__main__":
    import uvicorn

    # Debug-only configuration
    uvicorn.run(app)

出乎意料的是,它起作用了。

于 2021-11-19T17:57:22.313 回答