1

我有一个数据集,每个文件包含一个时间序列。我真的很高兴 dask 如何处理我们集群上的 ~1k 文件(在我的例子中是一个目录)。但我有大约 50 个目录。

发生的有趣的事情是,构建 dask 图似乎比实际问题消耗更多的内存和 CPU。这仅在调度程序上。以下最低限度的代码应该只创建图表,但似乎已经在调度程序上做了很多熊猫的东西:

    df=intake.open_csv(TRAIN_PATH+"{folder_name}/{file_name}.csv",csv_kwargs={"dtype": dtypes.to_dict()}).to_dask()
    features=df.groupby(['folder_name','file_name']).agg(["min","max"])

注意:我在这里的模式使用摄入量。我也一直在使用read_csvfrom dask with include_path_column=Trueand pathas group。我设法使上述步骤更快,但随后features.compute()似乎有效地扩展了导致相同情况的图表,即调度程序在集群开始运行之前挂起。

最简单的方法是实际使用 dask 反模式并执行循环。但是我想知道,是否可以做得更好(这是出于教育目的,所以风格和简单性很重要)

有没有一种很好的方法可以在一个图形中读取许多文件,而不会使图形大小超出线性范围。

4

1 回答 1

0

这是我想出的一个解决方案map_partitions,它似乎生成了一个不错的图表。

import dask.bag as db

def get_features(file):
    data= pd.read_csv(file)
    data["path"]=file
    return data.groupby("path").agg(["min","max"])

csvs=db.from_sequence(files)
features=csvs.map_partitions(lambda x: [get_features(f) for f in x]).\
reduction(pd.concat,pd.concat).compute()

但是,该解决方案并没有推广到用例之外:例如,如果功能将跨越多个文件。它也不会构建一个 dask 数据框:如果每个文件有太多组,那会很糟糕。

于 2020-10-24T20:17:59.627 回答