1

我现在正在处理一个大数据集。我的输入将是 4 个不同的数据集,我必须对每个数据集应用一个特定的函数。所以我所做的是读取所有四个数据集并使用 pool.map 将函数并行应用于每个数据集。所以现在我有一个父进程和 4 个子进程。到此为止一切都很好。

Q1。现在每个进程内部会发生什么。在我对每个数据集应用的函数中,我将每个元组与其他元组进行比较,因此它是一种递归。有没有办法让它并行,因为这个比较可能需要很长时间,因为数据集会很大。如何制作它,因为它已经是一个子进程?是否可以在子进程中再次并行化它,因为我有更多的处理器,所以我想利用它。

Q2。对于这个递归任务的并行化,我想到的是,如果我将元组 x 与元组 y(每个元组与所有其他元组)进行比较,我可以为 x 制作块,每个块都与 y 进行比较。我想这可以通过两个“for 循环”来完成。任何建议如何做到这一点?

4

1 回答 1

4

回复:Q1,如果您使用 a 创建子进程multiprocessing.Pool,那么不,工作进程不能有子进程。尝试创建一个将引发异常:

AssertionError: daemonic processes are not allowed to have children

原因很清楚 - a 中的进程Pool是守护进程,而守护进程不能有子进程。这样做的原因是终止父进程将终止其守护子进程,但守护子进程将无法终止子进程,这将留下孤立进程。这在文档中有所说明:

请注意,不允许守护进程创建子进程。否则,如果守护进程在其父进程退出时被终止,它将使其子进程成为孤儿。

您可以通过创建一组非守护程序Process对象的父进程来解决此问题,而不是使用Pool. 然后,每个孩子都可以创建自己的multiprocessing.Pool

import multiprocessing

def subf(x):
    print "in subf"

def f(x):
    print "in f"
    p = multiprocessing.Pool(2)
    p.map(subf, range(2))


if __name__ == "__main__":
    processes = []
    for i in range(2):
        proc = multiprocessing.Process(target=f, args=(i,))
        proc.start()
        processes.append(proc)

输出:

in f
in f
in subf
in subf
in subf
in subf

这种方法对您来说似乎没问题,因为您的初始数据集仅包含四个项目。您可以Process在数据集中为每个项目创建一个,并且仍然有一些空闲 CPU 可供每个子进程在小型Pool.

回复:Q2,听起来你可以用itertools.product你想要比较的每对元组创建一个大的迭代。然后,您可以使用pool.map并行比较每一对。这是一个显示其工作原理的示例:

def f(x):
    print(x)

if __name__ == "__main__":
    # Create two lists of tuples, like your use-case
    x = zip(range(3), range(3,6))
    y = zip(range(6, 9), range(9, 12))

    pool = multiprocessing.Pool()
    pool.map(f, itertools.product(x, y))

输出:

((0, 3), (6, 9))
((0, 3), (7, 10))
((0, 3), (8, 11))
((1, 4), (6, 9))
((1, 4), (7, 10))
((1, 4), (8, 11))
((2, 5), (6, 9))
((2, 5), (8, 11))
((2, 5), (7, 10))
于 2014-09-21T04:47:29.047 回答