1
## Section 1 | Import Modules
## Section 2 | DAG Default Arguments
## Section 3 | Instantiate the DAG
## Section 4 | defining Utils
## Section 5 | Task defining
## Section 6 | Defining dependecies
 
 
## Section 1 | Import Modules
from airflow import DAG
from datetime import datetime
from airflow.operators.python_operator import PythonOperator
 
## Section 2 | DAG Default Arguments
default_args = {
    'owner': 'Sourav',
    'depends_on_past': False,
    'start_date': datetime(2021, 6, 11),
    'retries': 0,
}
 
## Section 3 | Instantiate the DAG
dag = DAG('basic_skeleton',
        description='basic skeleton of a DAG',
        default_args=default_args,
        schedule_interval=None,
        catchup=False,
        tags=['skeleton'],
        )
 
x = 0
## Section 4 | defining Utils
def print_context(**kwargs):
    print("hello world")
    return "hello world!!!"
 
def sum(**kwargs):
    c = 1+2
    return c
 
def diff(**kwargs):
    global c
    c = 2-1
    return c
 
## Doubts
x = c
y = dag.get_dagrun(execution_date=dag.get_latest_execution_date()).conf
 
## Section 5 | Task defining
with dag:
    t_printHello_prejob = PythonOperator(
        task_id='t_printHello_prejob',
        provide_context=True,
        python_callable=print_context,
        dag=dag,
    )
 
    t_sum_job = PythonOperator(
        task_id='t_sum_job',
        python_callable=sum,
        provide_context=True,
        dag=dag
    )
 
    ## Section 6 | Defining dependecies
    t_printHello_prejob>>t_sum_job

现在,我需要知道两件事:

  1. x = c,我正在尝试使用这个变量 x 来定义一个 for 循环,用于下一个任务需要拍摄的次数。不知何故,Airflow UI 是从一个基本编译的 .py 文件呈现的,并且 x 加载的值为 0 而不是 1,即使我global c在函数中这样做。有时,airflow UI 会偶然显示 1 的值。我想知道它背后的逻辑。如何控制全局变量?

  2. 对于每个 dagrun,我想conf摆脱气流模板范围并在全局 python 区域 [非气流模板] 中使用它。我了解,我可以在气流模板中使用 jinja 宏。但是,我需要访问气流范围之外的 conf。 y = dag.get_dagrun(execution_date=dag.get_latest_execution_date()).conf 该语句为我提供了最新的 dag_run conf。但是,对我来说,我有多个 DAG_runs 同时运行,所以我可以在这个变量中为那个 dagrun 获取当前的 dag_run conf 吗?

4

1 回答 1

1

Sourav,告诉我这是否有帮助:

在 Airflow DAG 中,我们通常不会在任务之间共享数据,即使这在技术上是可行的。我们鼓励让每个任务保持幂等性,这与函数式编程中的“纯函数”不同。这意味着给定输入x,给定任务将始终创建相同的结果。

您在此处定义的 DAG 基本上是数据管道的蓝图。当 DAG 和任务由 Airflow 调度程序评估时,任务将调用的函数......嗯,还没有调用。直观地说,因此我希望x总是等于零,虽然解开为什么不总是为零是一个有趣的谜团,但在 DAG 运行期间改变全局变量并不是 Airflow 的设置。

也就是说,一种可靠地变异xc跨任务使用它的简单方法是将其存储在 Airflow 变量中:

from airflow.models.variable import Variable
...

Variable.set('x', 0)
...

def sum(**kwargs):
    c = 1+2
    return c
 
def diff(**kwargs):
    c = 2-1
    Variable.set('c', c)
    return c

def a_func_that_uses_c(**kwargs):
    """make sure this function is called in a task _after_ the task calling `diff`"""
    c = Variable.get('c')
    ...

一个问题是 Airflow 变量是字符串,所以如果你存储一个整数,就像这里一样,你需要eval(c)int(c)获取它。

于 2021-12-13T09:10:59.460 回答