1

抱歉,代码太长了,我试图让它尽可能简单但可重现。

简而言之,这个 python 脚本启动了四个进程,将数字随机分配到列表中。然后,将结果添加到multiprocessing.Queue().

import random
import multiprocessing
import numpy
import sys

def work(subarray, queue):
    result = [numpy.array([], dtype=numpy.uint64) for i in range (0, 4)]

    for element in numpy.nditer(subarray):
        index = random.randint(0, 3)
        result[index] = numpy.append(result[index], element)

    queue.put(result)
    print "after the queue.put"

jobs = []
queue = multiprocessing.Queue()

subarray = numpy.array_split(numpy.arange(1, 10001, dtype=numpy.uint64), 4)

for i in range(0, 4):
    process = multiprocessing.Process(target=work, args=(subarray[i], queue))
    jobs.append(process)
    process.start()

for j in jobs:
    j.join()

print "the end"

所有进程都运行这print "after the queue.put"条线。但是,它并没有达到要求print "the end"。很奇怪,如果我将arangefrom更改100011001,它就会结束。怎么了?

4

3 回答 3

1

我会将我的评论扩展为一个简短的答案。由于我也不理解奇怪的行为,它只是一种解决方法。

第一个观察是,如果 queue.put 行被注释掉,代码会运行到最后,所以它一定是与队列有关的问题。结果实际上已添加到队列中,因此问题必须在队列和加入之间的相互作用中。

以下代码按预期工作

import random
import multiprocessing
import numpy
import sys
import time

def work(subarray, queue):
    result = [numpy.array([], dtype=numpy.uint64) for i in range (4)]

    for element in numpy.nditer(subarray):
        index = random.randint(0, 3)
        result[index] = numpy.append(result[index], element)

    queue.put(result)
    print("after the queue.put")


jobs = []
queue = multiprocessing.Queue()

subarray = numpy.array_split(numpy.arange(1, 15001, dtype=numpy.uint64), 4)


for i in range(4):
    process = multiprocessing.Process(target=work, args=(subarray[i], queue))
    jobs.append(process)
    process.start()

res = []
while len(res)<4:
    res.append(queue.get())

print("the end")
于 2018-08-14T20:41:24.073 回答
1

大多数子进程在 put 调用时被阻塞。 多处理队列放置

必要时阻塞,直到有空闲插槽可用。

这可以通过在加入之前添加对 queue.get() 的调用来避免。

此外,在多处理代码中,请通过以下方式隔离父进程:

if __name__ == '__main__':
    # main code here

使用多处理时在 Windows 中强制使用 if name ==“<strong>ma​​in”

于 2018-08-14T21:26:58.760 回答
1

这是原因:

加入使用队列的进程

请记住,将项目放入队列的进程将在终止之前等待,直到所有缓冲的项目都由“馈送”线程馈送到底层管道。(子进程可以调用队列的 cancel_join_thread() 方法来避免这种行为。)

这意味着无论何时使用队列,您都需要确保所有已放入队列的项目最终都会在进程加入之前被删除。否则,您无法确定将项目放入队列的进程将终止。还要记住,非守护进程将自动加入。

于 2018-08-15T13:55:53.680 回答