19

我想同时运行 program.py 的多个实例,同时限制同时运行的实例数量(例如,限制在我的系统上可用的 CPU 内核数量)。例如,如果我有 10 个内核并且总共需要运行 1000 次 program.py,那么在任何给定时间只会创建并运行 10 个实例。

我尝试过使用多处理模块、多线程和使用队列,但在我看来,没有任何东西可以轻松实现。我遇到的最大问题是找到一种方法来限制同时运行的进程数量。这很重要,因为如果我一次创建 1000 个进程,它就相当于一个分叉炸弹。我不需要以编程方式从进程返回的结果(它们输出到磁盘),并且所有进程都彼此独立运行。

谁能给我建议或示例,说明如何在 python 甚至 bash 中实现它?我会发布到目前为止使用队列编写的代码,但它不能按预期工作,并且可能已经走错了路。

非常感谢。

4

4 回答 4

26

我知道您提到 Pool.map 方法对您没有多大意义。该地图只是为其提供工作源的一种简单方法,并且可以调用以应用于每个项目。地图的func可以是对给定参数进行实际工作的任何入口点。

如果这似乎不适合您,我在这里有一个关于使用生产者-消费者模式的非常详细的答案:https ://stackoverflow.com/a/11196615/496445

本质上,您创建了一个队列,并启动了 N 个工人。然后,您要么从主线程提供队列,要么创建一个提供队列的生产者进程。工作人员只是继续从队列中获取工作,并且发生的并发工作永远不会超过您启动的进程数。

您还可以选择对队列进行限制,以便在已经有太多未完成的工作时阻塞生产者,如果您还需要限制生产者消耗的速度和资源。

被调用的工作函数可以做任何你想做的事情。这可以是一些系统命令的包装器,或者它可以导入您的 python 库并运行主例程。那里有特定的进程管理系统,可让您设置配置以在有限的资源下运行任意可执行文件,但这只是执行此操作的基本 python 方法。

我的另一个答案的片段:

基本池:

from multiprocessing import Pool

def do_work(val):
    # could instantiate some other library class,
    # call out to the file system,
    # or do something simple right here.
    return "FOO: %s" % val

pool = Pool(4)
work = get_work_args()
results = pool.map(do_work, work)

使用流程管理器和生产者

from multiprocessing import Process, Manager
import time
import itertools

def do_work(in_queue, out_list):
    while True:
        item = in_queue.get()

        # exit signal 
        if item == None:
            return

        # fake work
        time.sleep(.5)
        result = item

        out_list.append(result)


if __name__ == "__main__":
    num_workers = 4

    manager = Manager()
    results = manager.list()
    work = manager.Queue(num_workers)

    # start for workers    
    pool = []
    for i in xrange(num_workers):
        p = Process(target=do_work, args=(work, results))
        p.start()
        pool.append(p)

    # produce data
    # this could also be started in a producer process
    # instead of blocking
    iters = itertools.chain(get_work_args(), (None,)*num_workers)
    for item in iters:
        work.put(item)

    for p in pool:
        p.join()

    print results
于 2012-08-17T01:02:10.217 回答
3

您应该使用流程主管。一种方法是使用Circus提供的 API “以编程方式”执行此操作,文档站点现在处于脱机状态,但我认为这只是一个暂时的问题,无论如何,您可以使用 Circus 来处理这个问题。另一种方法是使用supervisord并将进程的参数设置为numprocs您拥有的核心数。

使用 Circus 的示例:

from circus import get_arbiter

arbiter = get_arbiter("myprogram", numprocesses=3)
try:
    arbiter.start()
finally:
    arbiter.stop()
于 2012-08-17T00:18:00.793 回答
2

Bash 脚本而不是 Python,但我经常将它用于简单的并行处理:

#!/usr/bin/env bash
waitForNProcs()
{
 nprocs=$(pgrep -f $procName | wc -l)
 while [ $nprocs -gt $MAXPROCS ]; do
  sleep $SLEEPTIME
  nprocs=$(pgrep -f $procName | wc -l)
 done
}
SLEEPTIME=3
MAXPROCS=10
procName=myPython.py
for file in ./data/*.txt; do
 waitForNProcs
 ./$procName $file &
done

或者对于非常简单的情况,另一个选项是 xargs 其中 P 设置 procs 的数量

find ./data/ | grep txt | xargs -P10 -I SUB ./myPython.py SUB 
于 2012-08-17T00:04:56.037 回答
1

虽然关于使用 multiprocessing.pool 有很多答案,但关于如何使用 multiprocessing.Process 的代码片段并不多,这在内存使用问题时确实更有益。启动 1000 个进程会使 CPU 过载并杀死内存。如果每个进程及其数据管道都是内存密集型的,那么 OS 或 Python 本身就会限制并行进程的数量。我开发了以下代码来限制同时批量提交给 CPU 的作业数量。批量大小可以与 CPU 内核的数量成比例地缩放。在我的 Windows PC 中,每批作业的数量可以高达可用 CPU 课程的 4 倍。

import multiprocessing
def func_to_be_multiprocessed(q,data):
    q.put(('s'))
q = multiprocessing.Queue()
worker = []
for p in range(number_of_jobs):
    worker[p].append(multiprocessing.Process(target=func_to_be_multiprocessed, \
        args=(q,data)...))
num_cores = multiprocessing.cpu_count()
Scaling_factor_batch_jobs = 3.0
num_jobs_per_batch = num_cores * Scaling_factor_batch_jobs
num_of_batches = number_of_jobs // num_jobs_per_batch
for i_batch in range(num_of_batches):
    floor_job = i_batch * num_jobs_per_batch
    ceil_job  = floor_job + num_jobs_per_batch
    for p in worker[floor_job : ceil_job]:
                                         worker.start()
    for p in worker[floor_job : ceil_job]:
                                         worker.join()
for p in worker[ceil_job :]:
                           worker.start()
for p in worker[ceil_job :]:
                           worker.join()
for p in multiprocessing.active_children():
                           p.terminate()
result = []
for p in worker:
   result.append(q.get())

唯一的问题是,如果任何批次中的任何作业无法完成并导致挂起情况,则其余批次的作业将不会启动。因此,要处理的函数必须具有适当的错误处理例程。

于 2017-12-29T13:16:50.760 回答