45

我正在使用Airflow来安排批处理作业。我有一个每天晚上运行的 DAG (A) 和另一个每月运行一次的 DAG (B)。B 依赖于 A 已成功完成。但是 B 需要很长时间才能运行,因此我想将其保存在单独的 DAG 中,以实现更好的 SLA 报告。

如何使 DAG B 的运行依赖于 DAG A 在同一天的成功运行?

4

3 回答 3

50

您可以使用名为 ExternalTask​​Sensor 的运算符来实现此行为。您在 DAG(B) 中的任务 (B1) 将被安排并等待 DAG(A) 中的任务 (A2) 成功

外部任务传感器文档

于 2016-06-25T12:25:16.610 回答
13

看起来TriggerDagRunOperator也可以使用,您可以使用 python 可调用来添加一些逻辑。如此处所述:https ://www.linkedin.com/pulse/airflow-lesson-1-triggerdagrunoperator-siddharth-anand

于 2017-04-17T21:23:28.513 回答
3

当需要跨 DAG 依赖时,往往有两个要求:

  1. B1DAG 上的任务B需要A1在 DAGA上的任务完成后运行。这可以使用ExternalTaskSensor其他人提到的来实现:

    B1 = ExternalTaskSensor(task_id="B1",
                            external_dag_id='A',
                            external_task_id='A1',
                            mode="reschedule")
    
  2. 当用户清除A1DAG 上的任务时A,我们希望 Airflow 清除B1DAG上的任务B以使其重新运行。这可以使用ExternalTaskMarker(自 Airflow v1.10.8 起)来实现。

    A1 = ExternalTaskMarker(task_id="A1", 
                            external_dag_id="B",
                            external_task_id="B1")
    

有关更多详细信息,请参阅有关跨 DAG 依赖项的文档:https ://airflow.apache.org/docs/stable/howto/operator/external.html

于 2020-04-23T00:04:16.490 回答