0

我正在尝试更改 python-gearman 工人在其工作周期内可用的任务。我这样做的原因是允许我对我的工作进程进行一点控制,并允许它们从数据库中重新加载。我需要每个工作人员定期重新加载,但我不想简单地终止进程,我希望服务始终可用,这意味着我必须分批重新加载。所以我会让 4 名工人重新加载,而另外 4 名工人可以处理,然后重新加载接下来的 4 名工人。

过程:

  1. 开始重新加载过程 4 次。
    1. 注销reload进程
    2. 重新加载数据集
    3. 注册一个finishReload任务
    4. 返回
  2. 重复步骤 1,直到没有reload注册任务的工作人员。
  3. 开始finishReload(1) 任务,直到没有finishReload可用任务的工人。

(1)finishReload任务注销finishReload任务并注册reload任务然后返回。

现在,我遇到的问题是,当我更改工作进程可用的任务时,作业会失败。没有错误消息或异常,gearmand 日志中只有一个“错误”。这是一个复制问题的快速程序。

工人

import gearman 
def reversify(gmWorker, gmJob): 
        return "".join(gmJob.data[::-1]) 
def strcount(gmWorker, gmJob): 
        gmWorker.unregister_task('reversify')  # problem line 
        return str(len(gmJob.data)) 
worker = gearman.GearmanWorker(['localhost:4730']) 
worker.register_task('reversify', reversify) 
worker.register_task('strcount', strcount) 
while True: 
        worker.work() 

客户

import gearman 
client = gearman.GearmanClient(['localhost:4730']) 
a = client.submit_job('reversify', 'spam and eggs') 
print a.result 
>>> sgge dna maps 

a = client.submit_job('strcount', 'spam and eggs') 
...

请让我知道是否有任何我可以解释的事情。

编辑:我知道有人会要求查看我提到的日志。我也将这个问题发布到了 Google 上的 gearman 组,并且那里有日志

4

2 回答 2

1

看起来像子类化 GearmanWorker 类并添加一些标志可以解决这个问题。在开始从工作人员向服务器发出新命令之前,我需要让作业完成,这似乎会中断当前作业。因此,如果我们覆盖该on_job_complete函数,我们可以检查启用/禁用标志并在我们发出send_job_complete命令后对其进行操作。新的工人计划如下:

工人

import gearman

def reversify(gmWorker, gmJob):
        return "".join(gmJob.data[::-1])

def enable_reversify(gmWorker, gmJob):
        myWorker.enableReversify = 1
        return 'OK'

def strcount(gmWorker, gmJob):
        myWorker.enableReversify = -1
        return str(len(gmJob.data))

class myWorker(gearman.GearmanWorker):

        enableReversify = 0 # 0 = do nothing, -1 = turn off, 1 = turn on

        def on_job_complete(self, current_job, job_result):
                self.send_job_complete(current_job, job_result)
                ### check the flag here and enable or disable tasks ###
                if myWorker.enableReversify == -1:
                        self.unregister_task('reversify')
                if myWorker.enableReversify == 1:
                        self.register_task('reversify', reversify)
                myWorker.enableReversify = 0 # reset the flag
                return True

worker = myWorker(['localhost:4730']) 
worker.register_task('reversify', reversify)
worker.register_task('strcount', strcount)
worker.register_task('enableReversify', enable_reversify)

while True:
        worker.work() 
于 2011-03-21T15:46:50.483 回答
0

乍一看,问题似乎是您正在开始一项工作,然后在工作完成之前从工作服务器取消注册工作人员执行该工作的能力。

于 2011-03-21T01:56:25.543 回答