2

我有一大堆 Parquet 文件,我试图按列排序。未压缩的数据约为 14Gb,因此 Dask 似乎是适合这项工作的工具。我对 Dask 所做的只是:

  1. 读取镶木地板文件
  2. 对其中一列进行排序(称为“朋友”)
  3. 在单独的目录中写入 parquet 文件

如果没有 Dask 进程(只有一个,我正在使用同步调度程序)耗尽内存并被杀死,我无法做到这一点。这让我很吃惊,因为没有一个分区未压缩超过约 300 mb。

我编写了一个小脚本来使用我的数据集越来越大的部分来分析 Dask,并且我注意到 Dask 的内存消耗随着输入的大小而变化。这是脚本:

import os
import dask
import dask.dataframe as dd
from dask.diagnostics import ResourceProfiler, ProgressBar

def run(input_path, output_path, input_limit):
    dask.config.set(scheduler="synchronous")

    filenames = os.listdir(input_path)
    full_filenames = [os.path.join(input_path, f) for f in filenames]

    rprof = ResourceProfiler()
    with rprof, ProgressBar():
        df = dd.read_parquet(full_filenames[:input_limit])
        df = df.set_index("friend")
        df.to_parquet(output_path)

    rprof.visualize(file_path=f"profiles/input-limit-{input_limit}.html")

以下是电话会议产生的图表visualize()

输入限制 = 2

使用 input_limit=2

输入限制 = 4

使用 input_limit=4

输入限制 = 8

使用 input_limit=8

输入限制 = 16

使用 input_limit=16

完整的数据集是大约 50 个输入文件,因此以这种增长速度,我并不感到惊讶,因为工作会占用我 32gb 机器上的所有内存。

我的理解是,Dask 的全部意义在于允许您对大于内存的数据集进行操作。我的印象是人们正在使用 Dask 处理比我的 ~14gb 大得多的数据集。他们如何通过扩展内存消耗来避免这个问题?我在这里做错了什么?

在这一点上,我对使用不同的调度程序或并行性不感兴趣。我只是想知道为什么 Dask 消耗的内存比我认为必要的多得多。

4

1 回答 1

1

事实证明,这是在 2021.03.0 版本中修复的 Dask 中的性能回归。

有关更多信息,请参阅此Github 问题

于 2021-04-16T18:37:46.710 回答