36

我正在阅读有关 Python 中多处理模块的各种教程,并且无法理解为什么/何时调用process.join(). 例如,我偶然发现了这个例子:

nums = range(100000)
nprocs = 4

def worker(nums, out_q):
    """ The worker function, invoked in a process. 'nums' is a
        list of numbers to factor. The results are placed in
        a dictionary that's pushed to a queue.
    """
    outdict = {}
    for n in nums:
        outdict[n] = factorize_naive(n)
    out_q.put(outdict)

# Each process will get 'chunksize' nums and a queue to put his out
# dict into
out_q = Queue()
chunksize = int(math.ceil(len(nums) / float(nprocs)))
procs = []

for i in range(nprocs):
    p = multiprocessing.Process(
            target=worker,
            args=(nums[chunksize * i:chunksize * (i + 1)],
                  out_q))
    procs.append(p)
    p.start()

# Collect all results into a single result dict. We know how many dicts
# with results to expect.
resultdict = {}
for i in range(nprocs):
    resultdict.update(out_q.get())

# Wait for all worker processes to finish
for p in procs:
    p.join()

print resultdict

据我了解,process.join()将阻塞调用进程,直到调用其 join 方法的进程完成执行。我也相信上述代码示例中启动的子进程在完成目标函数后,即在他们将结果推送到out_q. 最后,我相信这会out_q.get()阻止调用过程,直到有结果要提取。因此,如果您考虑以下代码:

resultdict = {}
for i in range(nprocs):
    resultdict.update(out_q.get())

# Wait for all worker processes to finish
for p in procs:
    p.join()

主进程被out_q.get()调用阻塞,直到每个工作进程完成将其结果推送到队列。因此,当主进程退出 for 循环时,每个子进程都应该已完成执行,对吗?

如果是这种情况,此时是否有任何理由调用这些p.join()方法?不是所有的工作进程都已经完成,那么这如何导致主进程“等待所有工作进程完成”?我问主要是因为我在多个不同的示例中看到了这一点,并且我很好奇我是否无法理解某些内容。

4

3 回答 3

22

在您调用之前join,所有工作人员都已将他们的结果放入他们的队列中,但他们不一定会返回,他们的进程可能还没有终止。他们可能会也可能不会这样做,这取决于时间。

调用join确保所有进程都有时间正确终止。

于 2013-01-20T22:04:59.907 回答
21

尝试运行这个:

import math
import time
from multiprocessing import Queue
import multiprocessing

def factorize_naive(n):
    factors = []
    for div in range(2, int(n**.5)+1):
        while not n % div:
            factors.append(div)
            n //= div
    if n != 1:
        factors.append(n)
    return factors

nums = range(100000)
nprocs = 4

def worker(nums, out_q):
    """ The worker function, invoked in a process. 'nums' is a
        list of numbers to factor. The results are placed in
        a dictionary that's pushed to a queue.
    """
    outdict = {}
    for n in nums:
        outdict[n] = factorize_naive(n)
    out_q.put(outdict)

# Each process will get 'chunksize' nums and a queue to put his out
# dict into
out_q = Queue()
chunksize = int(math.ceil(len(nums) / float(nprocs)))
procs = []

for i in range(nprocs):
    p = multiprocessing.Process(
            target=worker,
            args=(nums[chunksize * i:chunksize * (i + 1)],
                  out_q))
    procs.append(p)
    p.start()

# Collect all results into a single result dict. We know how many dicts
# with results to expect.
resultdict = {}
for i in range(nprocs):
    resultdict.update(out_q.get())

time.sleep(5)

# Wait for all worker processes to finish
for p in procs:
    p.join()

print resultdict

time.sleep(15)

并打开任务管理器。您应该能够看到 4 个子进程在被操作系统终止之前进入僵尸状态几秒钟(由于加入调用):

在此处输入图像描述

在更复杂的情况下,子进程可能永远处于僵尸状态(就像您在另一个问题中询问的情况一样),如果您创建足够多的子进程,您可以填充进程表,从而给操作系统带来麻烦(这可能会杀死避免失败的主要过程)。

于 2013-01-20T22:28:38.443 回答
1

我不完全确定实现细节,但 join 似乎也有必要反映一个进程确实已经终止(例如在调用 terminate 之后)。在此处的示例中,如果您在终止进程后未调用 join ,process.is_alive()则会返回True,即使该进程已通过process.terminate()调用终止。

于 2020-03-09T22:12:58.827 回答