4

客观的

我的目标是使用音频流。从逻辑上讲,这是我的目标:

  1. 音频流来自 WebSocket A(FastAPI端点)
  2. 音频流被桥接到不同的 WebSocket,B,它将返回一个 JSON(Rev-ai的 WebSocket)
  3. Json 结果通过 WebSocket A 实时发回。因此,虽然音频流仍在进来。

可能的解决方案

为了解决这个问题,我有很多想法,但最终我一直在尝试过渡WebSocket AWebSocket B. 到目前为止,我的尝试涉及一个ConnectionManager类,其中包含一个Queue.queue. 音频流的块被添加到这个队列中,这样我们就不会直接从WebSocket A.

ConnectionManager还包含一个生成器方法,用于从队列中产生所有值。

我的 FastAPI 实现使用websocket A如下:

@app.websocket("/ws")
async def predict_feature(websocket: WebSocket):
    await manager.connect(websocket)
    try:
        while True:
            chunk = await websocket.receive_bytes()
            manager.add_to_buffer(chunk)
    except KeyboardInterrupt:
        manager.disconnect()

在此摄取的同时,我希望有一个任务将我们的音频流桥接到WebSocket B,并将获得的值发送到WebSocket A。可以通过上述generator方法来消费音频流。

由于 WebSocket B 如何使用消息,因此生成器方法是必要的,如 Rev-ai 的示例所示

streamclient = RevAiStreamingClient(access_token, config)
response_generator = streamclient.start(MEDIA_GENERATOR)
for response in response_generator:
    # return through websocket A this value
    print(response)

这是最大的挑战之一,因为我们需要将数据消耗到生成器中并实时获取结果。

最新尝试

我一直在尝试我的运气asyncio;据我了解,一种可能性是创建一个在后台运行的协程。我在这方面没有成功,但听起来很有希望。

我曾考虑通过FastAPI启动事件来触发它,但我无法实现并发。我尝试使用event_loops,但它给了我一个nested event loop相关的错误。

警告

如果您的洞察力认为 FastAPI 是可选的,在某种程度上 WebSocket A 也是如此。归根结底,最终目标是通过我们自己的 API 端点接收音频流,通过 Rev.ai 的 WebSocket 运行它,执行一些额外的处理,并将结果发回。

4

1 回答 1

9

websocket <-> websocket 的桥接器

下面是一个简单的 webscoket 代理示例,其中 websocketA和 websocketB都是 FastAPI 应用程序中的端点,但 websocketB可以位于其他地方,只需更改其地址即可ws_b_uri。对于 websocket 客户端,使用websockets库。

为了进行数据转发,Aendpoint 的代码启动了两个任务forwardreverse并通过asyncio.gather(). 两个方向的数据传输以并行方式发生。

import asyncio

from fastapi import FastAPI
from fastapi import WebSocket
import websockets
app = FastAPI()

ws_b_uri = "ws://localhost:8001/ws_b"


async def forward(ws_a: WebSocket, ws_b: websockets.WebSocketClientProtocol):
    while True:
        data = await ws_a.receive_bytes()
        print("websocket A received:", data)
        await ws_b.send(data)


async def reverse(ws_a: WebSocket, ws_b: websockets.WebSocketClientProtocol):
    while True:
        data = await ws_b.recv()
        await ws_a.send_text(data)
        print("websocket A sent:", data)


@app.websocket("/ws_a")
async def websocket_a(ws_a: WebSocket):
    await ws_a.accept()
    async with websockets.connect(ws_b_uri) as ws_b_client:
        fwd_task = asyncio.create_task(forward(ws_a, ws_b_client))
        rev_task = asyncio.create_task(reverse(ws_a, ws_b_client))
        await asyncio.gather(fwd_task, rev_task)


@app.websocket("/ws_b")
async def websocket_b(ws_b_server: WebSocket):
    await ws_b_server.accept()
    while True:
        data = await ws_b_server.receive_bytes()
        print("websocket B server recieved: ", data)
        await ws_b_server.send_text('{"response": "value from B server"}')

更新(网桥 websocket <-> 同步生成器)

考虑到问题的最后更新,问题是WebSocketB隐藏在同步生成器后面(实际上有两个,一个用于输入,另一个用于输出),实际上,任务变成了如何制作WebSocket 和同步生成器之间的桥梁。而且由于我从未使用过该rev-ai库,因此我制作了一个存根函数stream_client_startstreamclient.start它需要一个生成器(MEDIA_GENERATOR原始)并返回一个生成器(response_generator原始)。

在这种情况下,我通过 开始在单独的线程中处理生成器run_in_executor,并且为了不重新发明轮子,我使用janus库中的队列进行通信,它允许您通过队列绑定同步和异步代码。因此,也有两个队列,一个为A -> B,另一个为B -> A


import asyncio
import time
from typing import Generator
from fastapi import FastAPI
from fastapi import WebSocket
import janus
import queue

app = FastAPI()


# Stub generator function (using websocket B in internal)
def stream_client_start(input_gen: Generator) -> Generator:
    for chunk in input_gen:
        time.sleep(1)
        yield f"Get {chunk}"


# queue to generator auxiliary adapter
def queue_to_generator(sync_queue: queue.Queue) -> Generator:
    while True:
        yield sync_queue.get()


async def forward(ws_a: WebSocket, queue_b):
    while True:
        data = await ws_a.receive_bytes()
        print("websocket A received:", data)
        await queue_b.put(data)


async def reverse(ws_a: WebSocket, queue_b):
    while True:
        data = await queue_b.get()
        await ws_a.send_text(data)
        print("websocket A sent:", data)


def process_b_client(fwd_queue, rev_queue):
    response_generator = stream_client_start(queue_to_generator(fwd_queue))
    for r in response_generator:
        rev_queue.put(r)


@app.websocket("/ws_a")
async def websocket_a(ws_a: WebSocket):
    loop = asyncio.get_event_loop()
    fwd_queue = janus.Queue()
    rev_queue = janus.Queue()
    await ws_a.accept()

    process_client_task = loop.run_in_executor(None, process_b_client, fwd_queue.sync_q, rev_queue.sync_q)
    fwd_task = asyncio.create_task(forward(ws_a, fwd_queue.async_q))
    rev_task = asyncio.create_task(reverse(ws_a, rev_queue.async_q))
    await asyncio.gather(process_client_task, fwd_task, rev_task)
于 2020-12-21T15:43:27.250 回答