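I am using Python's multiprocessing Pool and its .apply_async() method to run several workers concurrently.

However, I run into a problem when I use with instead of creating the Pool instance manually.

Here is what I have done so far: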


Code shared by all snippets:

from multiprocessing import Pool
from concurrent.futures import ProcessPoolExecutor
from time import sleep, time

def worker(x):
    print(f"{x} started.")
    sleep(x)
    print(f"{x} finished.")
    return f"{x} finished."

result_list = []
def log_result(result):
    result_list.append(result)

The first snippet, written the Python 2 way, works fine:

tick = time()

pool = Pool()
for i in range(6):
    pool.apply_async(worker, args=(i, ), callback=log_result)
pool.close()
pool.join()

print('Total elapsed time: ', time() - tick)
print(result_list)
print(i)  # Indicates that all iterations have been done.

Out:

1 started.
2 started.
0 started.
0 finished.
3 started.
4 started.
1 finished.
5 started.
2 finished.
3 finished.
4 finished.
5 finished.
Total elapsed time:  6.022687673568726
['0 finished.', '1 finished.', '2 finished.', '3 finished.', '4 finished.', '5 finished.']
5

The second snippet, written the Python 3 way, also works fine:

tick = time()

with ProcessPoolExecutor() as executor:
    for i in range(6):
        executor.submit(worker, i)

print('Total elapsed time: ', time() - tick)
print(i)  # Indicates that all iterations have been done.

Out:

0 started.
0 finished.
1 started.
2 started.
3 started.
4 started.
1 finished.
5 started.
2 finished.
3 finished.
4 finished.
5 finished.
Total elapsed time:  6.017550945281982
5

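Extra:

  • From these runs one might infer that the Python 3 way is marginally faster than the Python 2 way (6.018 s versus 6.023 s).

Question:

Here is the problem: I want to write the Python 2 version using with, as in the Python 3 version, but the tasks do not complete: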

tick = time()

with Pool() as pool:
    for i in range(6):
        pool.apply_async(worker, args=(i,), callback=log_result)

print('Total elapsed time: ', time() - tick)
print(result_list)
print(i)  # Indicates that all iterations have been done.

Out:

Total elapsed time:  0.10628008842468262
[]
5

However, if I put a sleep(1) after pool.apply_async(...) so that the loop blocks for a while, some of the tasks do finish:

tick = time()

with Pool() as pool:
    for i in range(6):
        pool.apply_async(worker, args=(i,), callback=log_result)
        sleep(1)

print('Total elapsed time: ', time() - tick)
print(result_list)
print(i)  # Indicates that all iterations have been done.

Out:

0 started.
0 finished.
1 started.
2 started.
1 finished.
3 started.
4 started.
2 finished.
5 started.
3 finished.
Total elapsed time:  6.022568702697754
['0 finished.', '1 finished.', '2 finished.', '3 finished.']
5

What am I missing?

1 Answer

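concurrent.futures.Executor and multiprocessing.Pool have two completely different context manager implementations.

On exit, concurrent.futures.Executor calls shutdown(wait=True), which effectively waits for all queued jobs to finish. From the documentation:

You can avoid having to call this method explicitly if you use the with statement, which will shutdown the Executor (waiting as if Executor.shutdown() were called with wait set to True)

multiprocessing.Pool calls terminate() instead of close() followed by join(), so all jobs still in progress are interrupted prematurely. From the documentation:

Pool objects now support the context management protocol - see Context Manager Types. __enter__() returns the pool object, and __exit__() calls terminate().

If you want to use multiprocessing.Pool together with its context manager, you need to wait for the results yourself.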
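
To make that equivalence concrete, here is a minimal sketch of roughly what the with block amounts to when written out by hand. The helper name run_and_collect, the shortened sleep and the executor_cls parameter are illustrative assumptions rather than anything from the question. The parameter is there only because the wait-on-exit behaviour belongs to the Executor base class, so the same code should behave the same way with ThreadPoolExecutor.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from time import sleep


def worker(x):
    # Same shape as the question's worker, with a much shorter sleep.
    sleep(x * 0.01)
    return f"{x} finished."


def run_and_collect(executor_cls=ProcessPoolExecutor, n=6):
    # Hand-written approximation of `with executor_cls() as executor:`.
    executor = executor_cls()
    try:
        futures = [executor.submit(worker, i) for i in range(n)]
    finally:
        # Blocks until every submitted job has finished.
        executor.shutdown(wait=True)
    # Every future is done by now, so result() returns without waiting.
    return [future.result() for future in futures]


if __name__ == "__main__":
    print(run_and_collect(ProcessPoolExecutor))
    print(run_and_collect(ThreadPoolExecutor))
```

Keeping the Future objects and calling result() on them is one way to get the return values back, which the second snippet in the question discards.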
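
One way to keep the with block and still let every task finish is to call close() and join() yourself before the block exits. The following is a minimal sketch under a few assumptions: the helper name run_with_close_join, the pool_factory parameter and the shortened sleep are made up for illustration. ThreadPool is accepted as well because it exposes the same interface as Pool.

```python
from multiprocessing import Pool
from multiprocessing.pool import ThreadPool
from time import sleep


def worker(x):
    # Same shape as the question's worker, with a much shorter sleep.
    sleep(x * 0.01)
    return f"{x} finished."


def run_with_close_join(pool_factory=Pool, n=6):
    result_list = []
    with pool_factory() as pool:
        for i in range(n):
            pool.apply_async(worker, args=(i,), callback=result_list.append)
        pool.close()  # No more tasks will be submitted.
        pool.join()   # Wait until the workers have processed everything.
    # __exit__() still calls terminate(), but nothing is left to interrupt.
    # Callbacks fire in completion order, so sort for a stable result.
    return sorted(result_list)


if __name__ == "__main__":
    print(run_with_close_join())
```

With this arrangement the context manager only serves as cleanup in case an exception escapes the block; the actual waiting is done by join().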

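with Pool() as pool:
    # Submit every task first so that they run concurrently.
    async_results = [pool.apply_async(worker, args=(i,), callback=log_result) for i in range(6)]
    # Then block until each one is done, before __exit__() calls terminate().
    for async_result in async_results:
        async_result.wait()
Answered 2020-01-13T11:57:42.307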
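
A variation on the same idea, offered as a sketch rather than as part of the original answer: AsyncResult.get() also blocks until the task is done, but unlike wait() it returns the worker's value and re-raises any exception the worker raised, so no callback is needed. The helper name gather, the pool_factory parameter, the 30 second timeout and the deliberately failing worker are all assumptions made for illustration.

```python
from multiprocessing import Pool
from multiprocessing.pool import ThreadPool


def worker(x):
    if x == 2:
        # Hypothetical failure, to show that get() re-raises it.
        raise ValueError("worker 2 failed")
    return f"{x} finished."


def gather(pool_factory=Pool, n=4):
    outcomes = []
    with pool_factory() as pool:
        # Submit everything first so the tasks can run concurrently.
        async_results = [pool.apply_async(worker, args=(i,)) for i in range(n)]
        # get() blocks per task, so results come back in submission order.
        for async_result in async_results:
            try:
                outcomes.append(async_result.get(timeout=30))
            except ValueError as exc:
                outcomes.append(f"error: {exc}")
    return outcomes


if __name__ == "__main__":
    print(gather())
```

Because every get() has returned before the with block ends, the terminate() call on exit has no running work left to interrupt.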