2

我有一个分布式 dask 集群设置,我用它来加载和转换一堆数据。奇迹般有效。

我想用它做一些并行处理。这是我的功能

el = 5000
n_using = 26
n_across= 6

mat = np.random.random((el,n_using,n_across))
idx = np.tril_indices(n_across*2, -n_across)

def get_vals(c1, m, el, idx):
    m1 = m[c1,:,:]
    corr_vals = np.zeros((el, (n_across//2)*(n_across+1)))
    for c2 in range(c1+1, el):
        corr = np.corrcoef(m1.T, m[c2,:,:].T)
        corr_vals[c2] = corr[idx]
        
    return corr_vals

lazy_get_val = dask.delayed(get_vals, pure=True)

这是我正在尝试做的单处理器版本:

arrays = [get_vals(c1, mat, el, idx) for c1 in range(el)]
all_corr = np.stack(arrays, axis=0)

工作正常,但需要几个小时。这是我在黎明时做的事情:

lazy_list = [lazy_get_val(c1, mat, el, idx) for c1 in range(el)]
arrays = [da.from_delayed(lazy_item, dtype=float, shape=(el, 21)) for lazy_item in lazy_list]
all_corr = da.stack(arrays, axis=0)

即使它运行all_corr[1].compute(),它也只是坐在那里不响应。当我中断内核时,它似乎卡在/distributed/utils.py:

~/.../lib/python3.6/site-packages/distributed/utils.py 同步(循环,函数,*args,**kwargs)

    249     else:
    250         while not e.is_set():
--> 251             e.wait(10)
    252     if error[0]:
    253         six.reraise(*error[0])

关于调试这个有什么建议吗?


其他事情:

  • 如果我用较小的mat(el=1000)运行它,它运行良好。
  • 如果我做el = 5000,它会挂起。
  • 如果我中断内核并使用 再次运行它el = 1000,它就会挂起。
4

1 回答 1

2

在示例中添加导入后,我运行了一些东西,构建图表时速度非常慢。这可以通过避免将 numpy 数组直接放在延迟调用中来改进,如下所示:

# mat = np.random.random((el,n_using,n_across))
# idx = np.tril_indices(n_across*2, -n_across)
mat = dask.delayed(np.random.random)((el,n_using,n_across))
idx = dask.delayed(np.tril_indices)(n_across*2, -n_across)

或者通过删除pure=Truedask.delayed 的关键字(当您设置 pure=True 时,它​​必须对所有输入的内容进行哈希处理以获得它们的唯一键,您这样做了 5000 次)。%snakeviz我通过使用IPython 中的魔法分析您的代码发现了这一点。

然后我跑all_corr[1].compute()了,很好。然后我跑了all_corr.compute(),它似乎会进展到完成,但不是很快。我怀疑您的任务太小以至于开销太大,或者您的代码在 Python for 循环中花费了太多时间,因此遇到了 GIL 问题。不确定是哪个。

我建议尝试的下一件事是使用 dask.distributed 调度程序,它可以更好地处理 GIL 问题并加剧开销问题。看看它是如何执行的可能有助于隔离问题。

于 2018-06-29T12:04:19.057 回答