@unutbu 的回答很好,但有一种破坏性较小的方法:使用 aPool
来传递任务。然后你就不必为自己的队列搞砸了。例如,
import os
NUM_CPUS = None # defaults to all available
def worker(f1, f2):
os.system("run program x on f1 and f2")
def test_run(pool):
filelist = os.listdir(files_dir)
for f1 in filelist:
for f2 in filelist:
pool.apply_async(worker, args=(f1, f2))
if __name__ == "__main__":
import multiprocessing as mp
pool = mp.Pool(NUM_CPUS)
test_run(pool)
pool.close()
pool.join()
那“看起来更像”您开始使用的代码。并不是说这一定是一件好事;-)
在 Python 3 的最新版本中,Pool
对象也可以用作上下文管理器,因此尾部可以简化为:
if __name__ == "__main__":
import multiprocessing as mp
with mp.Pool(NUM_CPUS) as pool:
test_run(pool)
编辑:使用 concurrent.futures 代替
对于像这样的非常简单的任务,Python 3concurrent.futures
可以更容易使用。test_run()
从上往下替换上面的代码,如下所示:
def test_run():
import concurrent.futures as cf
filelist = os.listdir(files_dir)
with cf.ProcessPoolExecutor(NUM_CPUS) as pp:
for f1 in filelist:
for f2 in filelist:
pp.submit(worker, f1, f2)
if __name__ == "__main__":
test_run()
如果您不希望工作进程中的异常无声地消失,那么它需要更加出色。这是所有并行机制的潜在问题。问题是在主程序中通常没有好的方法来引发异常,因为它们发生在可能与主程序当时正在做的事情无关的上下文(工作进程)中。在主程序中获得(重新)引发的异常的一种方法是显式询问结果;例如,将上面的内容更改为:
def test_run():
import concurrent.futures as cf
filelist = os.listdir(files_dir)
futures = []
with cf.ProcessPoolExecutor(NUM_CPUS) as pp:
for f1 in filelist:
for f2 in filelist:
futures.append(pp.submit(worker, f1, f2))
for future in cf.as_completed(futures):
future.result()
然后,如果工作进程中发生异常,当它应用于表示失败的进程间调用future.result()
的对象时,它将在主程序中重新引发该异常。Future
此时可能比您想知道的要多;-)