I want to use Dask to do the following:
- Load a matrix from an HDF5 file
- Compute each entry in parallel
Here is my code:
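import h5py
import numpy as np
import dask.array as da

def blocked_func(x):
    # Called once per block; returns a single random float
    return np.random.random()

with h5py.File(file_path) as f:
    d = f['/data']
    arr = da.from_array(d, chunks=(chunks_row, chunks_col))
    arr2 = arr.map_blocks(blocked_func, dtype='float32').compute()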
However, the code throws the following error:
  File ".../remote_fr_thinkpad/test_big_data.py", line 43, in <module>
    arr2 = arr.map_blocks(blocked_func, dtype='float32').compute()
  File ".../anaconda3/lib/python3.7/site-packages/dask/base.py", line 156, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File ".../anaconda3/lib/python3.7/site-packages/dask/base.py", line 399, in compute
    return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
  File ".../anaconda3/lib/python3.7/site-packages/dask/base.py", line 399, in <listcomp>
    return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
  File ".../anaconda3/lib/python3.7/site-packages/dask/array/core.py", line 779, in finalize
    return concatenate3(results)
  File ".../anaconda3/lib/python3.7/site-packages/dask/array/core.py", line 3497, in concatenate3
    chunks = chunks_from_arrays(arrays)
  File ".../anaconda3/lib/python3.7/site-packages/dask/array/core.py", line 3327, in chunks_from_arrays
    result.append(tuple([shape(deepfirst(a))[dim] for a in arrays]))
  File ".../anaconda3/lib/python3.7/site-packages/dask/array/core.py", line 3327, in <listcomp>
    result.append(tuple([shape(deepfirst(a))[dim] for a in arrays]))
IndexError: tuple index out of range
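A possible reading of this traceback: map_blocks calls the function once per block and then concatenates the per-block results, so a function that returns a bare scalar (as np.random.random() with no arguments does) leaves nothing with a shape to index into. The sketch below illustrates that idea under stated assumptions: an in-memory NumPy array stands in for the HDF5 dataset, and the shape (6, 4) and chunk sizes (3, 2) are made up for the example. It is only a sketch of one way the per-block function could return an array of the block's shape, not a confirmed diagnosis.

```python
import numpy as np
import dask.array as da

# Assumption: an in-memory array stands in for the HDF5 dataset f['/data'];
# the shape and chunk sizes are hypothetical.
data = np.zeros((6, 4), dtype="float32")
arr = da.from_array(data, chunks=(3, 2))

def blocked_func(block):
    # Return one random value per entry, so the output has the block's shape
    # instead of being a single scalar.
    return np.random.random(block.shape).astype("float32")

arr2 = arr.map_blocks(blocked_func, dtype="float32").compute()
print(arr2.shape)
```

With the real file, the same function would be applied to the array built from the h5py dataset, and compute() would need to run while the file is still open.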
I searched around and tried Dask's gu_func, but it raised the same error.
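For comparison, here is a minimal sketch of the generalized-ufunc route, assuming "gu_func" refers to dask.array.apply_gufunc (the exact call that was tried is not shown here). The signature "()->()" declares no core dimensions, so the function receives whole blocks and is expected to return an array of the same shape; a function returning a scalar would plausibly fail in the same way. The helper name random_like, the in-memory stand-in array, and the shape and chunk sizes are all hypothetical.

```python
import numpy as np
import dask.array as da

# Assumption: in-memory stand-in for the HDF5 dataset; shape and chunks are made up.
data = np.zeros((6, 4), dtype="float32")
arr = da.from_array(data, chunks=(3, 2))

def random_like(block):
    # Hypothetical helper: one random value per entry of the incoming block.
    return np.random.random(block.shape)

# "()->()" means the function is elementwise over all dimensions of arr.
out = da.apply_gufunc(random_like, "()->()", arr, output_dtypes=float)
result = out.compute()
print(result.shape)
```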
Thanks for your help.