我有不同的 Rabbit 队列,每个队列都专用于一种特殊的订单处理:
# tasks.py
@celery.task
def process_order_for_product_x(order_id):
pass # elided ...
@celery.task
def process_order_for_product_y(order_id):
pass # elided ...
# settings.py
CELERY_QUEUES = {
"black_hole": {
"binding_key": "black_hole",
"queue_arguments": {"x-ha-policy": "all"}
},
"product_x": {
"binding_key": "product_x",
"queue_arguments": {"x-ha-policy": "all"}
},
"product_y": {
"binding_key": "product_y",
"queue_arguments": {"x-ha-policy": "all"}
},
CELERY_DEFAULT_QUEUE = 'black_hole'
我们有一个策略,通过设置然后从不使用 from来强制执行显式路由black_hole
。
这些任务中的每一个都可能使用 celery 的画布原语,如下所示:
# tasks.py
@celery.task
def process_order_for_product_x(order_id):
# These can run in parallel
stage_1_group = group(do_something.si(order_id),
do_something_else.si(order_id))
# These can run in parallel
another_group = group(do_something_at_end.si(order_id),
do_something_else_at_end.si(order_id))
# These run in a linear sequence
process_task = chain(
stage_1_group,
do_something_dependent_on_stage_1.si(order_id),
another_group)
process_task.apply_async()
假设我希望 、 、 和其他画布任务的特定用途celery.group
流经celery.chord
其celery.chord_unlock
相应产品的队列,而不是陷入其中black_hole
,有没有办法使用自定义任务名称或自定义 routing_key 调用每个特定的画布任务?
由于我不会进入的原因,我宁愿不将所有celery.*
任务发送到一个包罗万象的celery_canvas
队列,这就是我同时正在做的事情。