3

我正在使用以下模式进行多处理:

    for item in data:
        inQ.put(item)

    for i in xrange(nProcesses):
        inQ.put('STOP')
        multiprocessing.Process(target=worker, args=(inQ, outQ)).start()

    inQ.join()
    outQ.put('STOP')

    for result in iter(outQ.get, 'STOP'):
        # save result

哪个工作正常。但是,如果我通过 发送一个 numpy 数组outQ,则'STOP'不会以 结尾outQ,导致我的结果获取循环提前终止。

这是一些重现行为的代码。

import multiprocessing
import numpy as np

def worker(inQ, outQ):
    for i in iter(inQ.get, 'STOP'):
        result = np.random.rand(1,100)
        outQ.put(result)
        inQ.task_done()
    inQ.task_done() # for the 'STOP'

def main():
    nProcesses = 8
    data = range(1000)

    inQ = multiprocessing.JoinableQueue()
    outQ = multiprocessing.Queue()
    for item in data:
        inQ.put(item)

    for i in xrange(nProcesses):
        inQ.put('STOP')
        multiprocessing.Process(target=worker, args=(inQ, outQ)).start()

    inQ.join()
    print outQ.qsize()
    outQ.put('STOP')

    cnt = 0
    for result in iter(outQ.get, 'STOP'):
        cnt += 1
    print "got %d items" % cnt
    print outQ.qsize()

if __name__ == '__main__':
    main()

如果您将 替换为result = np.random.rand(1,100)类似result = i*i代码的内容,则可以按预期工作。

这里发生了什么?我在这里做一些根本错误的事情吗?我本来期望outQ.put()afterinQ.join()做我想做的,因为join()直到所有进程都完成所有put()s 之前的块。

为我工作的解决方法是使用 进行结果获取循环while outQ.qsize() > 0,这可以找到。但我读qsize()的不可靠。只有在不同的进程运行时才不可靠吗?qsize()做完之后我可以依靠inQ.join()吗?

我希望有些人建议使用multiprocessing.Pool.map(),但是在使用 numpy 数组(ndarrays)执行此操作时,我遇到了 pickle 错误。

感谢您的关注!

4

2 回答 2

1

由于您知道要从 中获得多少项目outQ,因此另一种解决方法是明确等待该数量的项目:

import multiprocessing as mp
import numpy as np
import Queue

N=100

def worker(inQ, outQ):
    while True:
        i,item=inQ.get()
        result = np.random.rand(1,N)
        outQ.put((i,result))
        inQ.task_done()

def main():
    nProcesses = 8
    data = range(N)
    inQ = mp.JoinableQueue()
    outQ = mp.Queue()    

    for i,item in enumerate(data):
        inQ.put((i,item))

    for i in xrange(nProcesses):
        proc=mp.Process(target=worker, args=[inQ, outQ])
        proc.daemon=True
        proc.start()

    inQ.join()
    cnt=0
    for _ in range(N):
        result=outQ.get()
        print(result)
        cnt+=1
        print(cnt)      
    print('got {c} items'.format(c=cnt))

if __name__ == '__main__':
    main()
于 2011-03-16T21:02:51.577 回答
1

numpy 数组使用丰富的比较。所以 a=='STOP' 返回一个 numpy 数组,而不是 bool,并且该 numpy 数组不能被强制为 bool。在幕后,iter(outQ.get, 'STOP') 只是在进行比较,并且可能在尝试将结果转换为布尔值时将异常视为 False。您将不得不进行手动 while 循环,从队列中拉出项目,检查 isinstance(item, basestring),然后再将其与“STOP”进行比较。

while True:
    item = outQ.get()
    if isinstance(item, basestring) and item == 'STOP':
        break
    cnt += 1

检查 qsize() 可能也可以正常工作,因为在输入队列加入后没有其他进程添加到队列中。

于 2011-03-16T14:35:30.827 回答