
We have a web service server running under Python 3.2 (Fedora Core 14, 64-bit), but a new dependency that does not support 3.2 has forced us to back-port it to Python 2.6.7. A piece of code that used concurrent.futures has been rewritten to use multiprocessing.Pool to execute a few critical sections in parallel. The code now looks like this:

import multiprocessing

def _run_threads(callable_obj, args, threads):
    pool = multiprocessing.Pool(processes=threads)
    process_list = [pool.apply_async(callable_obj, a) for a in args]
    pool.close()   # no further tasks will be submitted
    pool.join()    # wait for all worker processes to exit
    return [x.get() for x in process_list]

Apologies for the confusing misuse of the name "threads" — these are processes.

Since implementing this we have found that it occasionally hangs. When we finally kill the parent (main) process we get a garbled traceback, but a few lines seem crucial:

[snip]
Process PoolWorker-445:
[snip]
File "/usr/lib64/python2.6/multiprocessing/pool.py", line 59, in worker
task = get()
File "/usr/lib64/python2.6/multiprocessing/queues.py", line 352, in get
return recv()
racquire()
[snip]

My reading of the available evidence is that a child process in the pool fails to receive the "close" signal from the parent, so it keeps waiting for work. The parent sits waiting for the child to shut down, and the server hangs. This happens non-deterministically, but far too often (about once a day) for such a critical server.

Is there a problem with the way _run_threads() is coded? Is this a known issue with a known workaround? We use it for time-critical processing, so we would rather not recode it for sequential execution unless absolutely necessary. One reason for sticking with multiprocessing.Pool is the easy access to the return values of the operations run in parallel.

Thanks


1 Answer


I am not sure where this issue originates; it is certainly interesting. However, perhaps a little restructuring will solve the problem. I don't think you need to close the pool before having collected your results, right? Maybe sticking to the 'canonical' way of using a Pool, as documented, helps:

result = pool.apply_async(time.sleep, (10,))
print result.get(timeout=1)           # raises TimeoutError

Or, in your case, call x.get() for each x in process_list before closing/joining the pool. If the problem persists and occurs during get(), we at least know it has nothing to do with close().

answered 2012-09-18T21:51:29.507