34

芹菜似乎没有正确处理异常。

如果我有任务:

def errorTest():
    raise Exception()

然后我打电话

r = errorTest.delay()
In [8]: r.result

In [9]: r.state
Out[9]: 'PENDING'

它会像这样无限期地挂起。

去检查日志显示错误在任务中被抛出(如果你想要消息,问),我知道后端和一切都设置正确,因为其他任务正常工作并正确返回结果。

为了在 Celery 中捕获异常,我需要做一些时髦的事情吗?

/Celery 版本是 3.0.13,代理是在我的本地机器上运行的 RabbitMQ

4

4 回答 4

39

如果您在 CELERY_ALWAYS_EAGER 设置为 True 的情况下运行 Celery,请确保在设置中也包含此行:

CELERY_EAGER_PROPAGATES_EXCEPTIONS = True

http://docs.celeryproject.org/en/latest/configuration.html#celery-eager-propagates-exceptions

于 2014-06-16T11:54:22.880 回答
15

您可以在子类中定义一个on_failure函数Task来正确处理它们。如果您只是想了解发生了什么,您可以设置错误电子邮件通知,它将向您发送 celery 配置中的堆栈跟踪。

注意:从 v4 开始,Celery不再支持发送电子邮件

于 2013-02-06T07:39:10.763 回答
2

将使@primalpython 的答案更加明确。

这将失败:

@task
def error():
    raise Exception

输入输出:

In [7]: r = error.delay()

In [8]: print r.state
Out[8]: 'PENDING'

In [9]: print r.result
Out[9]: None

这将成功:

@task
def error():
    raise Exception

    def on_failure(self, *args, **kwargs):
        pass

输入输出:

In [7]: r = error.delay()

In [8]: print r.state
Out[8]: 'FAILURE'

In [9]: print r.result
Out[9]: Exception()
于 2013-02-06T19:35:54.097 回答
1

IMO 最简单的方法是在创建新的 Celery 应用程序时传递对任务类的引用。

在一个模块中定义默认使用的任务类:

from celery.app.task import Task
import logging

logger=logging.getLogger(__name__)

class LoggingTask(Task):
  def on_failure(self, exc, task_id, args, kwargs, einfo):
      kwargs={}
      if logger.isEnabledFor(logging.DEBUG):
         kwargs['exc_info']=exc
      logger.error('Task % failed to execute', task_id, **kwargs)
      super().on_failure(exc, task_id, args, kwargs, einfo)

当您定义您的应用程序时,请引用该模块(注意,它是您提供的字符串引用..):

from celery import Celery

app=Celery('my_project_name', task_cls='task_package.module_name:LoggingTask')

从那时起,如果没有专门提供任务类,则将使用 LoggingTask - 从而允许您影响所有现有任务(使用默认值),而不必修改每个任务。这也意味着您可以正常使用 @shared_task 装饰器。

于 2018-07-25T11:29:44.363 回答