8

我的 python redis 队列中有一个嵌套的作业结构。首先执行 rncopy 作业。完成后,接下来是 3 个依赖注册作业。当所有这 3 个作业的计算完成后,我想触发一个作业以向我的前端发送 websocket 通知。

我目前的尝试:

    rncopy = redisqueue.enqueue(raw_nifti_copymachine, patientid, imagepath, timeout=6000)
    t1c_reg = redisqueue.enqueue(modality_registrator, patientid, "t1c", timeout=6000, depends_on=rncopy)
    t2_reg = redisqueue.enqueue(modality_registrator, patientid, "t2", timeout=6000, depends_on=rncopy)
    fla_reg = redisqueue.enqueue(modality_registrator, patientid, "fla", timeout=6000, depends_on=rncopy)
    notify = redisqueue.enqueue(print, patient_finished, patientid, timeout=6000, depends_on=(t1c_reg, t2_reg, fla_reg))

不幸的是,多作业依赖功能似乎从未合并到主控中。我看到目前在 git 上有两个拉取请求。有没有我可以使用的解决方法?

很抱歉未能提供可重现的示例。

4

2 回答 2

3

新版本 (RQ >= 1.8)

您可以简单地使用depends_on参数,传递列表或元组。

rncopy = redisqueue.enqueue(raw_nifti_copymachine, patientid, imagepath, timeout=6000)
t1c_reg = redisqueue.enqueue(modality_registrator, patientid, "t1c", timeout=6000, depends_on=rncopy)
t2_reg = redisqueue.enqueue(modality_registrator, patientid, "t2", timeout=6000, depends_on=rncopy)
fla_reg = redisqueue.enqueue(modality_registrator, patientid, "fla", timeout=6000, depends_on=rncopy)

notify = redisqueue.enqueue(first_wrapper, patient_finished, patientid,t2_reg.id,fla_reg.id, timeout=6000, depends_on=(t1c_reg, t2_reg, fla_reg))

# you can also use a list instead of a tuple:
# notify = redisqueue.enqueue(first_wrapper, patient_finished, patientid,t2_reg.id,fla_reg.id, timeout=6000, depends_on=[t1c_reg, t2_reg, fla_reg])

旧版本 (RQ < 1.8)

我使用这个解决方法:如果依赖项是n,我创建n-1个真实函数的包装器:每个包装器依赖于不同的工作。

这个解决方案有点内卷,但它有效。

rncopy = redisqueue.enqueue(raw_nifti_copymachine, patientid, imagepath, timeout=6000)
t1c_reg = redisqueue.enqueue(modality_registrator, patientid, "t1c", timeout=6000, depends_on=rncopy)
t2_reg = redisqueue.enqueue(modality_registrator, patientid, "t2", timeout=6000, depends_on=rncopy)
fla_reg = redisqueue.enqueue(modality_registrator, patientid, "fla", timeout=6000, depends_on=rncopy)

notify = redisqueue.enqueue(first_wrapper, patient_finished, patientid,t2_reg.id,fla_reg.id, timeout=6000, depends_on=t1c_reg)

def first_wrapper(patient_finished, patientid,t2_reg_id,fla_reg_id):
    queue = Queue('YOUR-QUEUE-NAME'))
    queue.enqueue(second_wrapper, patient_finished, patientid, fla_reg_id, timeout=6000, depends_on=t2_reg_id)

def second_wrapper(patient_finished, patientid,fla_reg_id):
    queue = Queue('YOUR-QUEUE-NAME'))
    queue.enqueue(print, patient_finished, patientid, timeout=6000, depends_on=fla_reg_id)

一些警告:

  • 我没有将队列对象传递给包装器,因为会出现一些序列化问题;所以,队列必须按名称恢复......

  • 出于同样的原因,我将 job.id(而不是 job 对象)传递给包装器。

于 2020-05-10T13:27:12.280 回答
3

我创建了一个“rq-manager”来解决具有多个和树状依赖的类似问题: https ://github.com/crispyDyne/rq-manager

具有多个依赖项的项目结构如下所示。

def simpleTask(x):
    return 2*x
project = {'jobs':[
            {
                'blocking':True, # this job must finished before moving on.
                'func':simpleTask,'args': 0
            },
            {
                'blocking':True, # this job, and its child jobs, must finished before moving on.
                'jobs':[ # these child jobs will run in parallel
                    {'func':simpleTask,'args': 1},
                    {'func':simpleTask,'args': 2},
                    {'func':simpleTask,'args': 3}],
            },
            { # this job will only run when the blocking jobs above finish.
                'func':simpleTask,'args': 4
            }
        ]}

然后交给经理来完成。

from rq_manager import manager, getProjectResults

managerJob = q.enqueue(manager,project)
projectResults = getProjectResults(managerJob)

返回

projectResults = [0, [2, 4, 6], 8]

当依赖作业需要父级的结果时。我创建了一个执行第一个作业的函数,然后将其他作业添加到项目中。所以对于你的例子:

def firstTask(patientid,imagepath):

    raw_nifti_result  = raw_nifti_copymachine(patientid,imagepath)

    moreTasks = {'jobs':[
        {'func':modality_registrator,'args':(patientid, "t1c", raw_nifti_result)},
        {'func':modality_registrator,'args':(patientid, "t2", raw_nifti_result)},
        {'func':modality_registrator,'args':(patientid, "fla", raw_nifti_result)},
    ]}

    # returning a dictionary with an "addJobs" will add those tasks to the project. 
    return {'result':raw_nifti_result, 'addJobs':moreTasks}

该项目将如下所示:

project = {'jobs':[
            {'blocking':True, # this job, and its child jobs, must finished before moving on.
             'jobs':[
                {
                    'func':firstTask, 'args':(patientid, imagepath)
                    'blocking':True, # this job must finished before moving on.
                },
                # "moreTasks" will be added here
                ]
            }
            { # this job will only run when the blocking jobs above finish.
                'func':print,'args': (patient_finished, patientid)
            }
        ]}

如果最终作业需要先前作业的结果,则设置“previousJobArgs”标志。“finalJob”将接收先前结果的数组及其子作业结果的嵌套数组。

def finalJob(previousResults):
    # previousResults = [ 
    #     raw_nifti_copymachine(patientid,imagepath),
    #     [
    #         modality_registrator(patientid, "t1c", raw_nifti_result),
    #         modality_registrator(patientid, "t2", raw_nifti_result),
    #         modality_registrator(patientid, "fla", raw_nifti_result),
    #     ]
    # ]
    return doSomethingWith(previousResults)

然后项目看起来像这样

project = {'jobs':[
            {
             #'blocking':True, # Blocking not needed.
             'jobs':[
                {
                    'func':firstTask, 'args':(patientid, imagepath)
                    'blocking':True, # this job must finished before moving on.
                },
                # "moreTasks" will be added here
                ]
            }
            { # This job will wait, since it needs the previous job's results. 
                'func':finalJob, 'previousJobArgs': True # it gets all the previous jobs results
            }
        ]}

希望https://github.com/rq/rq/issues/260得到实施,我的解决方案将过时!

于 2020-07-29T20:17:46.313 回答