8

我有以下问题:给定一个运行 fastapi 的后端,它有一个流式端点,用于更新前端,我想在每次调用更新后端状态的函数时发送这些更新(可以通过计划的作业或命中并导致状态更新的不同端点)。

我想实现的一个简单版本是:

from fastapi import FastAPI
from starlette.responses import StreamingResponse

class State:
    def __init__(self):
        self.messages = []

    def update(self, new_messages):
        self.messages = new_messages
        # HERE: notify waiting stream endpoint

app = FastAPI()

state = State()

@app.get('/stream')
def stream():
    def event_stream():
        while True:
            # HERE lies the question: wait for state to be update
            for message in state.messages:
                yield 'data: {}\n\n'.format(json.dumps(message))
    return StreamingResponse(event_stream(), media_type="text/event-stream")

我希望它永远运行下去。每次状态更新时,都会event_stream解除阻塞并发送消息。

我看过线程和异步,但我觉得我缺少一些关于如何在 python 中执行此操作的简单概念。

4

2 回答 2

18

FastAPI 基于 Starlette,Server-Sent Events插件可用于 Starlette

import asyncio
import uvicorn
from fastapi import FastAPI, Request
from sse_starlette.sse import EventSourceResponse

MESSAGE_STREAM_DELAY = 1  # second
MESSAGE_STREAM_RETRY_TIMEOUT = 15000  # milisecond
app = FastAPI()


@app.get('/stream')
async def message_stream(request: Request):
    def new_messages(): ...
    async def event_generator():
        while True:
            # If client was closed the connection
            if await request.is_disconnected():
                break

            # Checks for new messages and return them to client if any
            if new_messages():
                yield {
                        "event": "new_message",
                        "id": "message_id",
                        "retry": MESSAGE_STREAM_RETRY_TIMEOUT,
                        "data": "message_content"
                }

            await asyncio.sleep(MESSAGE_STREAM_DELAY)

    return EventSourceResponse(event_generator())


if __name__ == "__main__":
    uvicorn.run(app, host="127.0.0.1", port=8000)

于 2020-07-09T14:19:48.843 回答
2

我能找到解决这个问题的最简单方法是使用threading.Condition.

因此变成了:

import threading

from fastapi import FastAPI
from starlette.responses import StreamingResponse

condition = threading.Condition()

class State:
    def __init__(self):
        self.messages = []

    def update(self, new_messages):
        self.messages = new_messages
        with condition:
            condition.notify()

app = FastAPI()

state = State()

@app.get('/stream')
def stream():
    def event_stream():
        while True:
            with condition:
                condition.wait()

            for message in state.messages:
                yield 'data: {}\n\n'.format(json.dumps(message))
    return StreamingResponse(event_stream(), media_type="text/event-stream")




于 2019-11-17T09:28:46.660 回答