我们正在考虑使用 dask,尤其是它的惰性计算和 dag 功能。
我们有一个中等复杂的计算 dag,输入未知。我们希望能够提前构建它,然后在不同的输入上使用它。
我认为我们可以使用 dict / tuple 接口来做到这一点:
from dask.threaded import get
import pandas as pd
power = lambda x, y: x**y
dsk = {'x': pd.Series(pd.np.random.rand(20)),
'y': 2,
'z': (power, 'x', 'y'),
'w': (sum, ['x', 'y', 'z'])}
然后我们就有dsk
了便携的 dag,可以x
用我们想要的任何东西替换。(实际上,我们最初不需要将其包含在上面)。
dsk['x'] = pd.Series(pd.np.random.rand(20))
get(dsk, 'w')
但是我们可以这样做dask.imperative
吗?我的初步结果表明我们无法达到x
:
x=pd.Series()
def filter_below_3(ds):
return ds[ds<3]
f=do(filter_below_3)
graph=f(x)
graph.dask
# {'filter_below_3-0ae5a18c-206d-4293-84b6-eb0d39243296': (<function __main__.filter_below_3>, [])}
有办法吗?