我正在尝试使用 FastAPI 构建一个服务器发送事件端点,但我不确定我想要完成的事情是否可行或者我将如何去做。
问题简介
基本上假设我有一个run_task(limit, task)
异步函数,它发送异步请求、进行事务或类似的东西。假设每个任务run_task
都可以返回一些 JSON 数据。
我想run_task(limit, task)
异步运行多个任务(多个),为此我使用三重奏和托儿所,如下所示:
async with trio.open_nursery() as nursery:
limit = trio.CapacityLimiter(10)
for task in tasks:
nursery.start_soon(run_task, limit, task)
最后,我想通过 FastAPI 端点返回每个任务的结果
起初,我只是创建了一个包含一个列表的对象,并将该对象(通过引用)传递给 each run_task
,当任务完成时,我会将 JSON 数据作为字典推送,并通过端点返回整个对象任务完成了。
这可行,但我发现效率低下,发送请求的客户端需要等待所有任务完成才能显示数据,但是,某些任务可能会很慢,这意味着从其他任务获取的数据最终会停滞不前.
我想完成什么
每当任务完成时,我希望 API 直接返回所述任务的数据(我之前会添加到对象中),以便客户端可以实时显示所述数据。
那时我发现了服务器发送事件和 Web 套接字是什么。服务器发送的事件似乎是解决我的问题的合适方法,因为我不需要双向通信。
由于 FastAPI 是在 Starlette 上构建的,我决定使用sse-Starlette来构建一个带有服务器发送事件的端点,为此我需要像这样构建一个端点
@router.get('/stream')
async def runTasks(
param1: str,
request: Request
):
event_generator = status_event_generator(request, param1)
return EventSourceResponse(event_generator)
实际问题
顾名思义status_event_generator
, sse-starlette 需要返回一个事件生成器,这就是我卡住的地方。我希望生成器在完成时产生任务的数据(以便客户端可以实时接收每个任务的数据),但是,这些任务在异步三重奏托儿所内,所以我不确定如何继续
根据异步生成器功能中的托儿所内部产生是否不好?,看来(如果我理解正确的话)我不能只投入收益run_task(limit, task)
并期望它起作用