是否可以使用 externalTaskSensor 在另一个时间运行两个 dag?
我有两个 DAG。DAG A 每两小时运行一次
- 上午 10 点(成功)
- 上午 12 点 (失败的)
- 下午 2 点(成功)
Dag B 依赖于 DAG A。DAG B 在凌晨 12 点等待 DAG A 并失败,因为 DAG A 失败了。但是由于 DAG A 在下午 2 点成功,Dag B 应该假设运行。
你怎么能做到这一点?使用 ExternalTaskSensor?
我只是有一个小假人,试图理解它。
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.sensors import ExternalTaskSensor
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.timezone import datetime
from datetime import datetime, timedelta
import airflow
source_dag = DAG(
dag_id='sensor_dag_source',
start_date = datetime(2020, 1, 20),
schedule_interval='* * * * *'
)
first_task = DummyOperator(task_id='first_task', dag=source_dag)
target_dag = DAG(
dag_id='sensor_dag_target',
start_date = datetime(2020, 1, 20),
schedule_interval='* * * * *'
)
task_sensor = ExternalTaskSensor(
dag=target_dag,
task_id='dag_sensor_source_sensor',
retries=100,
retry_delay=timedelta(seconds=30),
mode='reschedule',
external_dag_id='sensor_dag_source',
external_task_id='first_task'
)
first_task = DummyOperator(task_id='first_task', dag=target_dag)
task_sensor >> first_task