2

我正在使用 Dask 分布式和 Datashader 运行一些简单的测试,但我遇到了两个我无法解决的问题,也无法理解它为什么会发生。

我正在处理的数据包含 17 亿行,每行 97 列,分布在 64 个 parquet 文件中。我的测试代码如下,其中我只是在散点图中绘制了两列数据,遵循https://datashader.org/user_guide/Performance.html底部的示例代码:

def plot(file_path):
    dask_df = dd.read_parquet(file_path,  engine='pyarrow') 
    cvs = ds.Canvas(plot_width=600, plot_height=300)
    agg = cvs.points(dask_df, 'x', 'y')
    img = tf.shade(agg, cmap=['lightblue', 'darkblue'])
    return img

futures = [dask_client.submit(plot,file) for f in files_paths]
result = [f.result() for f in futures]  #array with each plot per file

这两个问题如下:

首先,我的员工将太多数据带入内存。例如,我只用一个工作人员和一个文件运行了前面的代码。尽管一个文件是 11gb,但 Dask 仪表板显示大约 50gb 加载到内存中。我发现的唯一解决方案是更改以下行,显式显示一小部分列:

def plot(file_path):
    dask_df = dd.read_parquet(file_path,  columns=['x','y',...], engine='pyarrow') 
    …

虽然这很有效(并且很有意义,因为我只使用 2 列来绘制图),但对于为什么工作人员使用这么多内存仍然令人困惑。

第二个问题是,即使我在 ~/.config/dask/distributed.yaml 文件中配置了 70% 的溢出应该发生,但我的工作人员会因为内存不足而不断崩溃:

Distributed.nanny - 警告 - Worker 超出了 95% 的内存预算。重启distributed.nanny - 警告 - 重启工人

最后,当我绘制所有点时,columns=['x','y','a','b','c'] 在读取数据时只带了 5 列,我得到了不合理的缓慢时间。尽管文件被分成 8 个磁盘以加速 I/O 并使用 8 个内核(8 个工作人员),但绘制 17 亿个点需要 5 分钟。

我正在使用:dask 2.18.0、distributed 2.19.0、datashader 0.10.0 和 python 3.7.7。

我已经为此苦苦挣扎了整整一周,所以任何建议都将受到高度赞赏。请随时向我询问可能缺少的任何其他信息。

4

1 回答 1

1

虽然这很有效(并且很有意义,因为我只使用 2 列来绘制图),但对于为什么工作人员使用这么多内存仍然令人困惑。

Parquet 是一种相对有效的格式。例如,您的数据可能在磁盘上被压缩但在 Pandas 中未压缩,或者 Pandas 字符串类型可能导致一些膨胀(Pandas 使用 Python 字符串,它们很大)。

第二个问题是,即使我在 ~/.config/dask/distributed.yaml 文件中配置了 70% 的溢出应该发生,但我的工作人员会因为内存不足而不断崩溃:

我不知道该告诉你什么。Dask 无法阻止 Python 函数耗尽 RAM。我会与 datashader 人员联系,但我希望他们的代码非常紧凑。

最后,当我绘制所有点时,在读取数据时只带 5 列 columns=['x','y','a','b','c'] ,我得到了不合理的缓慢时间。尽管文件被分成 8 个磁盘以加速 I/O 并使用 8 个内核(8 个工作人员),但绘制 17 亿个点需要 5 分钟。

很难诊断堆栈溢出导致的性能问题。我建议遵循此处的指导:https ://docs.dask.org/en/latest/understanding-performance.html

于 2020-08-08T00:30:22.930 回答