3

我有不同的 Rabbit 队列,每个队列都专用于一种特殊的订单处理:

# tasks.py

@celery.task
def process_order_for_product_x(order_id):
    pass  # elided ...


@celery.task
def process_order_for_product_y(order_id):
    pass  # elided ...


# settings.py

CELERY_QUEUES = {
    "black_hole": {
        "binding_key": "black_hole",
        "queue_arguments": {"x-ha-policy": "all"}
    },
    "product_x": {
        "binding_key": "product_x",
        "queue_arguments": {"x-ha-policy": "all"}
    },
    "product_y": {
        "binding_key": "product_y",
        "queue_arguments": {"x-ha-policy": "all"}
    },

CELERY_DEFAULT_QUEUE = 'black_hole'我们有一个策略,通过设置然后从不使用 from来强制执行显式路由black_hole

这些任务中的每一个都可能使用 celery 的画布原语,如下所示:

# tasks.py

@celery.task
def process_order_for_product_x(order_id):
    # These can run in parallel
    stage_1_group = group(do_something.si(order_id),
                          do_something_else.si(order_id))

    # These can run in parallel
    another_group = group(do_something_at_end.si(order_id),
                          do_something_else_at_end.si(order_id))

    # These run in a linear sequence
    process_task = chain(
        stage_1_group,
        do_something_dependent_on_stage_1.si(order_id),
        another_group)

    process_task.apply_async()

假设我希望 、 、 和其他画布任务的特定用途celery.group流经celery.chordcelery.chord_unlock相应产品的队列,而不是陷入其中black_hole,有没有办法使用自定义任务名称或自定义 routing_key 调用每个特定的画布任务?

由于我不会进入的原因,我宁愿不将所有celery.*任务发送到一个包罗万象的celery_canvas队列,这就是我同时正在做的事情。

4

2 回答 2

5

此方法允许您将 Celery 画布任务路由到回调任务的队列。

可以为 Celery 指定一个自定义的基于类的任务路由器,如此所述。

让我们专注于celery.chord_unlock任务。它的签名在这里定义。

def unlock_chord(self, group_id, callback, ...):

第二个位置参数是和弦回调任务的签名。

Celery 中的任务签名基本上是 dicts,因此我们有机会访问任务选项,包括任务队列名称。

这是一个例子:

class CeleryRouter(object):
    def route_for_task(self, task, args=None, kwargs=None):
        if task == 'celery.chord_unlock':
            callback_signature = args[1]
            options = callback_signature.get('options')
            if options:
                queue = options.get('queue')
                if queue:
                    return {'queue': queue}

将其添加到 Celery 配置中:

CELERY_ROUTES = (CeleryRouter(), 
于 2015-02-12T22:41:57.097 回答
0

我目前在我的项目中使用 Celery。对于某些情况,我需要通过不同的队列链接任务:

chain(get_staff.s(url), save_staff.s(dt, partner_id, url))()

这两个函数声明如下:

@task(queue='celery_gevent')
def get_staff(source_url):

@task # send to default queue
def save_staff(suggests, dt, partner, url):

顺便说一句,celery_gevent由带有gevent池的工作人员处理以发出 http 请求。

这个例子,你可以如何隐式指定队列。您还可以通过指定其他参数将任务显式放入不同的队列中,如下所示:

In [1]: add.apply_async([4,5])
Out[1]: <AsyncResult: bda3dedd-c2c4-44db-be8e-6a97e718f8b0>

$ sudo rabbitmqctl list_queues
Listing queues ...
celery  1
...done.

In [2]: add.apply_async([4,5], queue='your_product')
Out[2]: <AsyncResult: 934f6161-298b-468b-9716-3da6fae58fa5>

$ sudo rabbitmqctl list_queues
Listing queues ...
celery  1
your_product    1
...done.

您可以在自定义队列中运行整个画布:

process_task.apply_async(queue='your_queue')

尝试在装饰器中指定 queue_name @task。这应该会有所帮助。

链接:

http://docs.celeryproject.org/en/latest/reference/celery.app.task.html

http://docs.celeryproject.org/en/latest/_modules/celery/app/task.html#Task.apply_async

于 2013-06-25T20:01:54.333 回答