问题标签 [dask-delayed]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
dask - 使用 Dask 计算会导致执行挂起
这是我之前关于使用 Dask 计算访问大数组中的一个元素的问题之一的潜在答案的后续问题。
为什么使用 Dask 计算会导致执行在下面挂起?这是工作代码片段:
python - 嵌套的 dask.compute 不阻塞
dask.compute(...) 应该是一个阻塞调用。但是,当我嵌套了 dask.compute,而内部的执行 I/O(如 dask.dataframe.read_parquet)时,内部的 dask.compute 不会阻塞。这是一个伪代码示例:
如果我启动 2 个工作人员,每个工作人员有 8 个进程,例如:
,那么我预计最多 2 x 8 个并发的 inner_func 正在运行,因为 inner_func(files).compute() 应该是阻塞的。然而,我观察到的是,在一个工作进程中,一旦它开始 read_parquet 步骤,可能会有另一个 inner_func(files).compute() 开始运行。所以最后可能会有多个 inner_func(files).compute() 运行,有时它可能会导致内存不足错误。
这是预期的行为吗?如果是这样,有什么方法可以强制每个工作进程执行一个 inner_func(files).compute() 吗?
dask - Dask Delayed 忽略因变量的名称
在使用创建计算图时,delayed
我试图分配名称,以便在可视化该图时它是可读的。但是,对于依赖于函数的延迟变量,该name
参数似乎不会影响键。这是一个玩具示例:
您可以在此处看到可视化(我无法嵌入图像),但我看到的不是“avg_result”,而是“calc_avg-#0”,而不是“ratio_result”,我看到的是“calc_ratio-#1”。如果我查看x.key
或y.key
它们与我提供的名称不匹配。这是预期的行为吗?
python - 使用 Dask 并行化 HDF 读-译-写
TL;DR:我们在将 Pandas 代码与从同一个 HDF 读取和写入的 Dask 并行化时遇到问题
我正在从事一个通常需要三个步骤的项目:读取、翻译(或组合数据)和写入这些数据。就上下文而言,我们正在处理医疗记录,我们收到不同格式的索赔,将它们转换为标准化格式,然后将它们重新写入磁盘。理想情况下,我希望以某种形式保存中间数据集,以便以后可以通过 Python/Pandas 访问。
目前,我选择 HDF 作为我的数据存储格式,但是我遇到了运行时问题。在大量人口中,我的代码目前可能需要几天时间。这导致我调查 Dask,但我不确定我是否已将 Dask 最好地应用于我的情况。
下面是我的工作流程的一个工作示例,希望有足够的示例数据来了解运行时问题。
读取(在本例中为创建)数据
翻译/写入数据
顺序方法
上面的代码在我的机器上运行大约需要 9 分钟。
达斯克方法
这种 Dask 方法需要 13 秒(!)
虽然这是一个很大的改进,但我们通常对以下几点感到好奇:
鉴于这个简单的例子,使用 Dask 数据帧、连接它们并使用 groupby/apply 的方法是最好的方法吗?
实际上,我们有多个这样的进程,它们从同一个 HDF 读取,并写入同一个 HDF。我们最初的代码库的结构允许一次运行整个工作流程
member_id
。当我们尝试并行化它们时,它有时会在小样本上工作,但大多数时候会产生分段错误。像这样的并行化工作流程,使用 HDF 读/写是否存在已知问题?我们也在努力制作一个这样的例子,但我们想我们会在这里发布这个,以防触发建议(或者如果这个代码可以帮助面临类似问题的人)。
任何和所有反馈表示赞赏!
arrays - 多个图像意味着 dask.delayed 与 dask.array
背景
我有一个列表,其中包含经过预处理并保存为 .npy 二进制文件的数千个图像堆栈(3D numpy 数组)的路径。
案例研究我想计算所有图像的平均值,为了加快分析速度,我认为并行处理。
使用方法 dask.delayed
使用 dask.arrays
修改自Matthew Rocklin 博客的方法
问题
1.在dask.delayed
方法中是否需要预先分块列表?如果我分散原始列表,我会为每个元素获得一个未来。有没有办法告诉工人处理它有权访问的期货?
2.该dask.arrays
方法明显较慢且内存使用率较高。这是使用 dask.arrays 的“坏方法”吗?
3.有没有更好的方法来解决这个问题?
谢谢!
dask - 懒惰地从 CSV 加载 dask 数据帧(内部延迟)
在使用 dask.distributed 时,我试图在 S3 上的延迟函数中从 CSV 加载 dask 数据帧,如下所示:
read_csv()不需要与分布式客户端交互,所以我认为这是可能的。然后在客户端计算机上计算func1返回的延迟对象。
在那之前它看起来不错,打印结果
但是,Failed to serialize (<dask.bytes.core.OpenFile object at ...>, ..., ..., '\n'). Exception: can't pickle thread.lock objects
当我尝试进一步处理它时它失败了,例如
有没有办法让这个方案发挥作用,或者我在这里错过了一些更基本的限制?
PS。我得到的错误日志:
python - 使用延迟加载将 LAZ 读取到 Dask 数据帧
操作 将多个 LAZ 点云文件读取到 Dask DataFrame。
问题
将 LAZ(压缩)解压缩为 LAS(未压缩)需要大量内存。Dask 创建的不同文件大小和多个进程会导致MemoryError
's.
尝试
我尝试限制遵循指南的工人数量,但似乎不起作用。
问题 如何以非标准格式加载如此大量的数据?
例子
以下示例是我当前的实现。它每 5 个对所有输入文件进行分组,以限制最多 5 个并行解压缩进程。然后重新分区并写入 Parquet 以启用进一步处理。对我来说,这个实现似乎完全错过了 Dask 的重点。
dask - dask dataframe to parquet 因内存错误而失败
我从多个 hdfs 文件创建了 dask 数据帧,然后尝试将最终数据帧写回 hdfs(parquet)。但它因内存错误消息而失败。
最终的 dask 数据框将有 100 万条记录和 600 列。
Dask 集群大小:5 个节点,每个节点具有 55G 内存。
例外:
请指出我正确的方向来克服这个问题。