我正在构建一个非常大的 DAG 以提交给分布式调度程序,其中节点对本身可能非常大的数据帧进行操作。一种模式是我有大约 50-60 个函数来加载数据并构建每个数百 MB 的 pandas 数据帧(并在逻辑上表示单个表的分区)。我想将这些连接到图中下游节点的单个 dask 数据帧中,同时最大限度地减少数据移动。我像这样链接任务:
dfs = [dask.delayed(load_pandas)(i) for i in disjoint_set_of_dfs]
dfs = [dask.delayed(pandas_to_dask)(df) for df in dfs]
return dask.delayed(concat_all)(dfs)
在哪里
def pandas_to_dask(df):
return dask.dataframe.from_pandas(df).to_delayed()
我已经尝试了各种concat_all
实施,但这似乎是合理的:
def concat_all(dfs):
dfs = [dask.dataframe.from_delayed(df) for df in dfs]
return dask.dataframe.multi.concat(dfs, axis='index', join='inner')
所有 pandas 数据框的索引都是不相交的,并且是排序/单调的。
concat_all
但是,即使每个人的内存预算实际上都相当大,我也不希望它会移动数据,但我正在杀死死于此功能的工作人员(集群管理器正在杀死他们超过他们的内存预算)。我有理由确定,compute()
在使用 dask 数据帧的图形节点中调用之前,我总是切分到合理的数据子集。
--memory-limit
到目前为止,我正在玩但没有成功。我至少正确地解决了这个问题吗?有没有我遗漏的注意事项?