13

celery在我的应用程序中使用来运行定期任务。让我们看下面的简单示例

from myqueue import Queue
@perodic_task(run_every=timedelta(minutes=1))
def process_queue():
    queue = Queue()
    uid, questions = queue.pop()
    if uid is None:
        return

    job = group(do_stuff(q) for q in questions)
    job.apply_async()

def do_stuff(question):
    try:
        ...
    except:
        ...
        raise

正如您在上面的示例中看到的那样,我celery曾经运行异步任务,但是(因为它是一个队列)我需要queue.fail(uid)在出现异常do_stuffqueue.ack(uid)其他情况时执行此操作。在这种情况下,在这两种情况下从我的任务中获得一些回调将是非常清晰和有用的 -on_failureon_success.

我看过一些文档,但从未见过将回调与apply_async. 有可能这样做吗?

4

2 回答 2

45

继承 Task 类并重载 on_success 和 on_failure 函数:

class CallbackTask(Task):
    def on_success(self, retval, task_id, args, kwargs):
        pass

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        pass


@celery.task(base=CallbackTask)  # this does the trick
def add(x, y):
    return x + y
于 2012-11-21T13:27:27.073 回答
8

您可以在调用 apply_async 时通过 link 和 link_err kwargs 指定成功和错误回调。芹菜文档包括一个明确的例子:http ://docs.celeryproject.org/en/latest/userguide/calling.html#linking-callbacks-errbacks

于 2013-01-04T16:41:17.213 回答