我有两个 dags Dag 1 和 Dag 2
任务 A、B 和 C 在 Dag 1 中 任务 D、E 和 F 在 Dag 2 中
如何实现第三个 Dag 来利用任务?
我希望 Dag 3 使用任务 A,然后是任务 E,然后是任务 C,而不重写函数
创建一个返回运算符的函数,并从您需要的任何 DAG 中重用它。
例子:
def create_my_opeartor(task_id=None, **kwargs):
#Replace with your actual operator configuration
return MyOperator(task_id=task_id, **kwargs)
def create_my_opeartor_2(task_id=None, **kwargs):
#Replace with your actual operator configuration
return BashOperator(task_id=task_id, bash_command='echo "hello world", **kwargs)
with DAG(
dag_id='DAG1',
default_args=default_args,
schedule_interval=None,
start_date=datetime(2021, 8, 24),
) as dag1:
a = create_my_opeartor(task_id='task_A')
with DAG(
dag_id='DAG2',
default_args=default_args,
schedule_interval=None,
start_date=datetime(2021, 8, 24),
) as dag2:
e = create_my_opeartor_2(task_id='task_E')
with DAG(
dag_id='DAG3',
default_args=default_args,
schedule_interval=None,
start_date=datetime(2021, 8, 24),
) as dag3:
a = create_my_opeartor(task_id='task_A')
e = create_my_opeartor2(task_id='task_E')
a >> e