0

我有以下两个文件。一个带有 DAG 和两个任务(DummyOperator 和 TaskGroup)。

# example_dag.py
from datetime import timedelta
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from includes.taskgroup import build_taskgroup



default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id="modularized_dag",
    schedule_interval="@once",
    start_date=days_ago(1),
    default_args=default_args,
) as dag:
    first = DummyOperator(task_id="first_task", dag=dag)
    second = build_taskgroup(dag, "lorem ipsum dolor sit amet")
    first >> second

第二个文件是在第一个文件中创建并返回调用的任务组的方法。

# includes/taskgroup.py
import logging
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup


def print_variable(templates_dict: str)
    logging.info(f'input_text: {templates_dict}')


def build_taskgroup(dag: DAG, templates_dict: str) -> TaskGroup:

    with TaskGroup(group_id="xyzzy_taskgroup") as task_group:

        second_task = DummyOperator(task_id="second_task", task_group=task_group, dag=dag)

        third_task = PythonOperator(
            task_id="third_task",
            task_group=task_group,
            python_callable=print_variable,
            op_kwargs={'templates_dict': templates_dict},
            dag=dag,
        )

        second_task >> third_task

    return task_group

我的问题如下:在第一个文件中,我将变量 (input_text) 传递给创建 TaskGroup 的方法,这又将 input_text 传递给 PythonOperator,它只是打印它。我不知道为什么变量没有从 DAG 传递给方法。当我打印它时,我有:

input_text: None

我是否忘记了有关 DAG 生命周期的一些基本信息?是否有另一种方法可以将变量传递给创建任务组的方法?

提前致谢。


更新

当我尝试编写一段代码来复制我的问题(基本问题是私有代码,工作)时,我更改了一个变量的名称,而重命名正是我问题的根源,这就是我放在这里的代码段起作用的原因为 LD Nicolas May。

那是一团糟:

        third_task = PythonOperator(
            #...
            op_kwargs={'templates_dict': templates_dict},
        )

似乎我不能在 op_kwargs 中使用键名,templates_dict因为那是 PythonOperator 参数。

对不起,乱七八糟。

4

2 回答 2

0

对 Jorge 的回答三思而后行,作为一种解决方法,我必须创建一个存储变量的初始运算符。所以我从...

在此处输入图像描述

至: 在此处输入图像描述

代码是:

# example_dag.py
def store_variable(ti):
    ti.xcom_push(key="input_text_id", value="lorem ipsum dolor sit amet")


with DAG(...) as dag:
    zero = PythonOperator(task_id="store_variable", python_callable=store_variable, dag=dag)
    first = ...
    second = build_taskgroup(dag)
    zero >> first >> second

在第二个文件中:

# includes/taskgroup.py
def print_variable(ti):
    input_text = ti.xcom_pull(key='input_text_id')
    logging.info(f'input_text: {input_text}')


def build_taskgroup(dag: DAG) -> TaskGroup:
    with TaskGroup(group_id="taskgroup") as task_group:
        # ...
        third_task = PythonOperator(
            task_id="third_task",
            task_group=task_group,
            python_callable=print_variable,
            dag=dag,
        )

    # ...

这是一个可行的解决方法。我仍然不知道我的问题的根源(将变量传递给 DAG 内的函数)。

于 2021-12-14T18:30:06.983 回答
0

也许您需要使用xcom值在 DAG 之间传递变量。

在你的打印功能中试试这个:

def print_variable(input_text: str, **kwargs)
    logging.info(f'input_text: {input_text}')
    ti = kwargs['ti']
    xcom_value = ti.xcom_pull(task_ids='third_task')

其中 xcom_value 将是您的“input_text”变量。“xcom_pull”是 Composer/Airflow 必须从其他 DAG 或函数中提取变量的功能。此外,您需要为 third_task DAG 提供上下文:

third_task = PythonOperator(
            task_id="third_task",
            task_group=task_group,
            python_callable=print_variable,
            op_kwargs={'input_text': input_text},
            provide_context=True,
            dag=dag,
        )

这样,您就可以将“input_text”变量传递给 print_variable 函数。

于 2021-12-14T14:35:02.593 回答