我目前正在通过以下方式使用 Dask ......
- S3 上有以下格式的文件列表:
<day1>/filetype1.gz
<day1>/filetype2.gz
<day2>/filetype1.gz
<day2>/filetype2.gz
...ETC
我的代码读取所有文件
filetype1
并建立一个数据框并设置索引(例如df1 = ddf.read_csv(day1/filetype1.gz, blocksize=None, compression='gzip').set_index(index_col)
:)读取所有文件
filetype2
并建立一个大数据框(类似于上面)。通过 将两个数据帧合并在一起
merged_df = ddf.merge(df1, df2, how='inner', left_index=True, right_index=True)
。通过以下方式将结果写入 S3:
merged_df.to_csv(<s3_output_location>)
注意:这里的目标实际上是在特定日期内合并(即,合并给定日期的 filetype1 和 filetype2),每天重复,并存储所有这些连接的并集,但看起来就像一天做连接一次不会利用并行性,让 Dask 管理更大的连接会更有性能。我认为 Dask 会根据文档中的以下行(https://docs.dask.org/en/latest/dataframe-joins.html)以内存感知方式管理更大的连接:
如果找不到足够的内存,那么 Dask 将不得不读写数据到磁盘,这可能会导致其他性能成本。
我看到 aMemoryError
发生在对 的调用中to_csv
。我猜这是因为to_csv
callscompute
尝试计算连接的完整结果,然后尝试存储该结果。完整的文件内容当然无法放入内存,但我认为(希望)Dask 会运行计算并以内存感知的方式存储生成的 Dataframe。关于我应该如何使用 Dask 来完成我想做的事情的任何指导或建议?提前致谢。