
How do I compute several .from_delayed() DataFrames in parallel from one sequence of delayed objects?

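import dask
import dask.dataframe as dd

def foo():
    df1, df2 = ...  # prepare two pd.DataFrame() in one foo() call
    return df1, df2

dds = [dask.delayed(foo)() for _ in range(5)]  # 5 delayed pairs (df1, df2)...
# d[0] / d[1] are lazy getitem tasks, so both lists depend on the same 5 foo() tasks
df1 = dd.from_delayed([d[0] for d in dds], meta=...)
df2 = dd.from_delayed([d[1] for d in dds], meta=...)
# Build both parquet writes lazily, then run them together as one graph
# (`client` is an existing dask.distributed Client)
client.compute([
    df1.to_parquet(file1, write_index=True, engine='fastparquet', compute=False),
    df2.to_parquet(file2, write_index=True, engine='fastparquet', compute=False)
], sync=True)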

Here foo() gets called 10 times. Is it possible to build the graph so that it is only called 5 times?
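To see why five calls is the expected count, here is a toy model in plain Python, loosely patterned on the dict-of-tasks format that dask graphs use (it is not dask's scheduler). Each delayed foo() call is one node, and the d[0]/d[1] selections are getitem nodes that depend on it, so an executor that computes each key once runs foo five times. The inlined_graph variant is a hypothetical illustration of what happens if the foo task is instead copied into every consumer: the count doubles to ten. All names here (execute, shared_graph, inlined_graph, count_foo_calls) are made up for this sketch.

```python
from operator import getitem

call_count = {"foo": 0}  # how many times foo() has actually run


def foo():
    """Stand-in for the question's foo(): one call produces a (df1, df2) pair."""
    call_count["foo"] += 1
    return ("df1-part", "df2-part")


def execute(graph, targets):
    """Evaluate the target keys, computing every graph key at most once."""
    cache = {}

    def resolve(arg):
        # A string naming a graph key is a dependency; anything else is a literal.
        if isinstance(arg, str) and arg in graph:
            return get(arg)
        return arg

    def get(key):
        if key not in cache:
            func, *args = graph[key]
            cache[key] = func(*[resolve(a) for a in args])
        return cache[key]

    return [get(k) for k in targets]


def shared_graph(n):
    """One foo node per partition; the df1 and df2 selections both depend on it."""
    graph = {}
    for i in range(n):
        graph[f"foo-{i}"] = (foo,)
        graph[f"df1-{i}"] = (getitem, f"foo-{i}", 0)
        graph[f"df2-{i}"] = (getitem, f"foo-{i}", 1)
    return graph


def inlined_graph(n):
    """Hypothetical rewrite where the foo task is copied into each consumer."""
    graph = {}
    for i in range(n):
        graph[f"df1-{i}"] = (lambda: getitem(foo(), 0),)
        graph[f"df2-{i}"] = (lambda: getitem(foo(), 1),)
    return graph


def count_foo_calls(graph, n):
    """Compute all 2*n outputs and report how many times foo() executed."""
    call_count["foo"] = 0
    execute(graph, [f"df{j}-{i}" for j in (1, 2) for i in range(n)])
    return call_count["foo"]


print(count_foo_calls(shared_graph(5), 5))   # 5
print(count_foo_calls(inlined_graph(5), 5))  # 10
```

The sketch only shows that sharing one node per partition yields five calls; whether dask's optimizer actually duplicates the task in the question's case is not something it demonstrates.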

Thanks


1 Answer


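Thanks for the clear example. In principle you are correct: foo should only be called five times. My guess is that graph optimization (task fusion) is misbehaving here. In the short term, I recommend trying the following with a recent version: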

import dask

# Turn off low-level task fusion before building and computing the graph
dask.config.set({"optimization.fuse.active": False})

# ... your code follows
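If you would rather not disable fusion for the whole session, dask.config.set can also be used as a context manager, which restores the previous configuration when the block exits. A minimal sketch, assuming the same "optimization.fuse.active" key as above; the `...` stands for the question's code. Graph optimization happens when the graph is computed, so the client.compute(...) call should sit inside the with block.

```python
import dask

# Disable low-level task fusion only inside this block; the previous
# configuration is restored when the block exits.
with dask.config.set({"optimization.fuse.active": False}):
    ...  # build df1/df2 and call client.compute(...) here, as in the question
```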
answered 2020-05-23T17:12:35.590