16

当我将任务路由到特定队列时,它可以工作:

task.apply_async(queue='beetroot')

但是如果我创建一个链:

chain = task | task

然后我写

chain.apply_async(queue='beetroot')

它似乎忽略了 queue 关键字并分配给默认的“celery”队列。

如果 celery 支持链中的路由,那就太好了——所有任务都在同一个队列中按顺序执行。

4

3 回答 3

18

我这样做:

subtask = task.s(*myargs, **mykwargs).set(queue=myqueue)
mychain = celery.chain(subtask, subtask2, ...)
mychain.apply_async()
于 2014-06-10T09:49:57.400 回答
11

好的,我明白了这一点。

您必须将所需的执行选项(例如 queue= 或 countdown= 添加到子任务定义中,或者通过部分添加:

子任务定义:

from celery import subtask

chain = subtask('task', queue = 'beetroot') | subtask('task', queue = 'beetroot')

部分的:

chain = task.s().apply_async(queue = 'beetroot') | task.s().apply_async(queue = 'beetroot')

然后通过以下方式执行链:

chain.apply_async()

或者,

chain.delay()

任务将被发送到“甜菜根”队列。最后一个命令中的额外执行参数不会做任何事情。在链(或组,或任何其他 Canvas 原语)级别应用所有这些执行参数会很不错。

于 2013-02-19T13:47:19.643 回答
9

这已经很晚了,但我认为@mpaf 提供的代码并不完全正确。

上下文:就我而言,我有两个子任务,其中第一个提供一个返回值,该返回值作为输入参数传递给第二个。我在执行第二个任务时遇到了麻烦——我在日志中看到 Celery 会承认第二个任务是第一个任务的回调,但它永远不会执行第二个任务。

这是我的非工作链代码-:

from celery import chain

chain(
    module.task1.s(arg),
    module.task2.s()
).apply_async(countdown=0.1, queue='queuename')

使用@mpaf 的答案中提供的语法,我得到了两个任务来执行,但是执行顺序是随意的,第二个子任务没有被确认为第一个子任务的回调。我想到了浏览有关如何在子任务上显式设置队列的文档。

这是工作代码-:

chain(
    module.task1.s(arg).set(queue='queuename'),
    module.task2.s().set(queue='queuename')
).apply_async(countdown=0.1)
于 2016-07-30T13:24:18.517 回答