我chord
在我的定期任务中运行以下内容:
(group(task_A1, task_A2, ... , task_An) | task_B)
每个任务 ( task_A1
, task_A2
... , task_An
) 可能需要大约 5-10 分钟才能运行。
考虑以下场景:在周期性任务(每小时)期间,task_A1
..task_An
并行运行。现在,有人从另一个流程触发了外部task_A1'
任务。我的目标是仅停止和替换当前的运行task_A1
并task_A1'
保持(不停止task_A2
.. task_An
)流原样 - 这是等待(task_A2, ... ,task_An
新的task_A1'
)组完成并启动task_B
.
我目前的解决方案是撤销task_A1
&task_B
并将新任务task_A1'
&添加task_B'
到队列中 - 但我想知道如何与已经运行的创建这种新组合task_A2, ... , task_An
,以实现类似:
(group(task_A1', task_A2, ... , task_An) | task_B')
我通过撤销任务并在 CeleryRouter 中“重新连接”它们来做到这一点。我正在从celeryapp.control.inspect().active()
和中提取任务celeryapp.control.inspect().reserved()
。我想知道是否有优雅的方式来实现我的目标。
谢谢。