2

我有一个扩展 celerys 的课程Task。它与旧式 API 一起运行得很好,但我在将其转换为新 API 时遇到了问题。

# In app/tasks.py
from celery import Celery, Task

celery = Celery()

@celery.task
class CustomTask(Task):

    def run(self, x):
        try:
            # do something
        except Exception, e:
            self.retry(args=[x], exc=e)

然后我像这样运行任务 -

CustomTask().apply_async(args=[x], queue='q1')

我得到了错误 -

TypeError: run() takes exactly 2 arguments (1 given)

这个 SO 答案似乎做了同样的事情,并且它被接受了,所以大概它可以工作。谁能帮助我并向我解释为什么我的代码不起作用?

编辑

如果我将任务命名为与类名不同的名称,则此方法有效-

name = 'app.tasks.CustomTask2'

但是如果我保持任务的名称与完整的类名相同,它就不起作用

name = 'app.tasks.CustomTask'

但是使用不同名称的问题是 celery 有一个额外的任务,与任务类名同名。

4

1 回答 1

2

装饰器不与类一起使用,它用于函数。

通常你不会想要定义自定义任务类,除非你想实现常见的行为。

因此,要么删除@celery.task装饰器,要么将其与函数一起使用。

请注意,您在此处定义的任务未与任何 celery 实例绑定

绑定到任何特定应用实例的注释:

from celery import Task

class MyTask(Task):
    pass

您可以稍后绑定它:

from celery import Celery
app = Celery(broker='amqp://')

MyTask.bind(app)

或者您可以使用应用程序上可用的基类:

from celery import Celery
app = Celery(broker='amqp://')

class MyTask(app.Task):
    pass

最后一个例子不是很干净,因为它意味着您正在模块级别完成应用程序,这是使用带有函数的任务装饰器是最佳实践的另一个原因,并且只创建自定义类用作装饰任务的基类( @task(base=MyTask))。

于 2013-07-24T12:57:15.397 回答