3

I have a python script which is something like that:

def test_run():
     global files_dir
     for f1 in os.listdir(files_dir):
          for f2 os.listdir(files_dir):
               os.system("run program x on f1 and f2")

what is the best way to call each of the os.system calls on different processor? using subprocess or multiprocessing pool?

NOTE : each run of the program will generate an output file.

4

2 回答 2

9

@unutbu 的回答很好,但有一种破坏性较小的方法:使用 aPool来传递任务。然后你就不必为自己的队列搞砸了。例如,

import os
NUM_CPUS = None  # defaults to all available

def worker(f1, f2):
    os.system("run program x on f1 and f2")

def test_run(pool):
     filelist = os.listdir(files_dir)
     for f1 in filelist:
          for f2 in filelist:
               pool.apply_async(worker, args=(f1, f2))

if __name__ == "__main__":
     import multiprocessing as mp
     pool = mp.Pool(NUM_CPUS)
     test_run(pool)
     pool.close()
     pool.join()

那“看起来更像”您开始使用的代码。并不是说这一定是一件好事;-)

在 Python 3 的最新版本中,Pool对象也可以用作上下文管理器,因此尾部可以简化为:

if __name__ == "__main__":
     import multiprocessing as mp
     with mp.Pool(NUM_CPUS) as pool:
         test_run(pool)

编辑:使用 concurrent.futures 代替

对于像这样的非常简单的任务,Python 3concurrent.futures可以更容易使用。test_run()从上往下替换上面的代码,如下所示:

def test_run():
     import concurrent.futures as cf
     filelist = os.listdir(files_dir)
     with cf.ProcessPoolExecutor(NUM_CPUS) as pp:
         for f1 in filelist:
             for f2 in filelist:
                 pp.submit(worker, f1, f2)

if __name__ == "__main__":
     test_run()

如果您不希望工作进程中的异常无声地消失,那么它需要更加出色。这是所有并行机制的潜在问题。问题是在主程序中通常没有好的方法来引发异常,因为它们发生在可能与主程序当时正在做的事情无关的上下文(工作进程)中。在主程序中获得(重新)引发的异常的一种方法是显式询问结果;例如,将上面的内容更改为:

def test_run():
     import concurrent.futures as cf
     filelist = os.listdir(files_dir)
     futures = []
     with cf.ProcessPoolExecutor(NUM_CPUS) as pp:
         for f1 in filelist:
             for f2 in filelist:
                 futures.append(pp.submit(worker, f1, f2))
     for future in cf.as_completed(futures):
         future.result()

然后,如果工作进程中发生异常,当它应用于表示失败的进程间调用future.result()的对象时,它将在主程序中重新引发该异常。Future

此时可能比您想知道的要多;-)

于 2013-12-29T01:44:10.033 回答
3

您可以混合使用subprocessmultiprocessing。为什么两者都有?如果您只是天真地使用子流程,您将产生与任务一样多的子流程。您可以轻松地拥有数千个任务,并且同时产生许多子流程可能会使您的机器瘫痪。

因此,您可以改为使用multiprocessing仅生成与您的机器具有 CPU 一样多的工作进程 ( mp.cpu_count())。然后每个工作进程可以从队列中读取任务(文件名对),并产生一个子进程。然后工作人员应该等到子进程完成后再处理队列中的另一个任务。

import multiprocessing as mp
import itertools as IT
import subprocess

SENTINEL = None
def worker(queue):
    # read items from the queue and spawn subproceses
    # The for-loop ends when queue.get() returns SENTINEL
    for f1, f2 in iter(queue.get, SENTINEL):
        proc = subprocess.Popen(['prog', f1, f2])
        proc.communicate()

def test_run(files_dir):
    # avoid globals when possible. Pass files_dir as an argument to the function
    # global files_dir  
    queue = mp.Queue()

    # Setup worker processes. The workers will all read from the same queue.
    procs = [mp.Process(target=worker, args=[queue]) for i in mp.cpu_count()]
    for p in procs:
        p.start()

    # put items (tasks) in the queue
    files = os.listdir(files_dir)
    for f1, f2 in IT.product(files, repeat=2):
        queue.put((f1, f2))
    # Put sentinels in the queue to signal the worker processes to end    
    for p in procs:    
        queue.put(SENTINEL)

    for p in procs:
        p.join()
于 2013-12-29T01:28:40.723 回答