问题标签 [streamz]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
256 浏览

python - Streamz/Dask:收集不等待缓冲区的所有结果

进口:

模拟工作量:

假设我想在本地 Dask 客户端上处理一些工作负载:

这按预期工作,但sink(print)当然会强制等待每个结果,因此流不会并行执行。

但是,如果我使用buffer()允许缓存结果,则gather()似乎不再正确收集所有结果,并且解释器在获得结果之前退出。这种方法:

...不会为我打印任何结果。Python 解释器只是在启动脚本后不久退出并 buffer()发出结果之前退出,因此不会打印任何内容。

但是,如果主进程被迫等待一段时间,结果将以并行方式打印(因此它们不会相互等待,而是几乎同时打印):

这是为什么?我认为gather()应该等待一批 10 个结果,因为buffer()在将它们刷新到gather(). 为什么gather()在这种情况下不阻塞?

有没有一种很好的方法来检查 Stream 是否仍然包含正在处理的元素以防止主进程过早退出?

0 投票
1 回答
210 浏览

python-xarray - Xarray NetCDF 文件的流式传输

我想知道是否有一种方法可以直接从 NetCDF 文件流式传输数据,因为它是用 `xarray.

我想我可以像这样“创建”一个非缓冲文件?

我也知道我可以用 Xarray 打开它:

但是,我不确定这些数组是否会不断更新?整个事情的目的如下:我有一个产生输出的数值模型,我想在模型运行时可视化一些变量以了解当前状态。我知道这得到了 Holoviews 人的支持:https ://hvplot.holoviz.org/user_guide/Streaming.html

streamz这需要我用图书馆制作自己的流吗?https://streamz.readthedocs.io/en/latest/index.html

关于如何让它为 netcdf 工作的任何提示都会很棒!

干杯,
保罗

0 投票
1 回答
262 浏览

python-3.x - Dask:持续提交,处理所有提交的数据

有 500 个,不断增长DataFrames,我想将(对于每个 DataFrame 独立)数据的操作提交到dask. 我的主要问题是:可以dask保存连续提交的数据,所以我可以submit对所有提交的数据进行函数处理——而不仅仅是新提交的数据?

但是让我们通过一个例子来解释它:

创建一个dask_server.py

现在我可以从 mymy_stream.py和 start 连接到submit数据gather

这可以按预期工作,而且非常快!!!

但接下来,我想实际上是在计算发生之前appendlines第一个- 想知道这是否可能?因此,在我们的示例中,我想计算已提交的所有行,而不仅仅是最后提交行。mean

问题/方法:

  1. 这种累积计算可行吗?
  2. 糟糕的替代方案 1:每次有新行到达时,我都会在本地缓存所有行并将submit 所有数据缓存到集群 。这就像一个指数开销。试过了,可以,但是很慢!
  3. 黄金选项:Python 程序 1 推送数据。可以将另一个客户端(来自另一个 python 程序)连接到该累积数据,并将分析逻辑从插入逻辑中移开。我认为发布的数据集是要走的路,但是是否适用于这种高速附加?

可能相关:分布式变量Actors Worker

0 投票
1 回答
101 浏览

websocket - 正确使用 streamz 和 websocket

我正在尝试找出使用streamz. 我的流数据是使用 加载的websocket-client,之后我这样做:

虽然这可行,但我发现我的流无法跟上流数据的速度(所以我在流中看到的数据比流数据落后几秒钟,这既是高容量又是高频率的数据)。我尝试将其设置gen.sleep(0.001)为较小的值(删除它会完全停止 jupyter 实验室),但问题仍然存在。

这是使用 websocket 将 streamz 与流数据连接的正确方法吗?

0 投票
0 回答
21 浏览

dask - Streamz + Dask 员工的入住率低于我的预期

我有一个 Dask 集群,在本地机器上运行了 32 个工作人员,并尝试针对它运行以下 Streamz 工作流:

在此处输入图像描述

我只看到在任何特定时间都有几个工人被占用:

在此处输入图像描述

我看到在本地运行时占用率增加:

client = Client(n_workers=32, processes=True, threads_per_worker=1, memory_limit='32GB')

但在任何给定时间仍然没有接近 32 名工人被占用(最多约 8 名)。

为什么会这样,为什么任务流显示的并行运行的任务比占用率建议的要多?