1

我有一个工作流程,我想与 dask 并行化,但我正在努力寻找一种有效的方法来做到这一点。我已经分析了代码并确定了我想要加速的方面。这是伪代码:

for i in range(n):
    x = expensive_iris_extract_operation(i)
    r = cheaper_numpy_based_operation(x)
    save_to_disk(r)

基于 numpy 的操作不需要并行化,当在我的本地机器上运行时,它只占不到 5% 的运行时间。我想通过在单独的进程上运行每次迭代来加速 for 循环(特别是昂贵的提取操作是花费大部分时间的地方)。所以我的第一次尝试是在 range(n) 和地图上使用 dask bag

 x = very_expensive_iris_extract_operation(i)
 r = cheap_numpy_based_operation(x)
 save_to_disk(r)

将其作为 i 的函数。

我将它发送到我的基于 slurm 的计算集群以在 n 个进程上运行。它成功地产生了 n 个进程,但在 numpy 操作上超时。调查一下,我认为这是因为每个进程只运行一个线程。当我在没有 dask 的情况下运行代码时,我只得到 1 个进程,但有多个线程,并且基于 numpy 的操作很快。我尝试使用线程调度程序运行代码,然后运行,但仅使用一个进程,因此使用多处理器集群没有加速。(实际上它比没有 dask 的情况下运行要慢)。

所以我想我的问题是我想利用多处理,但如果我这样做,它会禁用多线程,这也是我需要的。

还有其他方法可以解决这个问题吗?谢谢

编辑:我尝试使用 Python 的本机多处理模块,但遇到了同样的问题。如果我将循环中的可迭代对象映射到进程池中的进程,则该函数在遇到 numpy 操作时会停止(嗯,不是停止,但非常慢)(在本例中为 np.apply_along_axis 操作))仅以串行方式运行会更快。

编辑2:我通过重构我的numpy操作来解决这个问题,所以它们只是在循环之外串行发生。(而且我的 iris 操作在多个进程中并行发生。)这行得通,但对我来说,numpy 所做的多线程似乎在子进程内部不起作用似乎仍然很奇怪。

4

0 回答 0