0

使用 Quart 我试图通过 websocket 从一个客户端接收数据,然后让 Quart websocket 服务器通过 websocket 将其发送到不同的客户端。

两个客户端将单独共享相同的 url,其他对客户端将拥有自己的 url。此回声测试分别适用于两个客户端:

@copilot_ext.websocket('/ws/<unique_id>')
async def ws(unique_id):
    while True:
        data = await websocket.receive()
        await websocket.send(f"echo {data}") 

我尝试使用此处的示例进行广播https://pgjones.gitlab.io/quart/tutorials/websocket_tutorial.html#broadcasting尽管我可以捕获并打印不同的 websocket,但从一个客户端向另一个客户端发送数据并没有太多运气:(

connected_websockets = set()

def collect_websocket(func):
    @wraps(func)
    async def wrapper(*args, **kwargs):
        global connected_websockets
        send_channel, receive_channel = trio.open_memory_channel(2)
        connected_websockets.add(send_channel)
        try:
            return await func(send_channel, *args, **kwargs)
        finally:
            connected_websockets.remove(send_channel)
    return wrapper

@copilot_ext.websocket('/ws/<unique_id>')
@collect_websocket
async def ws(que, unique_id):
    while True:
        data = await websocket.receive()
        for send_channel in connected_websockets:
            await send_channel.send(f"message {data}")
            print(send_channel)

仅存储 websocket 对象并遍历它们也不起作用

connected_websockets = set()

@copilot_ext.websocket('/ws/<unique_id>')
async def ws(unique_id):
    global connected_websockets
    while True:
        data = await websocket.receive()
        connected_websockets.add(websocket)
        for websockett in connected_websockets:
            await websockett.send(f"message {data}")
            print(type(websockett))
4

1 回答 1

0

我认为这个片段可以构成您想要实现的目标的基础。这个想法是房间是由房间 ID 键入的队列的集合。然后每个连接的客户端在房间中都有一个队列,任何其他客户端都将消息发送到该队列。然后 send_task 在后台运行以将任何消息发送到其队列中的客户端。我希望这是有道理的,

import asyncio
from collections import defaultdict

from quart import Quart, websocket

app = Quart(__name__)

websocket_rooms = defaultdict(set)

async def send_task(ws, queue):
    while True:
        message = await queue.get()
        await ws.send(message)

@app.websocket("/ws/<id>/")
async def ws(id):
    global websocket_rooms
    queue = asyncio.Queue()
    websocket_rooms[id].add(queue)
    try:
        task = asyncio.ensure_future(send_task(websocket._get_current_object(), queue))
        while True:
            message = await websocket.receive()
            for other in websocket_rooms[id]:
                if other is not queue:
                    await other.put(message)
    finally:
        task.cancel()
        await task
        websocket_rooms[id].remove(queue)
于 2021-04-17T11:15:34.010 回答