13

我按照 celery文档在我的开发机器上定义了 2 个队列。

我的芹菜设置:

CELERY_ALWAYS_EAGER = True
CELERY_TASK_RESULT_EXPIRES = 60  # 1 mins
CELERYD_CONCURRENCY = 2
CELERYD_MAX_TASKS_PER_CHILD = 4
CELERYD_PREFETCH_MULTIPLIER = 1
CELERY_CREATE_MISSING_QUEUES = True
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('feeds', Exchange('feeds'), routing_key='arena.social.tasks.#'),
)
CELERY_ROUTES = {
    'arena.social.tasks.Update': {
        'queue': 'fs_feeds',
    },
}

我在项目的 virtualenv 中打开了两个终端窗口,并运行了以下命令:

terminal_1$ celery -A arena worker -Q default -B -l debug --purge -n deafult_worker
terminal_2$ celery -A arena worker -Q feeds -B -l debug --purge -n feeds_worker

我得到的是所有任务都由两个队列处理。

我的目标是让一个队列仅处理其中定义的一项任务,CELERY_ROUTES并让默认队列处理所有其他任务。

我也关注了这个SO questionrabbitmqctl list_queuesreturnscelery 0和 running rabbitmqctl list_bindingsreturnexchange celery queue celery []两次。重启rabbit服务器并没有改变任何东西。

4

2 回答 2

26

好的,所以我想通了。以下是我的整个设置、设置以及如何运行 celery,对于那些可能想知道与我的问题相同的事情的人。

设置

CELERY_TIMEZONE = TIME_ZONE
CELERY_ACCEPT_CONTENT = ['json', 'pickle']
CELERYD_CONCURRENCY = 2
CELERYD_MAX_TASKS_PER_CHILD = 4
CELERYD_PREFETCH_MULTIPLIER = 1

# celery queues setup
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_DEFAULT_ROUTING_KEY = 'default'
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('feeds', Exchange('feeds'), routing_key='long_tasks'),
)
CELERY_ROUTES = {
    'arena.social.tasks.Update': {
        'queue': 'feeds',
        'routing_key': 'long_tasks',
    },
}

如何运行芹菜?

终端 - 选项卡 1:

celery -A proj worker -Q default -l debug -n default_worker

这将启动第一个使用默认队列中的任务的工作人员。笔记!-n default_worker对于第一个工作人员来说不是必须的,但如果您有任何其他 celery 实例启动并运行,那么这是必须的。设置-n worker_name与 相同--hostname=default@%h

终端 - 选项卡 2:

celery -A proj worker -Q feeds -l debug -n feeds_worker

这将启动消费者从提要队列中执行任务的第二个工作人员。请注意-n feeds_worker,如果您使用-l debug(log level = debug) 运行,您将看到两个工作人员正在它们之间同步。

终端 - 选项卡 3:

celery -A proj beat -l debug

这将开始节拍,根据您的CELERYBEAT_SCHEDULE. 我不必更改任务或CELERYBEAT_SCHEDULE.

例如,这就是我CELERYBEAT_SCHEDULE寻找应该进入提要队列的任务的方式:

CELERYBEAT_SCHEDULE = {
    ...
    'update_feeds': {
        'task': 'arena.social.tasks.Update',
        'schedule': crontab(minute='*/6'),
    },
    ...
}

如您所见,无需添加'options': {'routing_key': 'long_tasks'}或指定它应该去的队列。另外,如果您想知道为什么Update要大写,那是因为它是一个自定义任务,它被定义为celery.Task.

更新 Celery 5.0+

Celery 自第 5 版以来进行了一些更改,这是用于任务路由的更新设置。

如何创建队列?

Celery 可以自动创建队列。它非常适用于简单的情况,其中 celery 的路由默认值是可以的。

task_create_missing_queues=TrueCELERY_或者,如果您正在使用 django 设置,并且您在键下命名所有 celery 配置, CELERY_TASK_CREATE_MISSING_QUEUES=True. 请注意,默认情况下它是打开的。

自动计划任务路由

配置 celery 应用程序后:

celery_app.conf.beat_schedule = {
  "some_scheduled_task": {
    "task": "module.path.some_task",
    "schedule": crontab(minute="*/10"),
    "options": {"queue": "queue1"}
  }
}

自动任务路由

芹菜应用程序仍然必须先配置,然后:

app.conf.task_routes = {
  "module.path.task2": {"queue": "queue2"},
}

手动分配任务

如果您想动态路由任务,则在发送任务时指定队列:

from module import task

def do_work():
  # do some work and launch the task
  task.apply_async(args=(arg1, arg2), queue="queue3")

可以在此处找到重新路由的更多详细信息: https ://docs.celeryproject.org/en/stable/userguide/routing.html

关于这里的调用任务: https ://docs.celeryproject.org/en/stable/userguide/calling.html

于 2014-04-22T13:59:55.043 回答
2

除了接受的答案之外,如果有人来到这里并且仍然想知道为什么他的设置不起作用(就像我刚才所做的那样),原因如下:芹菜文档没有正确列出设置名称。

对于 celery 5.0.5 设置CELERY_DEFAULT_QUEUECELERY_QUEUES,CELERY_ROUTES应该命名为CELERY_TASK_DEFAULT_QUEUE,CELERY_TASK_QUEUESCELERY_TASK_ROUTES代替。这些是我测试过的设置,但我猜同样的规则也适用于交换和路由密钥。

于 2021-03-04T22:58:16.707 回答