1

我在 S3 中有一堆文件,它们包含一个大于内存的数据帧。

目前,我使用 Dask 将文件读入数据帧,使用较小的数据集执行内部连接(每次调用此函数时都会更改,而huge_df基本上是完整的数据集并且不会更改),调用计算以获取小得多的熊猫数据框,然后做一些处理。例如:

huge_df = ddf.read_csv("s3://folder/**/*.part") 
merged_df = huge_df.join(small_df, how='inner', ...)
merged_df = merged_df.compute()
...other processing...

大部分时间都花在从 S3 下载文件上。我的问题是:有没有办法使用 Dask 将 S3 中的文件缓存在磁盘上,以便在随后调用此代码时,我可以从磁盘读取数据帧文件,而不是从 S3 读取?我想我不能只是打电话huge_df.to_csv(./local-dir/),因为这会带入huge_df记忆中,这是行不通的。

我确信有一种方法可以结合使用其他工具和标准 Python IO 实用程序来做到这一点,但我想看看是否有办法使用 Dask 从 S3 下载文件内容并将它们存储在本地磁盘上没有把一切都带入记忆。

4

1 回答 1

2

这样做huge_df.to_csv会奏效,因为它会将每个分区写入本地的单独文件,因此整个事情不会立即在内存中。

但是,要回答具体问题,dask 用于fsspec管理文件操作,并且它允许本地缓存,例如,您可以这样做

huge_df = ddf.read_csv("simplecache::s3://folder/**/*.part")

默认情况下,这会将文件存储在一个临时文件夹中,当您退出 python 会话时,该文件夹会被清理,但您可以使用可选参数提供选项storage_options={"simplecache": {..}}来指定缓存位置,或者使用“filecache”而不是“simplecache”如果您希望本地副本在一段时间后过期或检查目标以获取更新版本。

请注意,显然,只有当所有工作人员都可以访问相同的缓存位置时,这些才适用于分布式集群,因为分区的加载可能发生在您的任何工作人员身上。

于 2020-06-07T18:39:21.190 回答