我试图用来dask.delayed
计算一个大矩阵以供以后计算使用。我只在一台本地机器上运行代码。当我使用dask
单机调度程序时,它工作正常,但有点慢。要访问更多选项和性能监视器以改进我想dask.distributed
在单台机器上使用的代码。然而,用dask.distributed
客户端运行相同的代码会慢慢耗尽所有可用内存并崩溃而没有任何结果。
是否有不同的设置问题的方法可以让dask.distributed
客户端以更好的内存效率完成?
- 我通读了 dask.delayed最佳实践指南,认为我们使用它是正确的。
- 我在本地 Win 10 PC (64GB RAM) 和 Azure Win Server 2012 VM (256 GB) 上运行它,结果相同。
- 我试过手动设置块。
- 我曾尝试使用优化块大小,包括按行和列自动分块(行块在调度程序
stack.rechunk
中似乎运行得更快)。dask
- 我试过使用
compute()
andpersist()
(同样的结果)。 - 我尝试
dask.distributed
使用线程和进程调度程序启动客户端并调整工作人员的数量。threads
在死亡之前更快地使用更多的 RAM。 - 我已尝试根据此答案
dask.distributed
设置内存限制,但内存限制被忽略。cluster = distributed.LocalCluster(memory_limit = 8e9)
- 如果我减小问题的大小(
nX
及nY
以下),dask.distributed
客户端确实完成了任务,但是它仍然需要比dask
调度程序更多的时间和内存。
这个例子重现了这个问题:
import dask
import distributed
import numpy as np
import dask.array as da
def calcRow(X,Y):
Tx = np.transpose(X * (X + Y)) # Simplified work
return (Tx)
# Specify size of (nY x nX) matrix
nX = 1000000 # Distributed fails with nX >= 1000000 and nY >= 5000
nY = 5000
# Fill with random data
x = np.random.rand(nX,1)
y = np.random.rand(nY,1)
# Setup dask.distributed client.
# Comment out these two lines to use the standard dask scheduler,
# which does work
client = distributed.Client()
client
# Build the matrix
row = dask.delayed(calcRow, pure=True) # Build 1 row
makeRows = [row(x, y[ii]) for ii in range(nY)] # Loop for all nY rows
buildMat = [da.from_delayed(makeRow, dtype=float, shape=(10,nX))
for makeRow in makeRows] # Build matrix
stack = da.vstack(buildMat)
my_matrix = stack.compute() # Calculate the matrix entries
实际上,我的问题要大得多,并且calcRow
本身就是一个庞大、缓慢、复杂的计算,但形状和矩阵构建步骤是相同的。
我了解最佳做法是scatter
在调用之前将数据放入内存compute
,但我没有分散的功能,只有一个delayed
数组。
如果我注释掉 2dask.distributed
条客户端行,上面的示例使用最多 0.25 GB 的 RAM 在 60 秒内运行。但是有了这些行,代码会在 3-4 分钟内爬升到完全使用内存 (64GB) 并继续运行,直到系统变得不稳定。
如果我在其中构建矩阵,dask
则可以启动一个dask.distributed
客户端,并在以后的dask.distributed
计算中使用该矩阵,没有问题。它只是建立导致问题的矩阵。
我几乎觉得这是一个错误,但不能确定我的代码不是罪魁祸首。我真的很重视可能可以使代码运行或证明错误的建议。
编辑1:
我还尝试将装饰器应用于calcRow
:
@dask.delayed
def calcRow(X,Y):
并使用:
makeRows = [calcRow(x, y[ii]) for ii in range(nY)]
但这似乎是相同的?
编辑2:
如果我开始distributed.client
使用processes=False
它会更快地消耗所有系统内存,但实际上会提供以下警告,这可能是诊断性的:
Distributed.worker - 警告 - 内存使用率很高,但工作人员没有数据要存储到磁盘。也许其他一些进程正在泄漏内存?进程内存:40.27 GB -- Worker 内存限制:8.00 GB