问题标签 [dask-delayed]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
98 浏览

python - 在 DataFrame 中使用 dask 进行有效的条件成对行操作的正确方法

我有以下顺序代码:

但是数据框中的行数接近 10 6。因此,我想以某种方式加快此操作。我正在考虑将 dask 与 group by 一起使用。以下是我的方法:

我不知道为什么,但我遇到了一个关键错误KeyError: 'l'。这也是使用 dask 的正确方法。

0 投票
1 回答
2326 浏览

dask - 使用 dask.delayed 和 pandas.DataFrame 将 dask.bag 字典转换为 dask.dataframe

我正在努力将dask.bag字典转换dask.delayed pandas.DataFrames成最终的dask.dataframe

我有一个函数(make_dict)将文件读入一个相当复杂的嵌套字典结构,另一个函数(make_df)将这些字典转换为一个pandas.DataFrame(每个文件的结果数据帧约为 100 mb)。我想将所有数据框附加到一个dask.dataframe中以进行进一步分析。

到目前为止,我一直在使用dask.delayed对象来加载、转换和附加所有工作正常的数据(参见下面的示例)。但是对于未来的工作,我想将加载的字典存储在dask.bagusing 中dask.persist()

我设法将数据加载到dask.bag中,从而生成一个 dicts 列表或pandas.DataFrame我可以在调用后在本地使用的列表compute()。但是,当我尝试将其dask.bag转换为dask.dataframeusingto_delayed()时,我遇到了一个错误(见下文)。

感觉好像我在这里遗漏了一些相当简单的东西,或者我的方法可能dask.bag是错误的?

下面的示例显示了我使用简化函数的方法并引发了相同的错误。任何有关如何解决此问题的建议表示赞赏。

我最终想使用分布式调度程序做什么:

0 投票
1 回答
322 浏览

dask - 如何在与提交它的机器不同的机器上获得 Dask 计算的结果?

我在 Django 服务器后面使用 Dask,这里总结了我的基本设置:https : //github.com/MoonVision/django-dask-demo/ 可以在这里找到 Dask 客户端:https ://github.com /MoonVision/django-dask-demo/blob/master/demo/daskmanager/daskmanager.py

我希望能够将任务的保存与提交它的服务器分开,以实现稳健性和可扩展性。我还想了解有关任务处理状态的更详细信息,现在即使任务正在处理,未来状态也始终处于待处理状态。粗略估计完成百分比也很好。

现在,如果 Web 服务器死了,客户端将被删除并且任务将停止,因为没有客户端仍然持有未来。我可以通过使用fire_and_forget来解决这个问题,但是我无法在任务完成时保存任务状态和结果。

我看到跟踪状态并在 fire_and_forget 后保存结果的方法:

  1. 我可以有一个调度程序插件,将所有传输发送到 AMPQ 服务器(RabbitMQ)。我喜欢它的健壮性,并且能够订阅调度程序输出的某些消息,并且知道每条消息都会被处理。我不确定如何使用这种方法自行获得结果。我可以手动将一个节点添加到每个图表的末尾以保存结果,但宁愿让它在幕后。

  2. get_task_stream在单独的服务器上或以某种方式使用它。有了这个,如果服务器关闭,我似乎可能会错过一些消息,所以这似乎是一个更糟糕的选择 1。

  3. 其他选择?

实现这一目标的最佳方法是什么?

编辑:刚刚测试过,似乎当提交任务的客户端关闭时,它创建的所有期货都从处理转移到遗忘,即使调用 fire_and_forget。

0 投票
1 回答
296 浏览

dask - 如何在每个工作人员上延迟 dask 以允许顺序执行进程?

我需要工人一次处理一个任务,并在开始新的任务之前完成当前流程。我无法做到:(1)每个工人在任何时候最多运行一个任务,(2)让工人在开始新的程序之前完成一个程序;原子事务。

我在具有 40 个节点的集群上使用 dask.distributed Client;4 核和 15GB 内存。我处理的管道有大约 8-10GB 的任务,因此在一个工作上有两个任务将导致应用程序失败。

我试图分配我的工人资源和任务分配,dask-worker scheduler-ip:port --nprocs 1 --resources process=1futures = [client.submit(func, f, resources={'process': 1}) for f in futures]没有成功。

我的代码如下:

截至目前,我有两种情况:

1-我运行所有输入并且应用程序以MemoryError

2-我运行一个子样本,但它运行如下:

加载(img-1)->加载(img-2)->foo(img-1)->加载(img-3)->...->保存(img-1)->保存(img-2 )->...

TLDR:这就是我想对每个工人做的事情:

加载(img-1)->foo(img-1)->保存(img-1)->加载(img-7)->...

0 投票
1 回答
42 浏览

dask - 如何解释 Dask 可视化中的后缀数字?

使用 dask 可视化图形时,生成的图形有 2 种节点。方形节点和圆形节点。

一般来说,方形节点似乎是实际值。而圆圈是产生这些值的函数。然而,这些似乎在这里有些复杂。

方形节点总是以类似#1或的数字作为后缀#0。我想知道为什么有些正方形节点是#0,有些正方形节点是#1。有时 aDelayedLeaf可以用#0. 但其他时候,任务的输出被可视化为#1. 管理这个后缀的规则是什么?

fst.png:

fst.png

snd.png:

snd.png

输出.png:

输出.png

0 投票
1 回答
29 浏览

dask - Dask:为可视化()选择什么函数变量最好

我试图更深入地了解 Dask 的延迟,所以我决定在这里研究这些示例。我修改了一些代码以反映我想如何使用 Dask(见下文)。但结果与我预期的不同,即。元组与列表。当我尝试应用 '.visualize()' 来查看执行图的样子时,我什么也得不到。

我研究了“delayed.ipynb”中的所有示例,它们都可以正常工作,包括所有的可视化。然后,我修改了一个示例的“for”循环:

对函数调用使用列表推导。结果是原始工作示例的变体。

我尝试打印出“结果”(函数调用),它提供了预期的延迟调用列表,但是当我打印“计算”的结果时,我意外地将所需的列表作为元组的一部分。为什么我不得到一份清单?

当我尝试“可视化”执行图时,我什么也得不到。我期望在生成的列表中看到尽可能多的节点。

我认为我没有对示例进行任何重大修改,所以我不明白什么?

0 投票
1 回答
204 浏览

python - 在 Dask 中使用尚未实现的 Pandas 函数

我相信我在 Dask 教程之一中看到了有关如何使用 Dask 框架中尚未实现的 Pandas 函数的建议,但在使用 Dask 数据帧时,我似乎放错了地方。例如,我想使用 Pandas 函数“ewm”。

作为一种解决方法,我将我的 Dask 数据帧转换为 Pandas 数据帧,在 Pandas 数据帧上运行 ewm,然后将它们转换回 Dask,以便以后进行更多内存密集型操作。不是最有效的。

有没有更好的策略呢?

0 投票
1 回答
1641 浏览

python - Dask 延迟最佳实践的批处理指令是如何工作的?

我想我遗漏了一些东西(仍然是 Dask Noob),但我正在尝试批处理建议以避免从这里执行过多的 Dask 任务:

https://docs.dask.org/en/latest/delayed-best-practices.html

并且不能让它们工作。这是我尝试过的:

批次现在包含延迟对象:

但是当我计算它们时,我得到批处理函数指针(我认为??):

我有两个问题:

  1. 这真的应该如何运行,因为它似乎与页面的第一行相矛盾,因为它立即Best practices运行而不是懒惰。delayed(f(x))

  2. 如何获得上述批量运行的结果?

0 投票
1 回答
1384 浏览

python - 为什么dask不并行执行

有人可以指出我在执行 dask 时做错了什么,因为它似乎没有使用多核。

[用可重现的代码更新]

使用 dask的代码:

没有 dask的正常实现代码:

那么,如果没有 dask 实际上更快,那怎么可能呢?

0 投票
0 回答
385 浏览

python - 在 dask 中生成批量图像

我刚开始是dask因为它提供了强大的并行处理能力。我的磁盘上有一些40000图像,我将使用这些图像来构建一个使用一些 DL 库的分类器,比如Kerasor TF。我在 pandas 数据框中收集了这个元信息(图像路径和相应的标签),如下所示:

现在这是我的简单任务:使用 dask 以一种lazy时尚的方式读取图像和相应的标签。batch size对图像进行一些处理,并在32中将批次传递给分类器。

  1. 定义读取和预处理函数:

    /li>
  2. 获取延迟的 dask 数组:

    /li>

现在这是我的问题:

Q1。如何从这个数组batch中获取数据?batch_size=32这相当于现在的惰性生成器吗?如果不是,可以让它表现得像一个人吗?

Q2。如何选择有效chunksize的更好的批量生成?例如,如果我有4核心,图像的大小是(224,224,3),我怎样才能使我的批处理高效?