我正在尝试将 dask 用于令人尴尬的并行工作负载。
例如,将两个大型数组逐个元素相加:
x1 = ...
x2 = ...
def func(a, b):
sleep(0.001 + len(a) * 1e-5)
return a + b + 1
我发现 [1] 似乎回答了这个问题,但是,我的函数并不昂贵,并且可以一次计算任意数量的数据。
我真的很想对数据进行分块以避免OOM,而 dask.array 似乎可以做到这一点。func
我设法通过包装让它“工作” da.as_gufunc(signature="(),()->()", output_dtypes=float, vectorize=True)
- 但是这会按元素应用函数,我需要它在块上。
我也尝试过dask.bag.from_sequence
- 但这会将数组转换为列表x1
,x2
这会导致 OOM(或可怕的性能)。
我最接近我想要的是:
from dask.diagnostics import ProgressBar
import zarr
import dask.array as da
d1 = da.ones((1024, 1024, 1024, 128),chunks=64)
d2 = da.ones((1024, 1024, 1024, 128),chunks=64)
d3 = d1 + d2 + 1
with ProgressBar():
d3.to_zarr("three.zarr")
但是我需要d1 + d2 + 1
用我的功能替换。
[1] https://examples.dask.org/applications/embarrassingly-parallel.html