15

即使主要任务失败,是否可以运行和弦回调?

我创建了一个和弦,我添加了一堆任务并注册了一个回调。我的问题是,如果其中一项任务失败,则不会触发回调,但我希望以任何一种方式触发回调。

我试图用 si() 注册回调(不变性

callback = tasks.run_delete_rule.si([timestamp])
header = [tasks.run_update_rule.s(i, timestamp) for i in item_ids]
result = chord(header)(callback)

我还尝试将参数添加ignore_result=True到两个任务装饰器中,但没有成功。

4

2 回答 2

11

从 github 问题#1881如果回调具有link_error选项集,它采用任务名称列表,那么当和弦的任务失败时,link_error 任务将被执行。

@task(name='super_task.good')
def good():
    return True

@task(name='super_task.raise_exception')
def raise_exception():
    raise ValueError('error')

@task(name='super_task.callback')
def callback(*args, **kwargs):
    logger.info('callback')
    logger.info(args)
    logger.info(kwargs)
    return 'finished'

@task(name='super_task.error_callback')
def error_callback(*args, **kwargs):
    logger.info('error_callback')
    logger.info(args)
    logger.info(kwargs)
    return 'error'

>>> c = chord(
        [raise_exception.s(), good.s(), raise_exception.s()], 
        callback.s().set(link_error=['super_task.error_callback'])
    )
>>> result = c()

这将执行和弦,并且在您的 celery 日志中,您会看到raise_exception任务失败,并且其执行error_callback将在它的 args 中接收 task_id 的callback

此时, 的值result将包含 的AsyncResult实例callback,并且因为在一个和弦中错误传播到回调做result.get()将引发任务的异常并result.traceback为您提供回溯。

如果你想要一个回调,只需将和弦回调的名称传递给link_error

callback.s().set(link_error='super_task.callback')

笔记

另一个选项是设置CELERY_CHORD_PROPAGATES = False将恢复到 celery 3.1 之前的行为并始终执行回调。

但这不是推荐的方法,因为您可以在 github 问题#1349中找到

Celery 3.1 定义了如何处理和弦错误,以前的行为从未被记录下来,更多的是一个意外,因为它从来没有打算那样工作。

我们无法更改错误修复版本中的行为,因此必须使用设置来代替,但绝不是有人故意禁用新行为。

新行为是为了防止发生此类问题,并且可能会删除向后兼容的设置。我建议您在这里找到其他处理错误的方法(如果您可以为它发明一个不错的 api,我不介意提出建议)

于 2015-07-16T12:30:35.117 回答
0

你只需要改变方式如何link_error被称为。使用您想要的参数传递签名,而不是字符串引用。

在上面的示例中,您可以按以下方式传递参数

c = chord(
    [raise_exception.s(), good.s(), raise_exception.s()], 
    callback.s().set(link_error=[error_callback.s(<arguments_here>)])
)

请记住,第一个参数将是task_id,进一步的参数将是签名中定义的参数。

于 2019-07-24T13:34:35.140 回答