2

我正在编写一个 python 脚本(用于 cygwin 和 linux 环境)来对使用 subprocess.Popen() 从命令行运行的程序运行回归测试。基本上,我有一组作业,其中一部分需要根据开发人员的需要运行(大约 10 到 1000 个)。每项工作可能需要几秒钟到 20 分钟才能完成。

我的作业在多个处理器上成功运行,但我试图通过智能地排序作业(基于过去的性能)以首先运行更长的作业来节省一些时间。复杂之处在于,某些作业(稳态计算)需要先于其他作业(基于稳态确定的初始条件的瞬态)运行。

我目前的处理方法是在同一个进程上递归地运行父作业和所有子作业,但有些作业有多个长期运行的子作业。父作业完成后,我想将子代添加回池中以供其他进程使用,但需要将它们添加到队列的头部。我不确定我可以用 multiprocessing.Pool 做到这一点。我用 Manager 找了一些例子,但它们似乎都是基于网络的,并不是特别适用。任何以代码形式提供的帮助或指向关于多处理的好教程的链接(我已经用谷歌搜索了......)将不胜感激。这是到目前为止我所拥有的代码的骨架,评论指出我希望在其他处理器上产生的子作业。

import multiprocessing
import subprocess

class Job(object):
  def __init__(self, popenArgs, runTime, children)
    self.popenArgs = popenArgs #list to be fed to popen
    self.runTime = runTime #Approximate runTime for the job
    self.children = children #Jobs that require this job to run first

def runJob(job):
  subprocess.Popen(job.popenArgs).wait()
  ####################################################
  #I want to remove this, and instead kick these back to the pool
  for j in job.children: 
    runJob(j)
  ####################################################

def main(jobs):
  # This jobs argument contains only jobs which are ready to be run
  # ie no children, only parent-less jobs
  jobs.sort(key=lambda job: job.runTime, reverse=True)
  multiprocessing.Pool(4).map(runJob, jobs)
4

1 回答 1

1

首先,让我再说一遍 Armin Rigo 的评论:这里没有理由使用多进程而不是多线程。在控制过程中,您大部分时间都在等待子流程完成;您没有需要并行处理的 CPU 密集型工作。

使用线程还可以更轻松地解决您的主要问题。现在,您将作业存储在其他作业的属性中,即隐式依赖图。您需要一个单独的数据结构来根据调度对作业进行排序。此外,每个作业树当前都与一个工作进程相关联。您希望将您的工作人员与用于保存工作的数据结构分离。然后每个工人从同一个任务队列中提取工作;工作人员完成其工作后,它将工作的孩子排入队列,然后可以由任何可用的工作人员处理。

由于您希望在父作业完成后将子作业插入行的最前面,因此类似堆栈的容器似乎符合您的需求;该模块提供了一个可以使用 Queue的线程安全类。LifoQueue

import threading
import subprocess
from Queue import LifoQueue

class Job(object):
  def __init__(self, popenArgs, runTime, children):
    self.popenArgs = popenArgs
    self.runTime = runTime
    self.children = children

def run_jobs(queue):
  while True:
    job = queue.get()
    subprocess.Popen(job.popenArgs).wait()
    for child in job.children: 
      queue.put(child)
    queue.task_done()

# Parameter 'jobs' contains the jobs that have no parent.
def main(jobs):
  job_queue = LifoQueue()
  num_workers = 4
  jobs.sort(key=lambda job: job.runTime)
  for job in jobs:
    job_queue.put(job)
  for i in range(num_workers):
    t = threading.Thread(target=run_jobs, args=(job_queue,))
    t.daemon = True
    t.start()
  job_queue.join()

几点注意事项:(1)我们无法通过监视工作线程知道所有工作何时完成,因为它们不跟踪要完成的工作。这是队列的工作。因此主线程监视队列对象以了解所有工作何时完成(job_queue.join())。因此,我们可以将工作线程标记为守护线程,因此只要主线程执行,进程就会退出,而无需等待工作线程。因此,我们避免了在主线程和工作线程之间进行通信的需要,以便告诉后者何时跳出它们的循环并停止。

(2) 当所有已入队的任务都被标记为已完成时,我们知道所有工作都已完成(具体而言,当task_done()被调用的次数等于已入队的项目数时)。以队列为空作为所有工作完成的条件是不可靠的;在从中弹出作业和将该作业的子项排入队列之间,队列可能会暂时且误导性地为空。

于 2013-08-16T17:10:03.430 回答