5

我有一长串 .zarr 数组,我想将它们合并到一个数组中并写入磁盘。

我的代码大致如下所示:

import dask.array
import zarr
import os

local_paths = ['parts/X_00000000.zarr',
 'parts/X_00000001.zarr',
 'parts/X_00000002.zarr',
 'parts/X_00000003.zarr',
 'parts/X_00000004.zarr',
 'parts/X_00000005.zarr',
 ...]

result_path = "testtest"
os.makedirs(result_path)

Xs = [dask.array.from_zarr(zarr.DirectoryStore(p)) for p in local_paths]
X = dask.array.concatenate(Xs, axis=0)
X = X.rechunk({0: 10000, 1:-1, 2:-1, 3:-1})
dask.array.to_zarr(X, zarr.DirectoryStore(result_path))

每个数组都local_paths包含一个 64x64 图像列表。这些列表都有不同的长度。所以第一个的形状可能是(100, 64, 64, 3),第二个的形状可能是(200, 64, 64, 3)

执行此代码的最后一行会导致我的内存完全耗尽,然后 Jupyter 笔记本完全崩溃(没有给我错误消息或异常)。

为了调查问题,我打印了任务图,因此将最后一行替换为以下两行:

k = dask.array.to_zarr(X, zarr.DirectoryStore(result_path), compute=False)
k.visualize()

它非常大(链接),所以我只截取了其中两个有趣的部分: 在此处输入图像描述

这种结构一直在重复。Dask 获取连接的输出,重新分配数据然后尝试存储它。注意粗黑条是重叠过渡的结果。

现在看看这些转换来自哪里:

在此处输入图像描述

查看create中间的节点。我假设这是图中创建 zarr DirectoryStore 的部分。节点的前身对create所有存储节点都有过渡!

这是我的猜测为什么 dask 内存不足。它首先尝试解决所有重新分块合并,当它应该创建 DirectoryStore 时,已经没有内存了。并且没有一个store节点可以执行,因为create节点是每个节点的先决条件。

我的假设是真的吗?如果是,我该怎么做才能强制 dask 先创建 DirectoryStore?如果没有,还有什么可能是我内存不足的问题?

UPDATE 当我使用dask.config.set(scheduler='single-threaded')DirectoryStore 的创建(创建节点)不是问题。我只是查看了输出目录,并且已经写入了一些文件。所以它必须是任务图本身,太大了。

在此处输入图像描述

4

0 回答 0