44

默认情况下,Celery 将所有任务发送到“celery”队列,但您可以通过添加额外参数来更改此行为:

@task(queue='celery_periodic')
def recalc_last_hour():
    log.debug('sending new task')
    recalc_hour.delay(datetime(2013, 1, 1, 2)) # for example

调度器设置:

CELERYBEAT_SCHEDULE = {
   'installer_recalc_hour': {
        'task': 'stats.installer.tasks.recalc_last_hour',
        'schedule': 15  # every 15 sec for test
    },
}
CELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler"

运行工人:

python manage.py celery worker -c 1 -Q celery_periodic -B -E

该方案没有按预期工作:此工作人员将定期任务发送到“celery”队列,而不是“celery_periodic”。我该如何解决?

PS芹菜==3.0.16

4

3 回答 3

54

周期性任务通过 celery beat 发送到队列中,您可以在其中执行使用 Celery API 执行的所有操作。以下是 celery beat 附带的配置列表:

https://celery.readthedocs.org/en/latest/userguide/periodic-tasks.html#available-fields

在你的情况下:

CELERYBEAT_SCHEDULE = {
   'installer_recalc_hour': {
        'task': 'stats.installer.tasks.recalc_last_hour',
        'schedule': 15,  # every 15 sec for test
        'options': {'queue' : 'celery_periodic'},  # options are mapped to apply_async options
    },
}
于 2015-01-09T09:45:44.990 回答
26

我找到了解决这个问题的方法:

1)首先我改变了配置周期性任务的方式。我像这样使用@periodic_task装饰器:

@periodic_task(run_every=crontab(minute='5'),
               queue='celery_periodic',
               options={'queue': 'celery_periodic'})
def recalc_last_hour():
    dt = datetime.utcnow()
    prev_hour = datetime(dt.year, dt.month, dt.day, dt.hour) \
                - timedelta(hours=1)
    log.debug('Generating task for hour %s', str(prev_hour))
    recalc_hour.delay(prev_hour)

2)我在@periodic_task 的参数中写两次celery_periodic

  • 当您从代码(.delay 或 .apply_async)调用任务时,使用queue='celery_periodic'选项

  • options={'queue': 'celery_periodic'}选项在celery beat调用时使用。

我敢肯定,如果您使用 CELERYBEAT_SCHEDULE 变量配置定期任务,同样的事情也是可能的。

UPD。该解决方案适用于CELERYBEAT_SCHEDULER的基于数据库和基于文件的存储。

于 2013-06-13T10:03:01.883 回答
2

如果您使用的是 djcelery 数据库调度程序,您可以在 Execution Options -> queue 字段中指定队列

于 2015-06-26T13:51:22.047 回答