我们的气流实施中有多运营商 dags。假设 dag-a 有运算符 t1、t2、t3,它们设置为按顺序运行(即 t2 依赖于 t1,t3 依赖于 t2。)
task_2.set_upstream(task_1)
task_3.set_upstream(task_2)
我们需要确保当 dag-a 被实例化时,它的所有任务在同一个 dag 的另一个实例被实例化之前(或在下一个 dag 实例上的第一个任务被触发之前)成功完成。
我们在 dags 中设置了以下内容:
da['depends_on_past'] = True
现在发生的情况是,如果实例化的 dag 没有任何错误,我们就会看到预期的效果。
但是,假设 dag-a 计划每小时运行一次。按计划触发 dag-a-i1 实例。然后 dag-a-i1 任务 t1 运行成功,然后 t2 开始运行并失败。在那种情况下,我们看到 dag-a-i1 实例按预期停止。当下一个小时到来时,我们看到 dag-a-i2 实例被触发,我们看到该 dag 实例 (i2) 的任务 t1 开始运行并假设完成,然后 dag-a-i2 停止,因为它的 t2 不能运行,因为 t2 的先前实例(对于 dag-a-i1)处于失败状态。
我们需要看到的行为是第二个实例没有被触发,或者如果它被触发,我们不希望看到第二个实例的任务 t1 被触发。这给我们带来了问题。
任何帮助表示赞赏。