2

我正在编写一个小型 Django 应用程序,我应该能够为每个模型对象创建其定期任务,该任务将以一定的时间间隔执行。我正在为此使用 Celery 应用程序,但我无法理解一件事:

class ProcessQueryTask(PeriodicTask):
   run_every = timedelta(minutes=1)

   def run(self, query_task_pk, **kwargs):
       logging.info('Process celery task for QueryTask %d' %
query_task_pk)
       task = QueryTask.objects.get(pk=query_task_pk)
       task.exec_task()
       return True

然后我做以下事情:

>>> from tasks.tasks import ProcessQueryTask
>>> result1 = ProcessQueryTask.delay(query_task_pk=1)
>>> result2 = ProcessQueryTask.delay(query_task_pk=2)

第一次调用成功,但其他定期调用返回错误 - TypeError: run() 在 celeryd 服务器中恰好采用 2 个非关键字参数(1 个给定)。我可以将自己的参数传递给 PeriodicTaskrun()吗?

4

1 回答 1

5

Ask Solem 在回答您celery-users Google group上的问题时回答了这个问题。

周期性任务不使用参数,因此您需要创建多个类或创建一个处理多个“模型”的周期性任务。

例如:

from celery.task import PeriodicTask
from celery.decorators import periodic_task

# base class
class BaseProcessQueryTask(PeriodicTask):
    abstract = True
    run_every = timedelta(minutes=1)
    query_task_pk  = None

    def run(self):
        task = QueryTask.objects.get(pk=self.query_task_pk)
        task.exec_task()

class ProcessQueryTask1(BaseProcessQueryTask):
    query_task_pk = 1

class ProcessQueryTask2(BaseProcessQueryTask):
    query_task_pk = 2

但你更有可能想要这样的东西:

@task(ignore_result=True)
def execute_query_task(task):
    task.exec_task()

@periodic_task(run_every=timedelta(minutes=1))
def process_query_tasks():
    for task in QueryTask.objects.all():
        ExecuteQueryTask.delay(task)
于 2010-12-08T17:16:02.537 回答