0

我正在尝试找出使用streamz. 我的流数据是使用 加载的websocket-client,之后我这样做:

# open a stream and push updates into the stream
stream = Stream()

# establish a connection
ws = create_connection("ws://localhost:8765")

# get continuous updates
from tornado import gen
from tornado.ioloop import IOLoop

async def f():
    while True:
        await gen.sleep(0.001)
        data = ws.recv()
        stream.emit(data)
        
IOLoop.current().add_callback(f)

虽然这可行,但我发现我的流无法跟上流数据的速度(所以我在流中看到的数据比流数据落后几秒钟,这既是高容量又是高频率的数据)。我尝试将其设置gen.sleep(0.001)为较小的值(删除它会完全停止 jupyter 实验室),但问题仍然存在。

这是使用 websocket 将 streamz 与流数据连接的正确方法吗?

4

1 回答 1

2

我认为不websocket-client提供异步 API,因此它阻塞了事件循环。

您应该使用异步 websocket 客户端,例如Tornado 提供的客户端:

from tornado.websocket import websocket_connect

ws = websocket_connect("ws://localhost:8765")

async def f():
    while True:
        data = await ws.read_message()

        if data is None:
            break
        else:
            await stream.emit(data)

        # considering you're receiving data from a localhost
        # socket, it will be really fast, and the `await` 
        # statement above won't pause the while-loop for 
        # enough time for the event loop to have chance to 
        # run other things.
        # Therefore, sleep for a small time to suspend the 
        # while-loop.

        await gen.sleep(0.0001) 

如果您正在从/向远程连接接收/发送数据,那么您不需要睡觉,这将足够慢以暂停await语句处的 while 循环。

于 2021-03-22T05:58:09.183 回答