这篇文章中的代码 -在 Airflow 中动态生成 DAG 会动态生成 DAG。它使用 PythonOperator 来定义一个任务。
是否可以使用 @dag 和 @task 装饰器动态生成 DAG?
这段代码运行并且 DAG 出现在 Airflow UI 中,但我认为 DAG 无论如何都会出现,即使globals()[dag_id] = dag
要被删除。
from airflow import DAG
from datetime import datetime
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow',
}
dag_id = 'sample123454321'
@dag(dag_id=dag_id, default_args=default_args, schedule_interval=None, start_date=days_ago(2), catchup=False,
tags=['sample'])
def hello_world_dag():
@task()
def print_hello_world():
print("Hello, World!")
print_hello_world()
dag = hello_world_dag()
globals()[dag_id] = dag
@tags
在 Airflow 2.x 中使用和@task
装饰器动态生成标签的正确方法是什么?