关于“动态任务”的其他问题似乎涉及在计划或设计时动态构建 DAG。我有兴趣在执行期间将任务动态添加到 DAG。
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
dag = DAG('test_dag', description='a test',
schedule_interval='0 0 * * *',
start_date=datetime(2018, 1, 1),
catchup=False)
def make_tasks():
du1 = DummyOperator(task_id='dummy1', dag=dag)
du2 = DummyOperator(task_id='dummy2', dag=dag)
du3 = DummyOperator(task_id='dummy3', dag=dag)
du1 >> du2 >> du3
p = PythonOperator(
task_id='python_operator',
dag=dag,
python_callable=make_tasks)
这种幼稚的实现似乎不起作用 - 虚拟任务永远不会出现在 UI 中。
在执行期间向 DAG 添加新运算符的正确方法是什么?可能吗?