0

我有一个参数化的 DAG,我想基于此 DAG 以编程方式创建 DAG 实例。

在传统的气流模型中,我可以使用循环轻松实现这一点:

# Code sample from: https://github.com/astronomer/dynamic-dags-tutorial/blob/main/dags/dynamic-dags-loop.py

def create_dag(dag_id,
               schedule,
               dag_number,
               default_args):

    def hello_world_py(*args):
        print('Hello World')
        print('This is DAG: {}'.format(str(dag_number)))

    dag = DAG(dag_id,
              schedule_interval=schedule,
              default_args=default_args)

    with dag:
        t1 = PythonOperator(
            task_id='hello_world',
            python_callable=hello_world_py)

    return dag

for n in range(1, 4):
    dag_id = 'loop_hello_world_{}'.format(str(n))

    // ...

    globals()[dag_id] = create_dag(dag_id,
                                  schedule,
                                  dag_number,
                                  default_args)

如何使用 TaskFlow 模型在 AirFlow 2.0 中实现类似的行为?

我试图像下面的代码一样手动扩展@dag 装饰,但它不起作用。动态创建的 DAG 中没有任何任务:

@dag(schedule_interval=None, start_date=datetime(2021, 1, 1), catchup=False, tags=['test'])
def flow_test():
    @task()
    def sleep():
        time.sleep(5)

    t1 = sleep()

for n in range(1,3):
    dag_id = 'flow_test_{}'.format(str(n))
    globals()[dag_id] = dag(dag_id=dag_id, tags=['test'])(flow_test)()
4

0 回答 0