10

我试图通过查询每个任务状态来获取任务链的进度。但是当通过它的 id 检索链时,我得到了一些行为不同的对象。

在任务.py

from celery import Celery

celery = Celery('tasks')
celery.config_from_object('celeryconfig')

def unpack_chain(nodes): 
    while nodes.parent:
        yield nodes.parent
        nodes = nodes.parent
    yield nodes

@celery.task
def add(num, num2):
    return num + num2

从 ipython 查询时...

In [43]: from celery import chain
In [44]: from tasks import celery, add, unpack_chain
In [45]: c = chain(add.s(3,3), add.s(10).set(countdown=100))
In [46]: m = c.apply_async()
In [47]: a = celery.AsyncResult(m.id)
In [48]: a == m
Out[48]: True
In [49]: a.id == m.id
Out[49]: True
In [50]: [t.status for t in list(unpack_chain(a))]
Out[50]: ['PENDING']
In [51]: [t.status for t in list(unpack_chain(m))]
Out[51]: ['PENDING', 'SUCCESS']

在 Redis 下使用 Python 2.7.3 和 Celery 3.0.19。

正如您在50 和 51中看到的那样,返回的值celery.AsyncResult与原始链不同。

如何通过链 id 获取原始链任务列表?

4

1 回答 1

10

就像@Hernantz 所说,您不能仅从任务 ID 中恢复父链,您必须遍历您的队列,这可能取决于您用作代理的方式,也可能不可能。

但是,如果您有最后一个任务 ID 来进行查找,那么您就有了链,您只需要存储所有任务 ID 并在需要检查它们的状态时重建链。您可以使用以下功能:

def store(node):
    id_chain = []
    while node.parent:
      id_chain.append(node.id)
      node = node.parent
    id_chain.append(node.id)
    return id_chain

def restore(id_chain):
    id_chain.reverse()
    last_result = None
    for tid in id_chain:
        result = celery.AsyncResult(tid)
        result.parent = last_result
        last_result = result
    return last_result

当您第一次从链中获取 AsyncResult 时调用 store。调用 restore 会给你一个链表,AsyncResult就像链给你一样。

于 2014-05-28T09:54:06.060 回答