我正在使用 Dask 分布式和 Datashader 运行一些简单的测试,但我遇到了两个我无法解决的问题,也无法理解它为什么会发生。
我正在处理的数据包含 17 亿行,每行 97 列,分布在 64 个 parquet 文件中。我的测试代码如下,其中我只是在散点图中绘制了两列数据,遵循https://datashader.org/user_guide/Performance.html底部的示例代码:
def plot(file_path):
dask_df = dd.read_parquet(file_path, engine='pyarrow')
cvs = ds.Canvas(plot_width=600, plot_height=300)
agg = cvs.points(dask_df, 'x', 'y')
img = tf.shade(agg, cmap=['lightblue', 'darkblue'])
return img
futures = [dask_client.submit(plot,file) for f in files_paths]
result = [f.result() for f in futures] #array with each plot per file
这两个问题如下:
首先,我的员工将太多数据带入内存。例如,我只用一个工作人员和一个文件运行了前面的代码。尽管一个文件是 11gb,但 Dask 仪表板显示大约 50gb 加载到内存中。我发现的唯一解决方案是更改以下行,显式显示一小部分列:
def plot(file_path):
dask_df = dd.read_parquet(file_path, columns=['x','y',...], engine='pyarrow')
…
虽然这很有效(并且很有意义,因为我只使用 2 列来绘制图),但对于为什么工作人员使用这么多内存仍然令人困惑。
第二个问题是,即使我在 ~/.config/dask/distributed.yaml 文件中配置了 70% 的溢出应该发生,但我的工作人员会因为内存不足而不断崩溃:
Distributed.nanny - 警告 - Worker 超出了 95% 的内存预算。重启distributed.nanny - 警告 - 重启工人
最后,当我绘制所有点时,columns=['x','y','a','b','c']
在读取数据时只带了 5 列,我得到了不合理的缓慢时间。尽管文件被分成 8 个磁盘以加速 I/O 并使用 8 个内核(8 个工作人员),但绘制 17 亿个点需要 5 分钟。
我正在使用:dask 2.18.0、distributed 2.19.0、datashader 0.10.0 和 python 3.7.7。
我已经为此苦苦挣扎了整整一周,所以任何建议都将受到高度赞赏。请随时向我询问可能缺少的任何其他信息。