我正在使用Airflow来安排批处理作业。我有一个每天晚上运行的 DAG (A) 和另一个每月运行一次的 DAG (B)。B 依赖于 A 已成功完成。但是 B 需要很长时间才能运行,因此我想将其保存在单独的 DAG 中,以实现更好的 SLA 报告。
如何使 DAG B 的运行依赖于 DAG A 在同一天的成功运行?
您可以使用名为 ExternalTaskSensor 的运算符来实现此行为。您在 DAG(B) 中的任务 (B1) 将被安排并等待 DAG(A) 中的任务 (A2) 成功
看起来TriggerDagRunOperator也可以使用,您可以使用 python 可调用来添加一些逻辑。如此处所述:https ://www.linkedin.com/pulse/airflow-lesson-1-triggerdagrunoperator-siddharth-anand
当需要跨 DAG 依赖时,往往有两个要求:
B1
DAG 上的任务B
需要A1
在 DAGA
上的任务完成后运行。这可以使用ExternalTaskSensor
其他人提到的来实现:
B1 = ExternalTaskSensor(task_id="B1",
external_dag_id='A',
external_task_id='A1',
mode="reschedule")
当用户清除A1
DAG 上的任务时A
,我们希望 Airflow 清除B1
DAG上的任务B
以使其重新运行。这可以使用ExternalTaskMarker
(自 Airflow v1.10.8 起)来实现。
A1 = ExternalTaskMarker(task_id="A1",
external_dag_id="B",
external_task_id="B1")
有关更多详细信息,请参阅有关跨 DAG 依赖项的文档:https ://airflow.apache.org/docs/stable/howto/operator/external.html