2

我正在使用 Python 程序来计算浮点数列表的平均值。遵循程序逻辑:

  1. 该程序以一些参数开始。
  2. 创建了一个“主机组”列表。
  3. For-in 循环遍历列表“hostgroups”,启动一个函数 worker(hosgroup,var1, var2,var3,...)
  4. 在工作者函数内部,使用工作者的一些输入变量构建了两个变量
    • 4a。在工作人员内部,使用工作人员的一些输入变量调用子工作人员函数
    • 4b。subworker 返回一些新变量
    • 4c。回到工人
    • 4d。有些事情已经完成
    • 4d。最后在worker中调用了一个带有一些变量的final-function。

到目前为止,一切顺利!

我的下一步是设置多处理...谁能提供帮助?

更新:这是我的实际方法:

class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown
                print '%s: Exiting' % proc_name
                self.task_queue.task_done()
                break
            print '%s: %s' % (proc_name, next_task)
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return


class Task(object):
    def __init__(self, hostgroup, lock):
        self.hostgroup = hostgroup
        self.lock = lock
    def __call__(self):
        print 'Doing something fancy for %s!' % self.hostgroup
        try:
            lock.acquire()
            worker(self.hostgroup,hostsfile,mod_inputfile,outputdir,testmode,backup_dir,start_time,end_time,rrdname,unit,yesterday,now_epoch,rrd_interval,rrd_heartbeat,name)
        finally:
            lock.release()
    def __str__(self):
        return 'str %s' % self.hostgroup

if __name__ == '__main__':
    lock = multiprocessing.Lock()
    # Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    # Start consumers
    num_consumers = multiprocessing.cpu_count() * 2
    print 'Creating %d consumers' % num_consumers
    consumers = [ Consumer(tasks, results)
                  for i in xrange(num_consumers) ]
    for w in consumers:
        w.start()

    # Enqueue jobs
    for hostgroup in hostgroups:
        tasks.put(Task(hostgroup,lock))

    # Add a poison pill for each consumer
    for i in xrange(num_consumers):
        tasks.put(None)

    # Wait for all of the tasks to finish
    tasks.join()

---> 很好,到目前为止!但是不可能有锁,所有的结果都是一样的......为什么 lock.acquire() 不起作用?

4

1 回答 1

3

我发现multiprocessing.Pool它比 Queue 类更容易使用。基本设置是

from multiprocessing import Pool
p = Pool(processes=<number of processes>)
p.map(function, [a, b, c])

它将在独立进程中调用function(a), function(b),function(c)

于 2012-08-07T13:45:05.520 回答