所以我在任务组中创建任务,并试图将它们添加到我的任务序列中,但它抛出了这个错误:
Broken DAG: [/Users/abc/projects/abc/airflow_dags/dag.py] Traceback (most recent call last):
File "/Users/abc/.pyenv/versions/3.8.12/envs/vmd-3.8.12/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 1234, in set_downstream
self._set_relatives(task_or_task_list, upstream=False)
File "/Users/abc/.pyenv/versions/3.8.12/envs/vmd-3.8.12/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 1178, in _set_relatives
task_object.update_relative(self, not upstream)
AttributeError: 'NoneType' object has no attribute 'update_relative'
我正在创建我的任务组和这样的任务:
def get_task_group(dag, task_group):
t1 = DummyOperator(task_id='t1', dag=dag, task_group=task_group)
t2 = DummyOperator(task_id='t2', dag=dag, task_group=task_group)
t3 = DummyOperator(task_id='t3', dag=dag, task_group=task_group)
t4 = DummyOperator(task_id='t4', dag=dag, task_group=task_group)
t5 = DummyOperator(task_id='t5', dag=dag, task_group=task_group)
t_list = [t2, t3, t4]
t1.set_downstream(t_list)
t5.set_upstream(t_list)
with DAG('some_dag', default_args=args) as dag:
with TaskGroup(group_id=f"run_model_tasks", dag=dag) as tg:
run_model_task_group = get_task_group(dag, tg)
a1 = DummyOperator(task_id='a1', dag=dag)
a2 = DummyOperator(task_id='a2', dag=dag)
a3 = DummyOperator(task_id='a3', dag=dag)
a4 = DummyOperator(task_id='a4', dag=dag)
a1.set_downstream(a2)
a2.set_downstream(run_model_task_group)
a3.set_upstream(run_model_task_group)
a3.set_downstream(a4)
如果我删除任务组并通过删除行将任务组任务从排序中退出
a2.set_downstream(run_model_task_group)
a3.set_upstream(run_model_task_group)
我可以看到 a1、a2、a3 和 a4 的顺序正确,并且我可以断开断开的run_model_task_group
任务,但是只要将其添加到顺序中,就会出现上述错误。
谁能指导我这里可能发生的事情?
请注意,我使用函数获取dag
和task_group
参数来创建任务组任务,因为我也想为另一个 dag 创建相同的任务集。
Python Version: 3.8.8
Airflow Version: 2.0.1