8

我正在使用 Django 和 Celery,并且正在尝试设置路由到多个队列。当我指定任务的routing_keyand exchange(在任务装饰器中或使用apply_async())时,该任务不会添加到代理(即 Kombu 连接到我的 MySQL 数据库)。

如果我在任务装饰器中指定队列名称(这意味着路由键被忽略),任务工作正常。这似乎是路由/交换设置的问题。

知道问题可能是什么吗?

这是设置:

设置.py

INSTALLED_APPS = (
    ...
    'kombu.transport.django',
    'djcelery',
)
BROKER_BACKEND = 'django'
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = "tasks"
CELERY_DEFAULT_EXCHANGE_TYPE = "topic"
CELERY_DEFAULT_ROUTING_KEY = "task.default"
CELERY_QUEUES = {
    'default': {
        'binding_key':'task.#',
    },
    'i_tasks': {
        'binding_key':'important_task.#',
    },
}

任务.py

from celery.task import task

@task(routing_key='important_task.update')
def my_important_task():
    try:
        ...
    except Exception as exc:
        my_important_task.retry(exc=exc)

启动任务:

from tasks import my_important_task
my_important_task.delay()
4

1 回答 1

48

您正在使用 Django ORM 作为代理,这意味着声明仅存储在内存中(请参阅http://readthedocs.org/docs/kombu/en/latest/introduction.html上的传输比较表,无疑是很难找到的#运输比较

因此,当您使用 routing_key 应用此任务时important_task.update,它将无法路由它,因为它尚未声明队列。

如果您这样做,它将起作用:

@task(queue="i_tasks", routing_key="important_tasks.update")
def important_task():
    print("IMPORTANT")

但是使用自动路由功能会更简单,因为这里没有任何内容表明您需要使用“主题”交换,使用自动路由只需删除设置

  • CELERY_DEFAULT_QUEUE,
  • CELERY_DEFAULT_EXCHANGE,
  • CELERY_DEFAULT_EXCHANGE_TYPE
  • CELERY_DEFAULT_ROUTING_KEY
  • CELERY_QUEUES

并像这样声明您的任务:

@task(queue="important")
def important_task():
    return "IMPORTANT"

然后启动一个从该队列消费的工作人员:

$ python manage.py celeryd -l info -Q important

或者从默认 ( celery) 队列和important队列中消费:

$ python manage.py celeryd -l info -Q celery,important

另一个好的做法是不要将队列名称硬编码到任务中,CELERY_ROUTES而是使用:

@task
def important_task():
    return "DEFAULT"

然后在您的设置中:

CELERY_ROUTES = {"myapp.tasks.important_task": {"queue": "important"}}

如果您仍然坚持使用主题交换,那么您可以添加此路由器以在第一次发送任务时自动声明所有队列:

class PredeclareRouter(object):
    setup = False

    def route_for_task(self, *args, **kwargs):
        if self.setup:
            return
        self.setup = True
        from celery import current_app, VERSION as celery_version
        # will not connect anywhere when using the Django transport
        # because declarations happen in memory.
        with current_app.broker_connection() as conn:
            queues = current_app.amqp.queues
            channel = conn.default_channel
            if celery_version >= (2, 6):
                for queue in queues.itervalues():
                    queue(channel).declare()
            else:
                from kombu.common import entry_to_queue
                for name, opts in queues.iteritems():
                    entry_to_queue(name, **opts)(channel).declare()
CELERY_ROUTES = (PredeclareRouter(), )
于 2012-05-25T14:17:11.837 回答