使用 FastAPI 我试图检测StreamingResponse是否已完全被客户端使用或是否已取消。
我有以下示例应用程序:
import asyncio
import uvicorn
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
async def ainfinite_generator():
while True:
yield b"some fake data "
await asyncio.sleep(.001)
async def astreamer(generator):
try:
async for data in generator:
yield data
except Exception as e:
# this isn't triggered by a cancelled request
print(e)
finally:
# this always throws a StopAsyncIteration exception
# no matter whether the generator was consumed or not
leftover = await generator.__anext__()
if leftover:
print("we didn't finish")
else:
print("we finished")
@app.get("/")
async def infinite_stream():
return StreamingResponse(astreamer(ainfinite_generator()))
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
这似乎是“消耗”异步生成器中的第async for in generator
一个astreamer
。StopAsyncIteration
在该循环之后,即使生成器是上面定义的“无限” ,进一步尝试进行下一次迭代也会失败并出现异常。
我已经查看了PEP-525,我唯一看到的是,如果将异常抛出到生成器中,它将导致任何进一步尝试从生成器中读取以抛出 StopAsyncIteration 异常,但我看不到那个将会发生。至少,我在 Starlette 的 StreamingResponse类中没有看到这一点(它似乎对“内容”没有多大作用)。生成器在执行完之后不会“释放”async for in gen
吗?