2

最近我陷入了“加密狂热”,并开始在一些交易所编写自己的 API 包装器。

特别是 Binance 有一个流式 websocket 端点。

您可以在其中流式传输数据,但通过 websocket 端点。我想我会用 sanic 自己尝试一下。

这是我的 websocket 路线

@ws_routes.websocket("/hello")
async def hello(request, ws):
    while True:
        await ws.send("hello")

现在我在 2 台不同的机器上有 2 个客户端连接到它

async def main():
    async with aiohttp.ClientSession() as session:

        ws  = await session.ws_connect("ws://192.168.86.31:8000/hello")
        while True:
            data = await ws.receive()
            print(data)

但是,只有一个客户端能够连接并接收来自服务器的发送数据。我假设由于while循环阻塞并阻止其他连接连接,因为它没有连接yield

我们如何在不阻塞其他连接的情况下使其流式传输到多个客户端?

我考虑增加更多的工人,这似乎可以解决问题,但我不明白那不是一个非常可扩展的解决方案。因为每个客户都是自己的工人,如果你有数千个甚至只有 10 个客户,那么每个客户 10 个工人 1 个。

那么币安是如何进行 websocket 流式传输的呢?或者地狱如何推特流端点工作?

它如何能够为多个并发客户端提供无限流?因为最终这就是我想要做的

4

2 回答 2

2

解决这个问题的方法是这样的。

我正在使用sanic框架

class Stream:
    def __init__(self):
        self._connected_clients = set()

    async def __call__(self, *args, **kwargs):
        await self.stream(*args, **kwargs)

    async def stream(self, request, ws):
        self._connected_clients.add(ws)

        while True:
            disconnected_clients = []
            for client in self._connected_clients:  # check for disconnected clients
                if client.state == 3:  # append to a list because error will be raised if removed from set while iterating over it 
                    disconnected_clients.append(client)
            for client in disconnected_clients:  # remove disconnected clients
                self._connected_clients.remove(client)

            await asyncio.wait([client.send("Hello") for client in self._connected_clients]))


ws_routes.add_websocket_route(Stream(), "/stream")
  1. 跟踪每个websocket会话
  2. 附加到一个listset
  3. 检查无效websocket会话并从websocket会话容器中删除
  4. 做一个await asyncio.wait([ws_session.send() for ws_session [list of valid sessions]])基本上是广播的。

5.利润!

这基本上是 pubsub 设计模式

于 2018-01-21T10:24:55.720 回答
1

可能是这样的?

import aiohttp
import asyncio
loop = asyncio.get_event_loop()
async def main():
    async with aiohttp.ClientSession() as session:
        ws  = await session.ws_connect("ws://192.168.86.31:8000/hello")
        while True:
            data = await ws.receive()
            print(data)

multiple_coroutines = [main() for _ in range(10)]
loop.run_until_complete(asyncio.gather(*multiple_coroutines))
于 2018-01-20T14:13:28.940 回答