1

我有两个 dags Dag 1 和 Dag 2

任务 A、B 和 C 在 Dag 1 中 任务 D、E 和 F 在 Dag 2 中

如何实现第三个 Dag 来利用任务?

我希望 Dag 3 使用任务 A,然后是任务 E,然后是任务 C,而不重写函数

4

1 回答 1

0

创建一个返回运算符的函数,并从您需要的任何 DAG 中重用它。

例子:

def create_my_opeartor(task_id=None, **kwargs):
    #Replace with your actual operator configuration
    return MyOperator(task_id=task_id, **kwargs) 

def create_my_opeartor_2(task_id=None, **kwargs):
    #Replace with your actual operator configuration
    return BashOperator(task_id=task_id, bash_command='echo "hello world", **kwargs)

with DAG(
    dag_id='DAG1',
    default_args=default_args,
    schedule_interval=None,
    start_date=datetime(2021, 8, 24),
) as dag1:
    a = create_my_opeartor(task_id='task_A')
    
with DAG(
    dag_id='DAG2',
    default_args=default_args,
    schedule_interval=None,
    start_date=datetime(2021, 8, 24),
) as dag2:
    e = create_my_opeartor_2(task_id='task_E')
    
with DAG(
    dag_id='DAG3',
    default_args=default_args,
    schedule_interval=None,
    start_date=datetime(2021, 8, 24),
) as dag3:
    a = create_my_opeartor(task_id='task_A')
    e = create_my_opeartor2(task_id='task_E')
    a >> e
于 2021-08-25T11:12:22.913 回答