我有一个参数化的 DAG,我想基于此 DAG 以编程方式创建 DAG 实例。
在传统的气流模型中,我可以使用循环轻松实现这一点:
# Code sample from: https://github.com/astronomer/dynamic-dags-tutorial/blob/main/dags/dynamic-dags-loop.py
def create_dag(dag_id,
schedule,
dag_number,
default_args):
def hello_world_py(*args):
print('Hello World')
print('This is DAG: {}'.format(str(dag_number)))
dag = DAG(dag_id,
schedule_interval=schedule,
default_args=default_args)
with dag:
t1 = PythonOperator(
task_id='hello_world',
python_callable=hello_world_py)
return dag
for n in range(1, 4):
dag_id = 'loop_hello_world_{}'.format(str(n))
// ...
globals()[dag_id] = create_dag(dag_id,
schedule,
dag_number,
default_args)
如何使用 TaskFlow 模型在 AirFlow 2.0 中实现类似的行为?
我试图像下面的代码一样手动扩展@dag 装饰,但它不起作用。动态创建的 DAG 中没有任何任务:
@dag(schedule_interval=None, start_date=datetime(2021, 1, 1), catchup=False, tags=['test'])
def flow_test():
@task()
def sleep():
time.sleep(5)
t1 = sleep()
for n in range(1,3):
dag_id = 'flow_test_{}'.format(str(n))
globals()[dag_id] = dag(dag_id=dag_id, tags=['test'])(flow_test)()