0

我正在尝试将 dask 用于令人尴尬的并行工作负载。

例如,将两个大型数组逐个元素相加:

x1 = ...
x2 = ...

def func(a, b):
    sleep(0.001 + len(a) * 1e-5)
    return a + b + 1

我发现 [1] 似乎回答了这个问题,但是,我的函数并不昂贵,并且可以一次计算任意数量的数据。

我真的很想对数据进行分块以避免OOM,而 dask.array 似乎可以做到这一点。func我设法通过包装让它“工作” da.as_gufunc(signature="(),()->()", output_dtypes=float, vectorize=True)- 但是这会按元素应用函数,我需要它在块上。

我也尝试过dask.bag.from_sequence- 但这会将数组转换为列表x1x2这会导致 OOM(或可怕的性能)。

我最接近我想要的是:

from dask.diagnostics import ProgressBar
import zarr
import dask.array as da


d1 = da.ones((1024, 1024, 1024, 128),chunks=64)
d2 = da.ones((1024, 1024, 1024, 128),chunks=64)

d3 = d1 + d2 + 1
                                                                                                                                                                                                                      
with ProgressBar():
    d3.to_zarr("three.zarr")

但是我需要d1 + d2 + 1用我的功能替换。

[1] https://examples.dask.org/applications/embarrassingly-parallel.html

4

1 回答 1

0

看来我快到了:

da.map_blocks允许以预期的方式应用功能:

d3 = da.map_blocks(func, d1, d2)
                                                                                                                                                                                                                    
with ProgressBar():
    d3.to_zarr("three.zarr")
于 2021-11-11T14:20:01.440 回答