13

有没有办法在气流中使用depends_on_past整个 DagRun,而不仅仅是应用于任务?

我有一个每日 DAG,周五 DagRun 在第 4 个任务上出错,但周六和周日 DagRuns 仍然按计划运行。使用depends_on_past = True会在相同的第 4 个任务上暂停 DagRun,但前 3 个任务仍会运行。

我可以在 DagRun DB 表中state看到包含failedFriday DagRun 的一列。我想要的是一种将 DagRun 配置为在先前的 DagRun 失败时不启动的方法,在找到以前失败的任务之前不启动并运行。

有谁知道这是否可能?

4

3 回答 3

14

在您的第一个任务中,设置depends_on_past=Truewait_for_downstream=True,组合将导致当前 dag-run 仅在最后一次运行成功时运行。

因为通过在当前 dag-run 设置第一个任务将等待前一个 (depends_on_past) 和所有任务 (wait_for_downstream) 成功

于 2018-04-16T08:32:16.107 回答
7

这个问题有点老了,但事实证明它是第一个谷歌搜索结果,而评分最高的答案显然是误导性的(这让我有点挣扎),所以它肯定需要一个正确的答案。虽然第二个评分的答案应该有效,但有一种更清洁的方法可以做到这一点,我个人觉得使用 xcom 很难看。

Airflow 有一个特殊的操作符类,用于监控来自其他 dag 运行或整个其他 dag 的任务状态。所以我们需要做的是在 dag 中所有任务之前添加一个任务,检查之前的运行是否成功。

from airflow.sensors.external_task_sensor import ExternalTaskSensor


previous_dag_run_sensor = ExternalTaskSensor(
    task_id = 'previous_dag_run_sensor',
    dag = our_dag,
    external_dag_id = our_dag.dag_id,
    execution_delta = our_dag.schedule_interval
)

previous_dag_run_sensor.set_downstream(vertices_of_indegree_zero_from_our_dag)
于 2020-05-06T20:02:45.743 回答
5

一种可能的解决方案是使用xcom

  1. 将 2 个 PythonOperatorsstart_task和添加end_task到 DAG。
  2. 使所有其他任务依赖于start_task
  3. 使end_task依赖于所有其他任务 ( set_upstream)。
  4. end_task将始终将变量推last_success = context['execution_date']送到 xcom ( xcom_push)。(provide_context = True在 PythonOperators 中需要)。
  5. 并且start_task将始终检查 xcom ( xcom_pull) 以查看是否存在last_success值等于前一个 DagRun 的 execution_date 或 DAG 的 start_date 的变量(让进程启动)。

xcom 的使用示例:
https ://github.com/apache/incubator-airflow/blob/master/airflow/example_dags/example_xcom.py

于 2017-11-28T21:45:57.900 回答