0

我尝试使用任务流 API 创建多个 dag,其中传递了一个变量,dag 中的任务可以使用该变量

例如,我正在尝试使用此代码

from airflow.decorators import dag, task
from datetime import datetime

@dag(schedule_interval=None, start_date=datetime(2021, 1, 1))
def dag_template(input_var):

    @task
    def printer_task(x):
        print(x)

    output_input_var = printer_task(input_var)

dag_1 = dag_template("string1")
dag_2 = dag_template(6)

理想情况下,这将创建两个 ID 为 dag_1 和 dag_2 的 dag。一个 dag 将打印字符串“string1”,其他 6 个。这几乎适用于创建 1 个 dag 且 ID 为 dag_template 打印 6 的代码。

文档中有 dag 将被称为 python 可调用,是否可以覆盖它。

4

1 回答 1

0

我不觉得它是一个非常优雅的解决方案,但它确实可以满足我的需求。

from airflow.decorators import dag, task
from datetime import datetime

config = [("dag_1", "string1"), ("dag_2", 6)]

for dag_name, dag_input in config:

    @dag(dag_id = dag_name ,schedule_interval=None, start_date=datetime(2021, 1, 1))
    def dag_template(input_var):
        @task
        def printer_task(x):
            print(x)

        output_input_var = printer_task(input_var)

    globals()[dag_name] = dag_template(dag_input)
于 2021-12-14T03:45:50.707 回答