问题标签 [dask-delayed]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
python - 在 DataFrame 中使用 dask 进行有效的条件成对行操作的正确方法
我有以下顺序代码:
但是数据框中的行数接近 10 6。因此,我想以某种方式加快此操作。我正在考虑将 dask 与 group by 一起使用。以下是我的方法:
我不知道为什么,但我遇到了一个关键错误KeyError: 'l'
。这也是使用 dask 的正确方法。
dask - 使用 dask.delayed 和 pandas.DataFrame 将 dask.bag 字典转换为 dask.dataframe
我正在努力将dask.bag
字典转换dask.delayed
pandas.DataFrames
成最终的dask.dataframe
我有一个函数(make_dict)将文件读入一个相当复杂的嵌套字典结构,另一个函数(make_df)将这些字典转换为一个pandas.DataFrame
(每个文件的结果数据帧约为 100 mb)。我想将所有数据框附加到一个dask.dataframe
中以进行进一步分析。
到目前为止,我一直在使用dask.delayed
对象来加载、转换和附加所有工作正常的数据(参见下面的示例)。但是对于未来的工作,我想将加载的字典存储在dask.bag
using 中dask.persist()
。
我设法将数据加载到dask.bag
中,从而生成一个 dicts 列表或pandas.DataFrame
我可以在调用后在本地使用的列表compute()
。但是,当我尝试将其dask.bag
转换为dask.dataframe
usingto_delayed()
时,我遇到了一个错误(见下文)。
感觉好像我在这里遗漏了一些相当简单的东西,或者我的方法可能dask.bag
是错误的?
下面的示例显示了我使用简化函数的方法并引发了相同的错误。任何有关如何解决此问题的建议表示赞赏。
我最终想使用分布式调度程序做什么:
dask - 如何在与提交它的机器不同的机器上获得 Dask 计算的结果?
我在 Django 服务器后面使用 Dask,这里总结了我的基本设置:https : //github.com/MoonVision/django-dask-demo/ 可以在这里找到 Dask 客户端:https ://github.com /MoonVision/django-dask-demo/blob/master/demo/daskmanager/daskmanager.py
我希望能够将任务的保存与提交它的服务器分开,以实现稳健性和可扩展性。我还想了解有关任务处理状态的更详细信息,现在即使任务正在处理,未来状态也始终处于待处理状态。粗略估计完成百分比也很好。
现在,如果 Web 服务器死了,客户端将被删除并且任务将停止,因为没有客户端仍然持有未来。我可以通过使用fire_and_forget来解决这个问题,但是我无法在任务完成时保存任务状态和结果。
我看到跟踪状态并在 fire_and_forget 后保存结果的方法:
我可以有一个调度程序插件,将所有传输发送到 AMPQ 服务器(RabbitMQ)。我喜欢它的健壮性,并且能够订阅调度程序输出的某些消息,并且知道每条消息都会被处理。我不确定如何使用这种方法自行获得结果。我可以手动将一个节点添加到每个图表的末尾以保存结果,但宁愿让它在幕后。
get_task_stream在单独的服务器上或以某种方式使用它。有了这个,如果服务器关闭,我似乎可能会错过一些消息,所以这似乎是一个更糟糕的选择 1。
其他选择?
实现这一目标的最佳方法是什么?
编辑:刚刚测试过,似乎当提交任务的客户端关闭时,它创建的所有期货都从处理转移到遗忘,即使调用 fire_and_forget。
dask - 如何在每个工作人员上延迟 dask 以允许顺序执行进程?
我需要工人一次处理一个任务,并在开始新的任务之前完成当前流程。我无法做到:(1)每个工人在任何时候最多运行一个任务,(2)让工人在开始新的程序之前完成一个程序;原子事务。
我在具有 40 个节点的集群上使用 dask.distributed Client;4 核和 15GB 内存。我处理的管道有大约 8-10GB 的任务,因此在一个工作上有两个任务将导致应用程序失败。
我试图分配我的工人资源和任务分配,dask-worker scheduler-ip:port --nprocs 1 --resources process=1
但futures = [client.submit(func, f, resources={'process': 1}) for f in futures]
没有成功。
我的代码如下:
截至目前,我有两种情况:
1-我运行所有输入并且应用程序以MemoryError
2-我运行一个子样本,但它运行如下:
加载(img-1)->加载(img-2)->foo(img-1)->加载(img-3)->...->保存(img-1)->保存(img-2 )->...
TLDR:这就是我想对每个工人做的事情:
加载(img-1)->foo(img-1)->保存(img-1)->加载(img-7)->...
dask - Dask:为可视化()选择什么函数变量最好
我试图更深入地了解 Dask 的延迟,所以我决定在这里研究这些示例。我修改了一些代码以反映我想如何使用 Dask(见下文)。但结果与我预期的不同,即。元组与列表。当我尝试应用 '.visualize()' 来查看执行图的样子时,我什么也得不到。
我研究了“delayed.ipynb”中的所有示例,它们都可以正常工作,包括所有的可视化。然后,我修改了一个示例的“for”循环:
对函数调用使用列表推导。结果是原始工作示例的变体。
我尝试打印出“结果”(函数调用),它提供了预期的延迟调用列表,但是当我打印“计算”的结果时,我意外地将所需的列表作为元组的一部分。为什么我不得到一份清单?
当我尝试“可视化”执行图时,我什么也得不到。我期望在生成的列表中看到尽可能多的节点。
我认为我没有对示例进行任何重大修改,所以我不明白什么?
python - 在 Dask 中使用尚未实现的 Pandas 函数
我相信我在 Dask 教程之一中看到了有关如何使用 Dask 框架中尚未实现的 Pandas 函数的建议,但在使用 Dask 数据帧时,我似乎放错了地方。例如,我想使用 Pandas 函数“ewm”。
作为一种解决方法,我将我的 Dask 数据帧转换为 Pandas 数据帧,在 Pandas 数据帧上运行 ewm,然后将它们转换回 Dask,以便以后进行更多内存密集型操作。不是最有效的。
有没有更好的策略呢?
python - Dask 延迟最佳实践的批处理指令是如何工作的?
我想我遗漏了一些东西(仍然是 Dask Noob),但我正在尝试批处理建议以避免从这里执行过多的 Dask 任务:
https://docs.dask.org/en/latest/delayed-best-practices.html
并且不能让它们工作。这是我尝试过的:
批次现在包含延迟对象:
但是当我计算它们时,我得到批处理函数指针(我认为??):
我有两个问题:
这真的应该如何运行,因为它似乎与页面的第一行相矛盾,因为它会立即
Best practices
运行而不是懒惰。delayed(f(x))
如何获得上述批量运行的结果?
python - 为什么dask不并行执行
有人可以指出我在执行 dask 时做错了什么,因为它似乎没有使用多核。
[用可重现的代码更新]
使用 dask的代码:
没有 dask的正常实现代码:
那么,如果没有 dask 实际上更快,那怎么可能呢?
python - 在 dask 中生成批量图像
我刚开始是dask
因为它提供了强大的并行处理能力。我的磁盘上有一些40000
图像,我将使用这些图像来构建一个使用一些 DL 库的分类器,比如Keras
or TF
。我在 pandas 数据框中收集了这个元信息(图像路径和相应的标签),如下所示:
现在这是我的简单任务:使用 dask 以一种lazy
时尚的方式读取图像和相应的标签。batch size
对图像进行一些处理,并在32中将批次传递给分类器。
定义读取和预处理函数:
/li>获取延迟的 dask 数组:
/li>
现在这是我的问题:
Q1。如何从这个数组batch
中获取数据?batch_size=32
这相当于现在的惰性生成器吗?如果不是,可以让它表现得像一个人吗?
Q2。如何选择有效chunksize
的更好的批量生成?例如,如果我有4
核心,图像的大小是(224,224,3)
,我怎样才能使我的批处理高效?