4

背景
我有一个列表,其中包含经过预处理并保存为 .npy 二进制文件的数千个图像堆栈(3D numpy 数组)的路径。

案例研究我想计算所有图像的平均值,为了加快分析速度,我认为并行处理。

使用方法 dask.delayed

# List with the file names
flist_img_to_filter

# I chunk the list of paths in sublists. The number of chunks correspond to 
# the number of cores used for the analysis
chunked_list
# Scatter the images sublists to be able to process in parallel
futures = client.scatter(chunked_list)

# Create dask processing graph
output = []
for future in futures:
    ImgMean = delayed(partial_image_mean)(future)
    output.append(ImgMean)
    ImgMean_all = delayed(sum)(output)
    ImgMean_all = ImgMean_all/len(futures)

 # Compute the graph
 ImgMean = ImgMean_all.compute()

使用 dask.arrays 修改自Matthew Rocklin 博客的方法

imread = delayed(np.load, pure=True)  # Lazy version of imread
# Lazily evaluate imread on each path
lazy_values = [imread(img_path) for img_path in flist_img_to_filter]     

arrays = [da.from_delayed(lazy_value, dtype=np.uint16,shape=shape) for 
lazy_value in lazy_values]

# Stack all small Dask arrays into one
stack = da.stack(arrays, axis=0)

ImgMean = stack.mean(axis=0).compute()               

问题

1.dask.delayed方法中是否需要预先分块列表?如果我分散原始列表,我会为每个元素获得一个未来。有没有办法告诉工人处理它有权访问的期货?
2.dask.arrays方法明显较慢且内存使用率较高。这是使用 dask.arrays 的“坏方法”吗?
3.有没有更好的方法来解决这个问题?

谢谢!

4

1 回答 1

0

在 dask.delayed 方法中是否有必要预先分块列表?如果我分散原始列表,我会为每个元素获得一个未来。有没有办法告诉工人处理它有权访问的期货?

简单的答案是否定的,从 Dask 版本 0.15.4 开始,没有非常健壮的方法可以提交关于“该工作人员当前存在的某种类型的所有任务”的计算。

who_has但是,您可以使用或has_what客户端方法轻松地询问调度程序哪些键存在于调度程序上。

from dask.distributed import wait
import wait

futures = dask.persist(futures)
wait(futures)
client.who_has(futures)

dask.arrays 方法明显更慢并且内存使用率更高。这是使用 dask.arrays 的“坏方法”吗?

在调用 mean 以进行并行/内存权衡之前,您可能希望使用函数的split_every=关键字或您的数组将图像组合在一起(可能类似于您上面所做的)。meanrechunk

有没有更好的方法来解决这个问题?

您也可以尝试as_completed并在数据完成时计算运行方式。为此,您必须从延迟切换到期货

于 2017-10-26T12:47:58.063 回答