2

我正在尝试使用 FastAPI 构建一个服务器发送事件端点,但我不确定我想要完成的事情是否可行或者我将如何去做。

问题简介

基本上假设我有一个run_task(limit, task)异步函数,它发送异步请求、进行事务或类似的东西。假设每个任务run_task都可以返回一些 JSON 数据。

我想run_task(limit, task)异步运行多个任务(多个),为此我使用三重奏和托儿所,如下所示:

async with trio.open_nursery() as nursery:
    limit = trio.CapacityLimiter(10)
    for task in tasks:
        nursery.start_soon(run_task, limit, task)

最后,我想通过 FastAPI 端点返回每个任务的结果

起初,我只是创建了一个包含一个列表的对象,并将该对象(通过引用)传递给 each run_task,当任务完成时,我会将 JSON 数据作为字典推送,并通过端点返回整个对象任务完成了。

这可行,但我发现效率低下,发送请求的客户端需要等待所有任务完成才能显示数据,但是,某些任务可能会很慢,这意味着从其他任务获取的数据最终会停滞不前.

我想完成什么

每当任务完成时,我希望 API 直接返回所述任务的数据(我之前会添加到对象中),以便客户端可以实时显示所述数据。

那时我发现了服务器发送事件和 Web 套接字是什么。服务器发送的事件似乎是解决我的问题的合适方法,因为我不需要双向通信。

由于 FastAPI 是在 Starlette 上构建的,我决定使用sse-Starlette来构建一个带有服务器发送事件的端点,为此我需要像这样构建一个端点

@router.get('/stream')
async def runTasks(
        param1: str,
        request: Request
):
    event_generator = status_event_generator(request, param1)
    return EventSourceResponse(event_generator)

实际问题

顾名思义status_event_generator, sse-starlette 需要返回一个事件生成器,这就是我卡住的地方。我希望生成器在完成时产生任务的数据(以便客户端可以实时接收每个任务的数据),但是,这些任务在异步三重奏托儿所内,所以我不确定如何继续

根据异步生成器功能中的托儿所内部产生是否不好?,看来(如果我理解正确的话)我不能只投入收益run_task(limit, task)并期望它起作用

4

1 回答 1

1

使用 websockets 的解决方案

我决定最终使用 websockets 而不是 SSE,因为我意识到我需要将对象作为数据传递到我的端点,虽然 SEE 可以接受查询参数,但将对象作为查询参数处理太麻烦了。

带有 FastAPI 的 websockets 基于 starlette,并且非常易于使用,可以像这样实现它们来解决上述问题:

@router.websocket('/stream')
async def runTasks(
        websocket: WebSocket
):
    # Initialise websocket
    await websocket.accept()

    # Receive data
    tasks = await websocket.receive_json()

    async with trio.open_nursery() as nursery:
        limit = trio.CapacityLimiter(10)
        for task in tasks:
            nursery.start_soon(run_task, limit, task, websocket)

要返回数据,我们可以简单地使用await websocket.send_json()in run_task(这是一个简化的示例,您最好使用您的 Nursery 处理 websocket 闭包和边缘情况)

SSE 解决方案

为了回答最初的问题,感谢@user3840170 和https://discuss.python.org/t/preventing-yield-inside-certain-context-managers/1091,我们应该可以通过在某处开设托儿所来解决问题在更广泛的范围内,将包含遍历生成器的循环,并在生成器本身中使用该托儿所来生成后台任务。

于 2022-01-16T23:27:53.170 回答