我有一大堆 Parquet 文件,我试图按列排序。未压缩的数据约为 14Gb,因此 Dask 似乎是适合这项工作的工具。我对 Dask 所做的只是:
- 读取镶木地板文件
- 对其中一列进行排序(称为“朋友”)
- 在单独的目录中写入 parquet 文件
如果没有 Dask 进程(只有一个,我正在使用同步调度程序)耗尽内存并被杀死,我无法做到这一点。这让我很吃惊,因为没有一个分区未压缩超过约 300 mb。
我编写了一个小脚本来使用我的数据集越来越大的部分来分析 Dask,并且我注意到 Dask 的内存消耗随着输入的大小而变化。这是脚本:
import os
import dask
import dask.dataframe as dd
from dask.diagnostics import ResourceProfiler, ProgressBar
def run(input_path, output_path, input_limit):
dask.config.set(scheduler="synchronous")
filenames = os.listdir(input_path)
full_filenames = [os.path.join(input_path, f) for f in filenames]
rprof = ResourceProfiler()
with rprof, ProgressBar():
df = dd.read_parquet(full_filenames[:input_limit])
df = df.set_index("friend")
df.to_parquet(output_path)
rprof.visualize(file_path=f"profiles/input-limit-{input_limit}.html")
以下是电话会议产生的图表visualize()
:
输入限制 = 2
输入限制 = 4
输入限制 = 8
输入限制 = 16
完整的数据集是大约 50 个输入文件,因此以这种增长速度,我并不感到惊讶,因为工作会占用我 32gb 机器上的所有内存。
我的理解是,Dask 的全部意义在于允许您对大于内存的数据集进行操作。我的印象是人们正在使用 Dask 处理比我的 ~14gb 大得多的数据集。他们如何通过扩展内存消耗来避免这个问题?我在这里做错了什么?
在这一点上,我对使用不同的调度程序或并行性不感兴趣。我只是想知道为什么 Dask 消耗的内存比我认为必要的多得多。