0

尝试构建多对多流式架构,我们打算使用 WebRTC 进行流式传输,使用 WebSockets 进行信号传输。我们计划创建从每个客户端到服务器的 PeerConnection,并将通过每个流接收到的数据放在 RabbitMQ 队列中,并将数据流式传输到其他客户端

我们怎样才能做到这一点?当新客户端想要流式传输数据时,我们如何使用多处理模块来创建新的服务器实例。我假设我们应该只有一个信令服务器,并且有多个基于没有客户端的对等连接。

信令.py

class WebSocket:
    def __init__(self, handler, host='0.0.0.0', port=8765):
        self.host = host
        self.port = port
        self.handler = handler

    async def serve(self):
        print('starting server')
        async with websockets.serve(self.handler, self.host, self.port):
            await asyncio.Future()


if __name__ == '__main__':
    server = Server() # Server() instance has to dynamically created
    socket = WebSocket(handler=server.handler) # A dynamic handler? 
    try:
        asyncio.get_event_loop().run_until_complete(socket.serve())
    except KeyboardInterrupt:
        pass
    finally:
        asyncio.get_event_loop().close()

服务器.py

class Server:
    def __init__(self, host='0.0.0.0', port=8765):
        self.host = host
        self.port = port
        self.signaling = WebSocketSignaling(host, port)
        self.pc = RTCPeerConnection()

    async def consume_signaling(self):
        while True:
            obj = await self.signaling.receive()
            if isinstance(obj, RTCSessionDescription):
                await self.pc.setRemoteDescription(obj)
                if obj.type == "offer":
                    await self.pc.setLocalDescription(await self.pc.createAnswer())
                    await self.signaling.send(self.pc.localDescription)
            elif isinstance(obj, RTCIceCandidate):
                await self.pc.addIceCandidate(obj)
            elif obj is BYE:
                print("Exiting")
                break

    async def handler(self, websocket, path):
        await self.signaling.connect(websocket)
        pc = self.pc

        @pc.on("datachannel")
        def on_datachannel(channel):
            @channel.on("message")
            async def on_message(message):
               # parse chunks of data and add to queue


        await self.consume_signaling()
4

0 回答 0