46

我有一个 Python 脚本,我想将它用作另一个 Python 脚本的控制器。我有一个有 64 个处理器的服务器,所以想为第二个 Python 脚本生成多达 64 个子进程。子脚本被称为:

$ python create_graphs.py --name=NAME

其中 NAME 类似于 XYZ、ABC、NYU 等。

在我的父控制器脚本中,我从列表中检索名称变量:

my_list = [ 'XYZ', 'ABC', 'NYU' ]

所以我的问题是,作为孩子产生这些过程的最佳方式是什么?我想一次将子进程的数量限制为 64,因此需要跟踪状态(如果子进程已完成或未完成),以便我可以有效地保持整代运行。

我考虑使用 subprocess 包,但拒绝了它,因为它一次只产生一个孩子。我终于找到了多处理器包,但我承认被整个线程与子进程文档所淹没。

现在,我的脚本subprocess.call一次只生成一个孩子,如下所示:

#!/path/to/python
import subprocess, multiprocessing, Queue
from multiprocessing import Process

my_list = [ 'XYZ', 'ABC', 'NYU' ]

if __name__ == '__main__':
    processors = multiprocessing.cpu_count()

    for i in range(len(my_list)):
        if( i < processors ):
             cmd = ["python", "/path/to/create_graphs.py", "--name="+ my_list[i]]
             child = subprocess.call( cmd, shell=False )

我真的希望它一次产生 64 个孩子。在其他 stackoverflow 问题中,我看到人们使用队列,但这似乎会影响性能?

4

4 回答 4

67

您正在寻找的是多处理中的进程池类。

import multiprocessing
import subprocess

def work(cmd):
    return subprocess.call(cmd, shell=False)

if __name__ == '__main__':
    count = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=count)
    print pool.map(work, ['ls'] * count)

为了便于理解,这里有一个计算示例。以下将在 N 个进程上划分 10000 个任务,其中 N 是 cpu 计数。请注意,我将 None 作为进程数传递。这将导致 Pool 类使用 cpu_count 作为进程数(参考

import multiprocessing
import subprocess

def calculate(value):
    return value * 10

if __name__ == '__main__':
    pool = multiprocessing.Pool(None)
    tasks = range(10000)
    results = []
    r = pool.map_async(calculate, tasks, callback=results.append)
    r.wait() # Wait on the results
    print results
于 2009-05-19T20:26:13.217 回答
3

这是我根据 Nadia 和 Jim 的评论提出的解决方案。我不确定这是否是最好的方法,但它确实有效。被调用的原始子脚本需要是一个 shell 脚本,因为我需要使用一些 3rd 方应用程序,包括 Matlab。所以我不得不把它从 Python 中取出并用 bash 编码。

import sys
import os
import multiprocessing
import subprocess

def work(staname):
    print 'Processing station:',staname
    print 'Parent process:', os.getppid()
    print 'Process id:', os.getpid()
    cmd = [ "/bin/bash" "/path/to/executable/create_graphs.sh","--name=%s" % (staname) ]
    return subprocess.call(cmd, shell=False)

if __name__ == '__main__':

    my_list = [ 'XYZ', 'ABC', 'NYU' ]

    my_list.sort()

    print my_list

    # Get the number of processors available
    num_processes = multiprocessing.cpu_count()

    threads = []

    len_stas = len(my_list)

    print "+++ Number of stations to process: %s" % (len_stas)

    # run until all the threads are done, and there is no data left

    for list_item in my_list:

        # if we aren't using all the processors AND there is still data left to
        # compute, then spawn another thread

        if( len(threads) < num_processes ):

            p = multiprocessing.Process(target=work,args=[list_item])

            p.start()

            print p, p.is_alive()

            threads.append(p)

        else:

            for thread in threads:

                if not thread.is_alive():

                    threads.remove(thread)

这似乎是一个合理的解决方案?我尝试使用 Jim 的 while 循环格式,但我的脚本什么也没返回。我不确定为什么会这样。这是我用 Jim 的“while”循环替换“for”循环运行脚本时的输出:

hostname{me}2% controller.py 
['ABC', 'NYU', 'XYZ']
Number of processes: 64
+++ Number of stations to process: 3
hostname{me}3%

当我用“for”循环运行它时,我得到了一些更有意义的东西:

hostname{me}6% controller.py 
['ABC', 'NYU', 'XYZ']
Number of processes: 64
+++ Number of stations to process: 3
Processing station: ABC
Parent process: 1056
Process id: 1068
Processing station: NYU
Parent process: 1056
Process id: 1069
Processing station: XYZ
Parent process: 1056
Process id: 1071
hostname{me}7%

所以这行得通,我很高兴。但是,我仍然不明白为什么我不能使用 Jim 的 'while' 样式循环而不是我正在使用的 'for' 循环。感谢所有的帮助 - 我对 @stackoverflow 的知识广度印象深刻。

于 2009-06-17T16:16:38.957 回答
1

我肯定会使用多处理而不是使用子进程滚动我自己的解决方案。

于 2009-05-19T20:04:01.373 回答
1

我认为您不需要队列,除非您打算从应用程序中获取数据(如果您确实需要数据,我认为将其添加到数据库可能更容易)

但试试这个尺寸:

将您的 create_graphs.py 脚本的内容全部放入一个名为“create_graphs”的函数中

import threading
from create_graphs import create_graphs

num_processes = 64
my_list = [ 'XYZ', 'ABC', 'NYU' ]

threads = []

# run until all the threads are done, and there is no data left
while threads or my_list:

    # if we aren't using all the processors AND there is still data left to
    # compute, then spawn another thread
    if (len(threads) < num_processes) and my_list:
        t = threading.Thread(target=create_graphs, args=[ my_list.pop() ])
        t.setDaemon(True)
        t.start()
        threads.append(t)

    # in the case that we have the maximum number of threads check if any of them
    # are done. (also do this when we run out of data, until all the threads are done)
    else:
        for thread in threads:
            if not thread.isAlive():
                threads.remove(thread)

我知道这将导致比处理器少 1 个线程,这可能很好,它让处理器来管理线程、磁盘 i/o 以及计算机上发生的其他事情。如果您决定要使用最后一个核心,只需添加一个即可

编辑:我想我可能误解了 my_list 的目的。您根本不需要my_list跟踪线程(因为它们都被threads列表中的项目引用)。但这是提供进程输入的好方法——甚至更好:使用生成器函数;)

的目的my_listthreads

my_list保存您需要在函数中处理的数据
threads只是当前正在运行的线程的列表

while 循环做了两件事,启动新线程来处理数据,并检查是否有任何线程完成运行。

因此,只要您有(a)更多数据要处理,或者(b)未完成运行的线程......您想要编程以继续运行。一旦两个列表都为空,它们将评估为False并且 while 循环将退出

于 2009-05-19T20:04:26.350 回答