我是气流新手。
我遇到了一个场景,其中父 DAG 需要将一些动态数字(比如说n
)传递给子 DAG。
SubDAG 将使用此数字来动态创建n
并行任务。
气流文档没有涵盖实现这一目标的方法。所以我探索了几种方法:
选项 - 1(使用 xcom 拉取)
我试图作为 xcom 值传递,但由于某种原因,SubDAG 没有解析为传递的值。
父 Dag 文件
def load_dag(**kwargs):
number_of_runs = json.dumps(kwargs['dag_run'].conf['number_of_runs'])
dag_data = json.dumps({
"number_of_runs": number_of_runs
})
return dag_data
# ------------------ Tasks ------------------------------
load_config = PythonOperator(
task_id='load_config',
provide_context=True,
python_callable=load_dag,
dag=dag)
t1 = SubDagOperator(
task_id=CHILD_DAG_NAME,
subdag=sub_dag(PARENT_DAG_NAME, CHILD_DAG_NAME, default_args, "'{{ ti.xcom_pull(task_ids='load_config') }}'" ),
default_args=default_args,
dag=dag,
)
子日期文件
def sub_dag(parent_dag_name, child_dag_name, args, num_of_runs):
dag_subdag = DAG(
dag_id='%s.%s' % (parent_dag_name, child_dag_name),
default_args=args,
schedule_interval=None)
variabe_names = {}
for i in range(num_of_runs):
variabe_names['task' + str(i + 1)] = DummyOperator(
task_id='dummy_task',
dag=dag_subdag,
)
return dag_subdag
选项 - 2
我也尝试将number_of_runs
其作为全局变量传递,但它不起作用。
选项 - 3
我们还尝试将此值写入数据文件。但子 DAG 正在抛出File doesn't exist error
。这可能是因为我们正在动态生成这个文件。
有人可以帮我弄这个吗。