1

我目前正在通过以下方式使用 Dask ......

  • S3 上有以下格式的文件列表:

<day1>/filetype1.gz

<day1>/filetype2.gz

<day2>/filetype1.gz

<day2>/filetype2.gz

...ETC

  • 我的代码读取所有文件filetype1并建立一个数据框并设置索引(例如df1 = ddf.read_csv(day1/filetype1.gz, blocksize=None, compression='gzip').set_index(index_col):)

  • 读取所有文件filetype2并建立一个大数据框(类似于上面)。

  • 通过 将两个数据帧合并在一起merged_df = ddf.merge(df1, df2, how='inner', left_index=True, right_index=True)

  • 通过以下方式将结果写入 S3:merged_df.to_csv(<s3_output_location>)

注意:这里的目标实际上是在特定日期内合并(即,合并给定日期的 filetype1 和 filetype2),每天重复,并存储所有这些连接的并集,但看起来就像一天做连接一次不会利用并行性,让 Dask 管理更大的连接会更有性能。我认为 Dask 会根据文档中的以下行(https://docs.dask.org/en/latest/dataframe-joins.html)以内存感知方式管理更大的连接:

如果找不到足够的内存,那么 Dask 将不得不读写数据到磁盘,这可能会导致其他性能成本。

我看到 aMemoryError发生在对 的调用中to_csv。我猜这是因为to_csvcallscompute尝试计算连接的完整结果,然后尝试存储该结果。完整的文件内容当然无法放入内存,但我认为(希望)Dask 会运行计算并以内存感知的方式存储生成的 Dataframe。关于我应该如何使用 Dask 来完成我想做的事情的任何指导或建议?提前致谢。

4

1 回答 1

0

我看到调用 to_csv 时发生了 MemoryError。我猜这是因为 to_csv 调用了计算,它尝试计算连接的完整结果,然后尝试存储该结果。完整的文件内容当然不能放入内存,但我认为(希望)Dask 会运行计算并以内存感知的方式存储生成的 Dataframe

一般来说,Dask 会按照您期望的方式将事情分块并运行。以低内存的方式进行分布式连接很难,但通常是可行的。如果没有更多信息,我不知道如何在这里提供更多帮助,我很感激在 Stack Overflow 上很难简洁地提供这些信息。我通常的建议是密切关注仪表板。

注意:这里的目标实际上是在特定日期内合并(即,合并给定日期的 filetype1 和 filetype2),每天重复,并存储所有这些连接的并集,但看起来就像一天做连接一次不会利用并行性,让 Dask 管理更大的连接会更高效

一般来说,你的直觉是正确的,一次给 Dask 更多的工作是好的。但是,在这种情况下,您似乎知道 Dask 不知道的数据。您知道每个文件只与另一个文件交互。一般来说,连接必须以一个数据集的所有行都可以与另一个数据集的所有行交互的方式完成,因此 Dask 的算法在这里必须非常通用,这可能很昂贵。

在您的情况下,我会使用 Pandas 和延迟的 Dask 一次完成所有计算。

lazy_results = []
for fn in filenames:
    left = dask.delayed(pd.read_csv, fn + "type-1.csv.gz")
    right = dask.delayed(pd.read_csv, fn + "type-1.csv.gz")
    merged = left.merge(right)
    out = merged.to_csv(...)
    lazy_results.append(out)

dask.compute(*lazy_results)
于 2020-06-13T15:52:47.157 回答