我找到了以下链接:
https://www.linkedin.com/pulse/airflow-lesson-1-triggerdagrunoperator-siddharth-anand
这确实解释了如何使用TriggerDagRunOperator
来执行单独的 Airflow dag。该文档使用 Airflow 自己的示例 dag,但我很难理解它们,因为它们没有使用任何传感器。
有人可以解释我如何使用TriggerDagRunOperator
and开始单独的 dagSqlSensor
吗?当我的 SQL Server 作业任务完成时,我正在尝试启动单独的 DAG。我知道如何使用 来检查 SQL Server 作业的状态SqlSensor
,但我不知道如何将结果附加到TriggerDagRunOperator
以启动单独的 DAG。
我不想使用 Airflow CLI 或在一个 DAG 中完成这两项任务。基本上,我希望这只是触发 dag。
以下是我当前的代码,缺少关键的conditionally_trigger
# File Name: check-when-db1-sql-task-is-done
from airflow import DAG
from airflow.operators import TriggerDagRunOperator
from airflow.operators import SqlSensor
from datetime import datetime
default_args = {
'owner': 'airflow',
'retry_delay': timedelta(minutes=5),
}
dag = DAG('check-when-db1-sql-task-is-done',
description='Check-when-DB1-SQL-task-is-done',
default_args=default_args,
schedule_interval='@once',
start_date=datetime.now(),
)
# returns-0-or-1-based-on-job-task-status
sqlsensor = SqlSensor (
task_id='sql-sensor',
poke_interval=30,
timeout=3200,
sql="""select last_run_outcome from msdb.dbo.sysjobsteps where job_id = '249A5A5D-6AFC-4D6B-8CB1-27C16724A450' and step_id = '1' and last_run_date = (select convert(varchar(24),getdate(),112)); """,
mssql_conn_id='db1',
dag=dag,
)
# dag-to-start
trigger = TriggerDagRunOperator (
task_id='start-ssh-job',
trigger_dag_id="qa-knime-ssh-task",
python_callable=conditionally_trigger,
params={'condition_param': True,
'message': 'Hello World'},
dag=dag)