我有以下两个文件。一个带有 DAG 和两个任务(DummyOperator 和 TaskGroup)。
# example_dag.py
from datetime import timedelta
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from includes.taskgroup import build_taskgroup
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG(
dag_id="modularized_dag",
schedule_interval="@once",
start_date=days_ago(1),
default_args=default_args,
) as dag:
first = DummyOperator(task_id="first_task", dag=dag)
second = build_taskgroup(dag, "lorem ipsum dolor sit amet")
first >> second
第二个文件是在第一个文件中创建并返回调用的任务组的方法。
# includes/taskgroup.py
import logging
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
def print_variable(templates_dict: str)
logging.info(f'input_text: {templates_dict}')
def build_taskgroup(dag: DAG, templates_dict: str) -> TaskGroup:
with TaskGroup(group_id="xyzzy_taskgroup") as task_group:
second_task = DummyOperator(task_id="second_task", task_group=task_group, dag=dag)
third_task = PythonOperator(
task_id="third_task",
task_group=task_group,
python_callable=print_variable,
op_kwargs={'templates_dict': templates_dict},
dag=dag,
)
second_task >> third_task
return task_group
我的问题如下:在第一个文件中,我将变量 (input_text) 传递给创建 TaskGroup 的方法,这又将 input_text 传递给 PythonOperator,它只是打印它。我不知道为什么变量没有从 DAG 传递给方法。当我打印它时,我有:
input_text: None
我是否忘记了有关 DAG 生命周期的一些基本信息?是否有另一种方法可以将变量传递给创建任务组的方法?
提前致谢。
更新
当我尝试编写一段代码来复制我的问题(基本问题是私有代码,工作)时,我更改了一个变量的名称,而重命名正是我问题的根源,这就是我放在这里的代码段起作用的原因为 LD Nicolas May。
那是一团糟:
third_task = PythonOperator(
#...
op_kwargs={'templates_dict': templates_dict},
)
似乎我不能在 op_kwargs 中使用键名,templates_dict
因为那是 PythonOperator 参数。
对不起,乱七八糟。