0

我被困在一个陌生的地方。我有一堆延迟的函数调用,我想按特定顺序执行。虽然并行执行是微不足道的:

res = client.compute([myfuncs])
res = client.gather(res)

我似乎找不到以非阻塞方式按顺序执行它们的方法。

这是一个最小的例子:

import numpy as np
from time import sleep
from datetime import datetime

from dask import delayed
from dask.distributed import LocalCluster, Client


@delayed
def dosomething(name):
    res = {"name": name, "beg": datetime.now()}
    sleep(np.random.randint(10))
    res.update(rand=np.random.rand())
    res.update(end=datetime.now())
    return res


seq1 = [dosomething(name) for name in ["foo", "bar", "baz"]]
par1 = dosomething("whaat")
par2 = dosomething("ahem")
pipeline = [seq1, par1, par2]

给定上面的例子,我想并行运行seq1,par1和, 但: "foo", "bar", 和 "baz" 的组成部分是按顺序运行的。par2seq1

4

1 回答 1

1

您绝对可以作弊并为您的函数添加一个可选的依赖项,如下所示:

@dask.delayed
def dosomething(name, *args):
     ...

这样您就可以使任务相互依赖,即使您在下一次运行该函数时不使用一个结果:

inputs = ["foo", "bar", "baz"]
seq1 = [dosomething(inputs[0])]
for bit in inputs[1:]:
    seq1.append(dosomething(bit, seq1[-1]))

或者,您可以阅读分布式调度程序的“futures”界面,从而可以实时监控任务的进度。

于 2019-02-07T20:12:46.210 回答