尝试构建多对多流式架构,我们打算使用 WebRTC 进行流式传输,使用 WebSockets 进行信号传输。我们计划创建从每个客户端到服务器的 PeerConnection,并将通过每个流接收到的数据放在 RabbitMQ 队列中,并将数据流式传输到其他客户端
我们怎样才能做到这一点?当新客户端想要流式传输数据时,我们如何使用多处理模块来创建新的服务器实例。我假设我们应该只有一个信令服务器,并且有多个基于没有客户端的对等连接。
信令.py
class WebSocket:
def __init__(self, handler, host='0.0.0.0', port=8765):
self.host = host
self.port = port
self.handler = handler
async def serve(self):
print('starting server')
async with websockets.serve(self.handler, self.host, self.port):
await asyncio.Future()
if __name__ == '__main__':
server = Server() # Server() instance has to dynamically created
socket = WebSocket(handler=server.handler) # A dynamic handler?
try:
asyncio.get_event_loop().run_until_complete(socket.serve())
except KeyboardInterrupt:
pass
finally:
asyncio.get_event_loop().close()
服务器.py
class Server:
def __init__(self, host='0.0.0.0', port=8765):
self.host = host
self.port = port
self.signaling = WebSocketSignaling(host, port)
self.pc = RTCPeerConnection()
async def consume_signaling(self):
while True:
obj = await self.signaling.receive()
if isinstance(obj, RTCSessionDescription):
await self.pc.setRemoteDescription(obj)
if obj.type == "offer":
await self.pc.setLocalDescription(await self.pc.createAnswer())
await self.signaling.send(self.pc.localDescription)
elif isinstance(obj, RTCIceCandidate):
await self.pc.addIceCandidate(obj)
elif obj is BYE:
print("Exiting")
break
async def handler(self, websocket, path):
await self.signaling.connect(websocket)
pc = self.pc
@pc.on("datachannel")
def on_datachannel(channel):
@channel.on("message")
async def on_message(message):
# parse chunks of data and add to queue
await self.consume_signaling()