0

是否可以使用 externalTask​​Sensor 在另一个时间运行两个 dag?

我有两个 DAG。DAG A 每两小时运行一次

  • 上午 10 点(成功)
  • 上午 12 点 (失败的)
  • 下午 2 点(成功)

Dag B 依赖于 DAG A。DAG B 在凌晨 12 点等待 DAG A 并失败,因为 DAG A 失败了。但是由于 DAG A 在下午 2 点成功,Dag B 应该假设运行。

你怎么能做到这一点?使用 ExternalTask​​Sensor?

我只是有一个小假人,试图理解它。

from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.sensors import ExternalTaskSensor
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.timezone import datetime
from datetime import datetime, timedelta
import airflow 

source_dag = DAG(
    dag_id='sensor_dag_source',
    start_date = datetime(2020, 1, 20),
    schedule_interval='* * * * *'
)

first_task = DummyOperator(task_id='first_task', dag=source_dag)

target_dag = DAG(
    dag_id='sensor_dag_target',
    start_date = datetime(2020, 1, 20),
    schedule_interval='* * * * *'   
)

task_sensor = ExternalTaskSensor(
    dag=target_dag,
    task_id='dag_sensor_source_sensor',
    retries=100,
    retry_delay=timedelta(seconds=30),
    mode='reschedule',
    external_dag_id='sensor_dag_source',
    external_task_id='first_task'
)

first_task = DummyOperator(task_id='first_task', dag=target_dag)

task_sensor >> first_task
4

1 回答 1

1

您可以尝试使用 TriggerDagRunOperator 并从 DAG A 触发 DAG B

这是一个完整的答案 -在气流中,有没有一种好方法可以调用另一个 dag 的任务?

还有另一个关于它的好帖子-

将顶级 DAG 连接在一起

于 2020-05-19T09:11:25.940 回答