我正在尝试使用看起来像这样的 dask 分发并行化嵌套循环:
@dask.delayed
def delayed_a(e):
a = do_something_with(e)
return something
@dask.delayed
def delayed_b(element):
computations = []
for e in element:
computations.add(delayed_a(e))
b = dask.compute(*computations, scheduler='distributed',
num_workers=4)
return b
list = [some thousands of elements here]
computations = []
for element in list:
computations.append(delayed_b(element))
results = dask.compute(*computations, scheduler='distributed',
num_workers=4)
如您所见,我正在使用distributed
调度程序。首先,我创建了一个computations
带有惰性delayed_b
函数的列表,该函数将list
. 然后,delayed_b
创建一组computations
正在调用delayed_a
函数的新集合,并且所有内容都在分布式中执行。这个伪代码正在工作,但我发现如果delayed_a
不存在它会更快。那么我的问题是——进行分布式并行 for 循环的正确方法是什么?
在历史的尽头,我想做的是:
list = [some thousands of elements here]
for element in list:
for e in element:
do_something_with(e)
对于使用dask.distributed
.