2

我想创建对 DAG A 和 DAG B 的 dag 依赖项。DAG A 有两个任务: TASK1 和 TASK2 。DAG B 有 3 个任务:TASK1、TASK2 和 TASK3。

我的要求是 DAG B 在 DAG A TASK1 之后开始。

两个 DAGS 都是每小时运行一次,DAG A 运行 @every hours EX: 10.00,DAG B 运行 @every hours ex:10.30。

我正在使用 Airflow 和运算符 EXternalTask​​Sensors 但它不起作用。

external_dag_id='DAG A',
external_task_id='TASK1',
allowed_states=None,
execution_delta=None,
execution_date_fn=None,
4

1 回答 1

2

如果你execution_delta在那里检查,你没有,文档[ https://github.com/apache/incubator-airflow/blob/master/airflow/operators/sensors.py#L194]说:

:param execution_delta: 与上一次执行的时间差来查看,默认是和当前任务相同的execution_date。对于昨天,使用 [positive!] datetime.timedelta(days=1)。execution_delta 或 execution_date_fn 可以传递给 ExternalTask​​Sensor,但不能同时传递。

简短的回答是,由于您在不同的时间运行 DAG A 和 DAG B,您需要放置 execution_delta,否则,它假定您的其他 DAG 同时运行,在这种情况下,没有找到 DAG 运行,您可能会得到一些东西意外。所以尝试类似 datetime.timedelta(minutes=30)

于 2017-09-18T21:16:06.530 回答