I am trying to build a dynamic workflow.
I am getting this Broken DAG error about a duplicate task ID:
Broken DAG: [/opt/airflow/dags/academi_dag.py] Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/baseoperator.py", line 430, in __init__
task_group.add(self)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/task_group.py", line 140, in add
raise DuplicateTaskIdFound(f"Task id '{key}' has already been added to the DAG")
airflow.exceptions.DuplicateTaskIdFound: Task id 'Review.extract__1' has already been added to the DAG
My code:
import os
import re

from airflow.decorators import task
from airflow.utils.task_group import TaskGroup

@task
def extract(filename):
    some_extract_function

@task
def transform(item: list):
    some_transform_function

with TaskGroup('Review') as Review:
    data = []
    filenames = os.listdir(DATA_PATH)
    # Keep only the files whose names start with "review".
    filtered_filenames = list(filter(lambda x: re.match(r"(^review)", x), filenames))
    for filename in filtered_filenames:
        # One extract task per matching file.
        extract_review = extract(filename)
        data.append(extract_review)
    transformed_data_review = transform(data)
The error occurs when I try to create the tasks inside a TaskGroup. If I remove the TaskGroup, it works fine.
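I found this issue: https://github.com/apache/airflow/issues/8057. Is there a way to fix this error, for example by creating custom task_ids dynamically? I know this can be done with PythonOperator, but I am trying to use the TaskFlow API instead.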
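To make the idea concrete, this is roughly the kind of ID generation I have in mind. It is only a sketch: make_task_id and unique_task_ids are hypothetical helper names of my own, not Airflow APIs, and the example filenames are made up.

```python
import re


def make_task_id(prefix, filename):
    """Build a task_id candidate such as 'extract_review_1_csv' (hypothetical helper)."""
    # Replace every character outside [A-Za-z0-9_] with an underscore so the
    # result contains only letters, digits, and underscores.
    slug = re.sub(r"\W", "_", filename, flags=re.ASCII)
    return f"{prefix}_{slug}"


def unique_task_ids(prefix, filenames):
    """Return one distinct ID per filename, adding a numeric suffix on collisions."""
    used = set()
    ids = []
    for name in filenames:
        base = make_task_id(prefix, name)
        candidate, n = base, 0
        # Keep bumping the suffix until the candidate has not been used yet.
        while candidate in used:
            n += 1
            candidate = f"{base}__{n}"
        used.add(candidate)
        ids.append(candidate)
    return ids


if __name__ == "__main__":
    # Made-up filenames, only for illustration.
    print(unique_task_ids("extract", ["review_1.csv", "review-1.csv", "review_2.csv"]))
```

What I have not figured out is how to hand an ID like this to a @task-decorated function, which is the part I am asking about.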
Thanks