我最近开始使用 Apache Airflow 及其新概念 Taskflow API 之一。我有一个包含多个装饰任务的 DAG,其中每个任务都有 50 多行代码。所以我决定将每个任务移到一个单独的文件中。
在引用 stackoverflow 之后,我可以以某种方式将 DAG 中的任务移动到每个任务的单独文件中。现在,我的问题是:
- 下面显示的两个代码示例是否相同?(我担心任务的范围)。
- 他们将如何以黑白方式共享数据?
- 性能上有区别吗?(我读到由于性能问题不鼓励 Subdags,尽管这与 Subdags 无关)。
我在网络(和官方文档)中看到的所有代码示例都将所有任务放在一个文件中。
样品 1
import logging
from airflow.decorators import dag, task
from datetime import datetime
default_args = {"owner": "airflow", "start_date": datetime(2021, 1, 1)}
@dag(default_args=default_args, schedule_interval=None)
def No_Import_Tasks():
# Task 1
@task()
def Task_A():
logging.info(f"Task A: Received param None")
# Some 100 lines of code
return "A"
# Task 2
@task()
def Task_B(a):
logging.info(f"Task B: Received param {a}")
# Some 100 lines of code
return str(a + "B")
a = Task_A()
ab = Task_B(a)
No_Import_Tasks = No_Import_Tasks()
示例 2文件夹结构:
- dags
- tasks
- Task_A.py
- Task_B.py
- Main_DAG.py
文件任务_A.py
import logging
from airflow.decorators import task
@task()
def Task_A():
logging.info(f"Task A: Received param None")
# Some 100 lines of code
return "A"
文件任务_B.py
import logging
from airflow.decorators import task
@task()
def Task_B(a):
logging.info(f"Task B: Received param {a}")
# Some 100 lines of code
return str(a + "B")
文件Main_Dag.py
from airflow.decorators import dag
from datetime import datetime
from tasks.Task_A import Task_A
from tasks.Task_B import Task_B
default_args = {"owner": "airflow", "start_date": datetime(2021, 1, 1)}
@dag(default_args=default_args, schedule_interval=None)
def Import_Tasks():
a = Task_A()
ab = Task_B(a)
Import_Tasks_dag = Import_Tasks()
提前致谢!