2

我想使用 python 的 multiprocessing 模块运行多个 gearman 工作进程,但似乎该进程是以顺序模式执行的。如果我在多个终端中运行单独的 worker.py 程序,那么它工作正常。但是我想减轻在很多终端中手动指定 worker.py 的负担。有没有其他选择?

import sys , os , simplejson
from fabric import *
from fabric.api import *
import gearman
from gearman import GearmanWorker
from multiprocessing import Pool


##--Global Variables--##
#Spawing minimun 5 worker threads for Gearman



#executing the Job. gmJob consist of dict[host , cmd , pass] 
def exe_job(gmWorker , gmJob ):
 print " the worker process is " , os.getpid()
 d = simplejson.loads(gmJob.data)
 env.host_string = d['host'] 
 env.password = d['pass']  #will store the password .
 cmds = d['cmd']
 print cmds
 for i in cmds:
  sudo (i )  # using fabric functions to ssh into system  
 return "job sucessfull"

def start_exe():
 #woker node id to be specified in here
 gm_worker = gearman.GearmanWorker(['localhost:4730'])
 #gm_worker.set_client_id('client1')
 gm_worker.register_task('exe_job',exe_job)
 gm_worker.work()


if __name__ == '__main__':
 p = Pool(5)
 result = p.apply_async(start_exe)
 print result.get()
4

1 回答 1

1

我也找不到这个问题的答案,所以我深入研究并发现您基本上必须使用队列来跟踪您打开和关闭的进程(齿轮工工人的错误) . 无论如何,我将它构建成一个模块并将其发布在 pypi 上。这仍然是一项正在进行的工作,但我将尝试在第二天左右添加文档和示例:

我还包括了通过 json 进行通信的客户端和工作类(我提到这一点是因为您的示例似乎使用了 json)。

让我知道你的想法。我绝对可以用更多的眼光来寻找错误或告诉我我在哪里对代码做了一些完全疯狂的事情。

于 2012-01-18T03:11:30.137 回答