2

我们的气流实施中有多运营商 dags。假设 dag-a 有运算符 t1、t2、t3,它们设置为按顺序运行(即 t2 依赖于 t1,t3 依赖于 t2。)

 task_2.set_upstream(task_1)
 task_3.set_upstream(task_2)

我们需要确保当 dag-a 被实例化时,它的所有任务在同一个 dag 的另一个实例被实例化之前(或在下一个 dag 实例上的第一个任务被触发之前)成功完成。

我们在 dags 中设置了以下内容:

da['depends_on_past'] = True

现在发生的情况是,如果实例化的 dag 没有任何错误,我们就会看到预期的效果。
但是,假设 dag-a 计划每小时运行一次。按计划触发 dag-a-i1 实例。然后 dag-a-i1 任务 t1 运行成功,然后 t2 开始运行并失败。在那种情况下,我们看到 dag-a-i1 实例按预期停止。当下一个小时到来时,我们看到 dag-a-i2 实例被触发,我们看到该 dag 实例 (i2) 的任务 t1 开始运行并假设完成,然后 dag-a-i2 停止,因为它的 t2 不能运行,因为 t2 的先前实例(对于 dag-a-i1)处于失败状态。

我们需要看到的行为是第二​​个实例没有被触发,或者如果它被触发,我们不希望看到第二个实例的任务 t1 被触发。这给我们带来了问题。

任何帮助表示赞赏。

4

1 回答 1

2

在我开始回答之前,我将设置一个与您在问题中提出的命名约定不同的命名约定。

DagA.TimeA.T1将在 time引用 DAGA执行任务的实例。T1A

继续前进,我在这里看到了两个潜在的解决方案。

首先:

虽然不是特别漂亮,但您可以在 DAG 的开头添加一个传感器任务。这个传感器应该等待同一个 DAG 的最终任务的执行。像下面这样的东西应该可以工作:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.sensors import ExternalTaskSensor
from datetime import timedelta

dag = DAG(dag_id="ETL", schedule_interval="@hourly")
ensure_prior_success = ExternalTaskSensor(external_dag_id="ETL", 
external_task_id="final_task", execution_delta=timedelta(hours=1))
final_task = DummyOperator(task_id="final_task", dag=dag)

以这种方式编写,如果任何非传感器任务在DagA.TimeA运行期间失败,DagA.TimeB将开始执行其传感器任务,但最终会超时。

如果您选择以这种方式编写 DAG,您应该注意几件事。

  1. 如果您计划执行此 DAG 的回填(或者,如果您认为可能),您应该将您的 DAG 设置max_active_runs为较低的数字。这样做的原因是,足够大的回填可能会用传感器任务填充全局任务队列,并造成新任务无法排队的情况。

  2. 该 DAG 的第一次运行需要人工干预。人类需要将初始传感器任务标记为成功(因为不存在先前的运行,传感器无法成功完成)。

第二:

我不确定您的任务正在执行什么工作,但举例来说,假设它们涉及对数据库的写入。创建一个操作员,查看您的数据库以获取DagA.TimeA.T3成功完成的证据。

正如我所说,在不知道您的任务在做什么的情况下,很难就该操作员的外观提供具体建议。如果您的用例涉及恒定数量的数据库写入,您可以执行查询来计算目标表中存在的文档数WHERE TIME <= NOW - 1 HOUR

于 2017-10-18T22:04:34.957 回答