我正在编写一个小型 Django 应用程序,我应该能够为每个模型对象创建其定期任务,该任务将以一定的时间间隔执行。我正在为此使用 Celery 应用程序,但我无法理解一件事:
class ProcessQueryTask(PeriodicTask):
run_every = timedelta(minutes=1)
def run(self, query_task_pk, **kwargs):
logging.info('Process celery task for QueryTask %d' %
query_task_pk)
task = QueryTask.objects.get(pk=query_task_pk)
task.exec_task()
return True
然后我做以下事情:
>>> from tasks.tasks import ProcessQueryTask
>>> result1 = ProcessQueryTask.delay(query_task_pk=1)
>>> result2 = ProcessQueryTask.delay(query_task_pk=2)
第一次调用成功,但其他定期调用返回错误 - TypeError: run() 在 celeryd 服务器中恰好采用 2 个非关键字参数(1 个给定)。我可以将自己的参数传递给 PeriodicTaskrun()
吗?