1

chord在我的定期任务中运行以下内容:

(group(task_A1, task_A2, ... , task_An) | task_B)

每个任务 ( task_A1, task_A2... , task_An) 可能需要大约 5-10 分钟才能运行。

考虑以下场景:在周期性任务(每小时)期间,task_A1..task_An并行运行。现在,有人从另一个流程触发了外部task_A1'任务。我的目标是仅停止和替换当前的运行task_A1task_A1'保持(不停止task_A2.. task_An)流原样 - 这是等待(task_A2, ... ,task_An新的task_A1')组完成并启动task_B.

我目前的解决方案是撤销task_A1&task_B并将新任务task_A1'&添加task_B'到队列中 - 但我想知道如何与已经运行的创建这种新组合task_A2, ... , task_An,以实现类似:

(group(task_A1', task_A2, ... , task_An) | task_B')

我通过撤销任务并在 CeleryRouter 中“重新连接”它们来做到这一点。我正在从celeryapp.control.inspect().active()和中提取任务celeryapp.control.inspect().reserved()。我想知道是否有优雅的方式来实现我的目标。

谢谢。

4

0 回答 0