6

我正在使用 celery 开发软件升级系统。我有一个我正在努力干净地实施的用例。以下是我的工作:

device_software_updates(device_id)

returns a list of software updates that need to be installed on a device

已安装的设备软件(设备 ID)

returns the software modules that are currently installed on a device

latest_device_software(device_id)

returns the latest software versions available for a device

软件更新(已安装软件、最新软件)

returns the latest software modules that are not installed

在纯 python 中,device_software_updates 的实现可能看起来像

def device_software_updates(device_id):
    return software_updates(installed_device_software(device_id),
                            latest_device_software(device_id))

在 Celery 3.0 中实现这一点的最干净的方法是什么?我想使用组做一些事情。我当前的实现如下所示:

def device_software_updates(device_id):
    return (
        group(installed_device_software.s(device_id),
              latest_device_software.s(device_id)) |
        software_updates.s()
    )()

不幸的是,这意味着 software_updates 的 argspecsoftware_updates(arg_list)并不理想。

4

1 回答 1

7

我相信使用和弦是处理这个问题的正确方法。

根据http://docs.celeryproject.org/en/latest/userguide/canvas.html#groups上的 Celery 文档,

和弦是仅在任务集中的所有任务都完成执行后才执行的任务。

...

和弦就像一个组,但有一个回调。一个和弦由一个标题组和一个正文组成,其中正文是一个任务,应该在标题中的所有任务完成后执行。

这是一个逐行分解的示例(来自 Celery 文档)

callback = tsum.subtask()
header = [add.subtask((i, i)) for i in xrange(100)]
result = chord(header)(callback)
result.get()

在您的情况下,您可以执行类似的操作,例如:

@celery.task
def device_software_updates():
    callback = software_updates.subtask()
    header = [
              installed_device_software.subtask(device_id), 
              latest_device_software.s(device_id) 
             ]
    result = chord(header)(callback)
    return result.get()
于 2012-08-24T03:46:48.417 回答