1

我的代码如下所示:

def etl():
    for item in ['FIRST','SECCOND','THIRD']:
        if item == 'a':
            requests = ['Data1','Data3']
        elif item == 'b':
            requests = ['Data1']

        for data_name in requests:
            @task(task_id=f'{item}_{data_name}_task_a')
            def taska():
                a,b = some_func
                vars_dict = {'a': a,
                             'b': b}
                return vars_dict

            @task(task_id=f'{account}_{data_name}_get_liveops_data')
            def taskb(vars_dict):
                some_other_func
                return True

            if data_name=='Data1':
                @task(task_id='last_task')
                def last_task(success):
                    dim_experiments.main()
                    return

            vars_dict = taska()
            success = taskb(vars_dict)
            last_task(success)


myc_dag = etl()

dag 看起来像这样: 在此处输入图像描述

什么时候应该是这样的: 在此处输入图像描述

目标是last_task依赖taskataskb除那个taskataskb那个下载Data3请求。我无法使用它来实现它TaskFlow API

4

1 回答 1

1

发生并行依赖是因为调用last_task()TaskFlow 函数并设置任务依赖(隐式通过 TaskFlow API)是在调用其他任务的同一循环中完成的。TaskFlow 函数的每次调用都会创建一个新的任务节点。如果last_task被拉到循环之外并且只在循环内设置了必要的依赖项,您将获得所需的结构。

让我们以您的代码的简化版本为例。

from datetime import datetime
from airflow.decorators import dag, task


@dag(dag_id="__example__", start_date=datetime(2021, 11, 1), schedule_interval=None)
def etl():
    @task(task_id="last_task")
    def last_task(some_input=None):
        ...

    for item in ["a", "b"]:

        @task
        def taska():
            return {"a": "A", "b": "B"}

        @task
        def taskb(input):
            ...

        success = taskb(taska())
        last_task(success)


myc_dag = etl()

在上面的 DAG 中taska()taskb()、 和last_task()TaskFlow 函数都被调用,并且它们的任务依赖关系设置在循环中。因此,我们看到 2 条并行路径:

在此处输入图像描述

last_task()成为两条路径的共享下游任务,我们需要将调用拉到last_task()(意味着我们只创建一次任务节点)但保持任务之间的依赖关系taskb()不变last_task()。这可以通过对示例的小重构来完成:

@dag(dag_id="__example__", start_date=datetime(2021, 11, 1), schedule_interval=None)
def etl():
    @task(task_id="last_task")
    def last_task(some_input=None):
        ...

    last_task = last_task()

    for item in ["a", "b"]:
        @task
        def taska():
            return {"a": "A", "b": "B"}

        @task
        def taskb(input):
            ...

        success = taskb(taska())
        success >> last_task


myc_dag = etl()

请注意,last_task()TaskFlow 函数是在创建其他任务的循环之外调用的。这确保last_task()任务只创建一次。另一个更改是将last_task()调用设置为变量并使用此变量来声明任务依赖项taskb()(类似于您success在原始代码片段中对变量所做的操作)。通过这些小的更改,我们得到了 2 条具有共享最终任务的路径last_task()

在此处输入图像描述

于 2021-11-01T14:12:36.370 回答