6

我试图用来dask.delayed计算一个大矩阵以供以后计算使用。我只在一台本地机器上运行代码。当我使用dask单机调度程序时,它工作正常,但有点慢。要访问更多选项和性能监视器以改进我想dask.distributed在单台机器上使用的代码。然而,用dask.distributed客户端运行相同的代码会慢慢耗尽所有可用内存并崩溃而没有任何结果。

是否有不同的设置问题的方法可以让dask.distributed客户端以更好的内存效率完成?

  • 我通读了 dask.delayed最佳实践指南,认为我们使用它是正确的。
  • 我在本地 Win 10 PC (64GB RAM) 和 Azure Win Server 2012 VM (256 GB) 上运行它,结果相同。
  • 我试过手动设置块。
  • 我曾尝试使用优化块大小,包括按行和列自动分块(行块在调度程序stack.rechunk中似乎运行得更快)。dask
  • 我试过使用compute()and persist()(同样的结果)。
  • 我尝试dask.distributed使用线程和进程调度程序启动客户端并调整工作人员的数量。threads在死亡之前更快地使用更多的 RAM。
  • 我已尝试根据此答案dask.distributed设置内存限制,但内存限制被忽略。cluster = distributed.LocalCluster(memory_limit = 8e9)
  • 如果我减小问题的大小(nXnY以下),dask.distributed客户端确实完成了任务,但是它仍然需要比dask调度程序更多的时间和内存。

这个例子重现了这个问题:

import dask
import distributed
import numpy as np
import dask.array as da

def calcRow(X,Y):
    Tx = np.transpose(X * (X + Y)) # Simplified work
    return (Tx)

# Specify size of (nY x nX) matrix
nX = 1000000 #  Distributed fails with nX >= 1000000 and nY >= 5000
nY = 5000

# Fill with random data
x = np.random.rand(nX,1)
y = np.random.rand(nY,1)

# Setup dask.distributed client.
# Comment out these two lines to use the standard dask scheduler,
# which does work 
client = distributed.Client()
client

# Build the matrix
row = dask.delayed(calcRow, pure=True)   # Build 1 row
makeRows = [row(x, y[ii]) for ii in range(nY)] # Loop for all nY rows
buildMat = [da.from_delayed(makeRow, dtype=float, shape=(10,nX))
            for makeRow in makeRows] # Build matrix
stack = da.vstack(buildMat)
my_matrix = stack.compute() # Calculate the matrix entries

实际上,我的问题要大得多,并且calcRow本身就是一个庞大、缓慢、复杂的计算,但形状和矩阵构建步骤是相同的​​。

我了解最佳做法是scatter在调用之前将数据放入内存compute,但我没有分散的功能,只有一个delayed数组。

如果我注释掉 2dask.distributed条客户端行,上面的示例使用最多 0.25 GB 的 RAM 在 60 秒内运行。但是有了这些行,代码会在 3-4 分钟内爬升到完全使用内存 (64GB) 并继续运行,直到系统变得不稳定。

如果我在其中构建矩阵,dask则可以启动一个dask.distributed客户端,并在以后的dask.distributed计算中使用该矩阵,没有问题。它只是建立导致问题的矩阵。

我几乎觉得这是一个错误,但不能确定我的代码不是罪魁祸首。我真的很重视可能可以使代码运行或证明错误的建议。

编辑1: 我还尝试将装饰器应用于calcRow

@dask.delayed
def calcRow(X,Y):

并使用:

makeRows = [calcRow(x, y[ii]) for ii in range(nY)]

但这似乎是相同的?

编辑2: 如果我开始distributed.client使用processes=False它会更快地消耗所有系统内存,但实际上会提供以下警告,这可能是诊断性的:

Distributed.worker - 警告 - 内存使用率很高,但工作人员没有数据要存储到磁盘。也许其他一些进程正在泄漏内存?进程内存:40.27 GB -- Worker 内存限制:8.00 GB

4

0 回答 0