44

在某些情况下,我想让 celery 任务在该任务中失败。我尝试了以下方法:

from celery.task import task
from celery import states

@task()
def run_simulation():
    if some_condition:
        run_simulation.update_state(state=states.FAILURE)
        return False

但是,该任务仍然报告已成功:

任务sim.tasks.run_simulation[9235e3a7-c6d2-4219-bbc7-acf65c816e65] 1.17847704887s成功:False

似乎只能在任务运行时修改状态并且一旦完成 - 芹菜将状态更改为它认为是结果的任何内容(请参阅this question)。有什么方法可以在不通过引发异常而使任务失败的情况下使 celery 返回任务失败的情况?

4

3 回答 3

37

要将任务标记为失败而不引发异常,请将任务状态更新为FAILURE然后引发Ignore异常,因为返回任何值都会将任务记录为成功,例如:

from celery import Celery, states
from celery.exceptions import Ignore

app = Celery('tasks', broker='amqp://guest@localhost//')

@app.task(bind=True)
def run_simulation(self):
    if some_condition:
        # manually update the task state
        self.update_state(
            state = states.FAILURE,
            meta = 'REASON FOR FAILURE'
        )

        # ignore the task so no other state is recorded
        raise Ignore()

但最好的方法是从您的任务中引发异常,您可以创建一个自定义异常来跟踪这些失败:

class TaskFailure(Exception):
   pass

并从您的任务中引发此异常:

if some_condition:
    raise TaskFailure('Failure reason')
于 2015-10-15T08:30:42.030 回答
6

我想进一步扩展 Pierre 的答案,因为我在使用建议的解决方案时遇到了一些问题。

要在将任务的状态更新为 states.FAILURE 时允许自定义字段,还必须模拟 FAILURE 状态将具有的一些属性(注意 exc_type 和 exc_message)虽然解决方案将终止任务,但任何查询状态的尝试(对于例如 - 获取 'REASON FOR FAILURE' 值)将失败。

以下是我从中获取的参考片段: https ://www.distributedpython.com/2018/09/28/celery-task-states/

@app.task(bind=True)
def task(self):
    try:
        raise ValueError('Some error')
    except Exception as ex:
        self.update_state(
            state=states.FAILURE,
            meta={
                'exc_type': type(ex).__name__,
                'exc_message': traceback.format_exc().split('\n')
                'custom': '...'
            })
        raise Ignore()
于 2019-01-21T15:27:37.320 回答
2

我从 Ask Solem 那里得到了一个有趣的回答,他提出了一个 'after_return' 处理程序来解决这个问题。这可能是未来的一个有趣的选择。

与此同时,当我想让它失败时,我通过简单地从任务中返回一个字符串'FAILURE'来解决这个问题,然后按如下方式检查它:

result = AsyncResult(task_id)
if result.state == 'FAILURE' or (result.state == 'SUCCESS' and result.get() == 'FAILURE'):
    # Failure processing task 
于 2011-10-11T07:24:36.903 回答