我有一个复杂的工作流程,涉及不同的 celery 任务。我设计了一个工作流,其中涉及调用链中间的一个组。这意味着,它可以被认为是一个链,一个和弦,另一个链作为和弦回调。但是,chord 回调只会执行链回调的第一个任务,而不执行下一个任务。如何做到这一点?如果回调是一个组,它确实可以正常工作,但这不符合此处所需的逻辑。
以下示例代码从不接收/执行task_h
并且task_i
(即最后执行的任务是链回调的第一个,而不是整个链(任务 g、h 和 i)。提示?
ctask = chain(
task_a.si(**kwargs),
task_b.si(**kwargs),
task_c.si(**kwargs),
chord(
[
task_d.si(**kwargs),
task_e.si(**kwargs),
task_f.si(**kwargs),
],
chain(
task_g.si(**kwargs),
task_h.si(**kwargs),
task_i.si(**kwargs),
),
),
)
[task.set(queue='high_priority') for task in ctask.tasks]
task = ctask.apply_async()
celery: {extras = ["redis"], version = "^5.0.5"}
broker: rabbitmq
backend: redis