6

我的服务器上运行了两个独立的 celeryd 进程,由supervisor. 它们被设置为侦听单独的队列,如下所示:

[program:celeryd1]
command=/path/to/celeryd --pool=solo --queues=queue1
...

[program:celeryd2]
command=/path/to/celeryd --pool=solo --queues=queue2
...

我的 celeryconfig 看起来像这样:

from celery.schedules import crontab

BROKER_URL = "amqp://guest:guest@localhost:5672//"

CELERY_DISABLE_RATE_LIMITS = True
CELERYD_CONCURRENCY = 1
CELERY_IGNORE_RESULT = True

CELERY_DEFAULT_QUEUE = 'default'
CELERY_QUEUES = {
    'default': {
        "exchange": "default",
        "binding_key": "default",
    },
    'queue1': {
        'exchange': 'queue1',
        'routing_key': 'queue1',
    },
    'queue2': {
        'exchange': 'queue2',
        'routing_key': 'queue2',
    },
}

CELERY_IMPORTS = ('tasks', )

CELERYBEAT_SCHEDULE = {
    'first-queue': {
        'task': 'tasks.sync',
        'schedule': crontab(hour=02, minute=00),
        'kwargs': {'client': 'client_1'},
        'options': {'queue': 'queue1'},
    },
    'second-queue': {
        'task': 'tasks.sync',
        'schedule': crontab(hour=02, minute=00),
        'kwargs': {'client': 'client_2'},
        'options': {'queue': 'queue1'},
    },
}

所有tasks.sync任务都必须路由到特定队列(因此​​ celeryd 进度)。但是,当我尝试手动运行任务时,sync.apply_async(kwargs={'client': 'value'}, queue='queue1')两个芹菜工人都会接手任务。如何使任务路由到正确的队列并且仅由绑定到队列的工作人员运行?

4

1 回答 1

7

您只运行一个 celerybeat 实例,对吗?

也许您有与此冲突的旧队列绑定?尝试运行rabbitmqctl list_queuesand rabbitmqctl list_bindings,也许将代理中的数据重置为从头开始。

您在这里的示例应该可以工作,并且在我刚尝试时对我有用。

提示:由于您使用与队列名称相同的 exchange 和 binding_key 值,因此不必在 CELERY_QUEUES 中显式列出它们。当 CELERY_CREATE_MISSING_QUEUES 开启时(默认情况下),如果您只是执行celeryd -Q queue1任务或将任务发送到未定义的队列,则会自动创建队列。

于 2012-04-10T12:50:13.590 回答