我被困在一个陌生的地方。我有一堆延迟的函数调用,我想按特定顺序执行。虽然并行执行是微不足道的:
res = client.compute([myfuncs])
res = client.gather(res)
我似乎找不到以非阻塞方式按顺序执行它们的方法。
这是一个最小的例子:
import numpy as np
from time import sleep
from datetime import datetime
from dask import delayed
from dask.distributed import LocalCluster, Client
@delayed
def dosomething(name):
res = {"name": name, "beg": datetime.now()}
sleep(np.random.randint(10))
res.update(rand=np.random.rand())
res.update(end=datetime.now())
return res
seq1 = [dosomething(name) for name in ["foo", "bar", "baz"]]
par1 = dosomething("whaat")
par2 = dosomething("ahem")
pipeline = [seq1, par1, par2]
给定上面的例子,我想并行运行seq1
,par1
和, 但: "foo", "bar", 和 "baz" 的组成部分是按顺序运行的。par2
seq1