问题标签 [streamz]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
python - Streamz/Dask:收集不等待缓冲区的所有结果
进口:
模拟工作量:
假设我想在本地 Dask 客户端上处理一些工作负载:
这按预期工作,但sink(print)
当然会强制等待每个结果,因此流不会并行执行。
但是,如果我使用buffer()
允许缓存结果,则gather()
似乎不再正确收集所有结果,并且解释器在获得结果之前退出。这种方法:
...不会为我打印任何结果。Python 解释器只是在启动脚本后不久退出并在 buffer()
发出结果之前退出,因此不会打印任何内容。
但是,如果主进程被迫等待一段时间,结果将以并行方式打印(因此它们不会相互等待,而是几乎同时打印):
这是为什么?我认为gather()
应该等待一批 10 个结果,因为buffer()
在将它们刷新到gather()
. 为什么gather()
在这种情况下不阻塞?
有没有一种很好的方法来检查 Stream 是否仍然包含正在处理的元素以防止主进程过早退出?
python-xarray - Xarray NetCDF 文件的流式传输
我想知道是否有一种方法可以直接从 NetCDF 文件流式传输数据,因为它是用 `xarray.
我想我可以像这样“创建”一个非缓冲文件?
我也知道我可以用 Xarray 打开它:
但是,我不确定这些数组是否会不断更新?整个事情的目的如下:我有一个产生输出的数值模型,我想在模型运行时可视化一些变量以了解当前状态。我知道这得到了 Holoviews 人的支持:https ://hvplot.holoviz.org/user_guide/Streaming.html
streamz
这需要我用图书馆制作自己的流吗?https://streamz.readthedocs.io/en/latest/index.html
关于如何让它为 netcdf 工作的任何提示都会很棒!
干杯,
保罗
python-3.x - Dask:持续提交,处理所有提交的数据
有 500 个,不断增长DataFrames
,我想将(对于每个 DataFrame 独立)数据的操作提交到dask
. 我的主要问题是:可以dask
保存连续提交的数据,所以我可以submit
对所有提交的数据进行函数处理——而不仅仅是新提交的数据?
但是让我们通过一个例子来解释它:
创建一个dask_server.py
:
现在我可以从 mymy_stream.py
和 start 连接到submit
数据gather
:
这可以按预期工作,而且非常快!!!
但接下来,我想实际上是在计算发生之前append
的lines
第一个- 想知道这是否可能?因此,在我们的示例中,我想计算已提交的所有行,而不仅仅是最后提交的行。mean
问题/方法:
- 这种累积计算可行吗?
- 糟糕的替代方案 1:每次有新行到达时,我都会在本地缓存所有行并将
submit
所有数据缓存到集群 。这就像一个指数开销。试过了,可以,但是很慢! - 黄金选项:Python 程序 1 推送数据。可以将另一个客户端(来自另一个 python 程序)连接到该累积数据,并将分析逻辑从插入逻辑中移开。我认为发布的数据集是要走的路,但是是否适用于这种高速附加?
websocket - 正确使用 streamz 和 websocket
我正在尝试找出使用streamz
. 我的流数据是使用 加载的websocket-client
,之后我这样做:
虽然这可行,但我发现我的流无法跟上流数据的速度(所以我在流中看到的数据比流数据落后几秒钟,这既是高容量又是高频率的数据)。我尝试将其设置gen.sleep(0.001)
为较小的值(删除它会完全停止 jupyter 实验室),但问题仍然存在。
这是使用 websocket 将 streamz 与流数据连接的正确方法吗?