6

我一直在尝试使用 并行化一些代码,concurrent.futures.ProcessPoolExecutor但一直遇到奇怪的死锁,而ThreadPoolExecutor. 一个最小的例子:

from concurrent import futures

def test():
    pass

with futures.ProcessPoolExecutor(4) as executor:
    for i in range(100):
        print('submitting {}'.format(i))
        executor.submit(test)

在 python 3.2.2(在 64 位 Ubuntu 上)中,这似乎在提交所有作业后一直挂起 - 每当提交的作业数量大于工人数量时,这似乎就会发生。如果我用它替换ProcessPoolExecutorThreadPoolExecutor可以完美地工作。

作为调查的尝试,我给了每个未来一个回调来打印的值i

from concurrent import futures

def test():
    pass

with futures.ProcessPoolExecutor(4) as executor:
    for i in range(100):
        print('submitting {}'.format(i))
        future = executor.submit(test)

        def callback(f):
            print('callback {}'.format(i))
        future.add_done_callback(callback)

这让我更加困惑 - iprint out bycallback的值是它被调用时的值,而不是它被定义时的值(所以我从来没有看到callback 0,但我得到了很多callback 99s)。再次ThreadPoolExecutor打印出期望值。

想知道这是否可能是一个错误,我尝试了 python 的最新开发版本。现在,代码至少似乎终止了,但我仍然得到错误的i打印值。

所以任何人都可以解释:

  • ProcessPoolExecutor在 python 3.2 和显然修复了这个死锁的当前开发版本之间发生了什么

  • i为什么要打印的“错误”值

编辑:正如jukiewicz在下面指出的那样,打印当然i会在调用回调时打印值,我不知道我在想什么......如果我传递一个值为的可调用对象i作为其属性之一,按预期工作。

编辑:更多信息:所有回调都已执行,因此看起来executor.shutdown(由 调用executor.__exit__)无法判断进程已完成。这似乎在当前的 python 3.3 中已完全修复,但似乎对 and 进行了很多更改multiprocessingconcurrent.futures所以我不知道是什么解决了这个问题。由于我不能使用 3.3(它似乎与 numpy 的发行版或开发版都不兼容),我尝试简单地将其多处理和并发包复制到我的 3.2 安装中,这似乎工作正常。尽管如此,据我所知,ProcessPoolExecutor在最新版本中完全被破坏但没有其他人受到影响,这似乎有点奇怪。

4

1 回答 1

3

我修改了代码如下,解决了这两个问题。函数被定义为一个闭包,因此每次callback都会使用更新的值。i至于死锁,这很可能是在所有任务完成之前关闭 Executor 的原因。等待期货完成也可以解决这个问题。

from concurrent import futures

def test(i):
    return i

def callback(f):
    print('callback {}'.format(f.result()))


with futures.ProcessPoolExecutor(4) as executor:
    fs = []
    for i in range(100):
        print('submitting {}'.format(i))
        future = executor.submit(test, i)
        future.add_done_callback(callback)
        fs.append(future)

    for _ in futures.as_completed(fs): pass

更新:哦,对不起,我还没有阅读你的更新,这似乎已经解决了。

于 2012-03-04T08:34:53.070 回答