0

这篇文章中的代码 -在 Airflow 中动态生成 DAG 会动态生成 DAG。它使用 PythonOperator 来定义一个任务。

是否可以使用 @dag 和 @task 装饰器动态生成 DAG?

这段代码运行并且 DAG 出现在 Airflow UI 中,但我认为 DAG 无论如何都会出现,即使globals()[dag_id] = dag要被删除。

from airflow import DAG
from datetime import datetime
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
}

dag_id = 'sample123454321'


@dag(dag_id=dag_id, default_args=default_args, schedule_interval=None, start_date=days_ago(2), catchup=False,
     tags=['sample'])
def hello_world_dag():
    @task()
    def print_hello_world():
        print("Hello, World!")

    print_hello_world()


dag = hello_world_dag()

globals()[dag_id] = dag

@tags在 Airflow 2.x 中使用和@task装饰器动态生成标签的正确方法是什么?

4

0 回答 0