1

使用 FastAPI 我试图检测StreamingResponse是否已完全被客户端使用或是否已取消。

我有以下示例应用程序:

import asyncio

import uvicorn
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()


async def ainfinite_generator():
    while True:
        yield b"some fake data "
        await asyncio.sleep(.001)


async def astreamer(generator):
    try:
        async for data in generator:
            yield data
    except Exception as e:
        # this isn't triggered by a cancelled request
        print(e)
    finally:
        # this always throws a StopAsyncIteration exception
        # no matter whether the generator was consumed or not
        leftover = await generator.__anext__()
        if leftover:
            print("we didn't finish")
        else:
            print("we finished")


@app.get("/")
async def infinite_stream():
    return StreamingResponse(astreamer(ainfinite_generator()))


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

这似乎是“消耗”异步生成器中的第async for in generator一个astreamerStopAsyncIteration在该循环之后,即使生成器是上面定义的“无限” ,进一步尝试进行下一次迭代也会失败并出现异常。

我已经查看了PEP-525,我唯一看到的是,如果将异常抛出到生成器中,它将导致任何进一步尝试从生成器中读取以抛出 StopAsyncIteration 异常,但我看不到那个将会发生。至少,我在 Starlette 的 StreamingResponse中没有看到这一点(它似乎对“内容”没有多大作用)。生成器在执行完之后不会“释放”async for in gen吗?

4

1 回答 1

1

下面的代码显示了如何监视协程(在我的例子中是异步生成器)上的取消。正如评论中提到的,如果异步生成器被取消,它会向生成器中注入一个异常,从那时起,任何尝试获取生成器中的下一个项目都会引发StopAsyncIteration异常。见PEP 525。要确定异步生成器是否被取消,只需尝试/asyncio.CancelledError排除异常(源自BaseException)。

这里还有一些代码用于展示如何处理普通的生成器,这有点宽容。GeneratorExit如果您保持相同的 try/except 流程,则如果它们被取消,则会引发异常。

棘手的部分是这些异常中的大多数都派生自BaseException类,StopIteration这与我期望的从Exception类派生的异常不同。

而且,顺便说一句,实际取消发生在starlette

import asyncio
import time

import uvicorn
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()


def infinite_generator():
    # not blocking, so doesn't need to be async
    # but if it was blocking, you could make this async and await it
    while True:
        yield b"some fake data "


def finite_generator():
    # not blocking, so doesn't need to be async
    # but if it was blocking, you could make this async and await it
    x = 0
    while x < 10000:
        yield f"{x}"
        x += 1


async def astreamer(generator):
    try:
        # if it was an async generator we'd do:
        # "async for data in generator:"
        # (there is no yield from async_generator)
        for i in generator:
            yield i
            await asyncio.sleep(.001)

    except asyncio.CancelledError as e:
        print('cancelled')


def streamer(generator):
    try:
        # note: normally we would do "yield from generator"
        # but that won't work with next(generator) in the finally statement
        for i in generator:
            yield i
            time.sleep(.001)

    except GeneratorExit:
        print("cancelled")
    finally:
        # showing that we can check here to see if all data was consumed
        # the except statement above effectively does the same thing
        try:
            next(generator)
            print("we didn't finish")
            return
        except StopIteration:
            print("we finished")


@app.get("/infinite")
async def infinite_stream():
    return StreamingResponse(streamer(infinite_generator()))


@app.get("/finite")
async def finite_stream():
    return StreamingResponse(streamer(finite_generator()))


@app.get("/ainfinite")
async def infinite_stream():
    return StreamingResponse(astreamer(infinite_generator()))


@app.get("/afinite")
async def finite_stream():
    return StreamingResponse(astreamer(finite_generator()))


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
于 2020-07-20T14:57:01.597 回答