我有一个扩展 celerys 的课程Task
。它与旧式 API 一起运行得很好,但我在将其转换为新 API 时遇到了问题。
# In app/tasks.py
from celery import Celery, Task
celery = Celery()
@celery.task
class CustomTask(Task):
def run(self, x):
try:
# do something
except Exception, e:
self.retry(args=[x], exc=e)
然后我像这样运行任务 -
CustomTask().apply_async(args=[x], queue='q1')
我得到了错误 -
TypeError: run() takes exactly 2 arguments (1 given)
这个 SO 答案似乎做了同样的事情,并且它被接受了,所以大概它可以工作。谁能帮助我并向我解释为什么我的代码不起作用?
编辑
如果我将任务命名为与类名不同的名称,则此方法有效-
name = 'app.tasks.CustomTask2'
但是如果我保持任务的名称与完整的类名相同,它就不起作用
name = 'app.tasks.CustomTask'
但是使用不同名称的问题是 celery 有一个额外的任务,与任务类名同名。