
I am trying to build a dynamic workflow.

I am getting this Broken DAG error about a duplicate task ID:

Broken DAG: [/opt/airflow/dags/academi_dag.py] Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/baseoperator.py", line 430, in __init__
    task_group.add(self)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/task_group.py", line 140, in add
    raise DuplicateTaskIdFound(f"Task id '{key}' has already been added to the DAG")
airflow.exceptions.DuplicateTaskIdFound: Task id 'Review.extract__1' has already been added to the DAG
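The `extract__1` in the traceback comes from TaskFlow's automatic de-duplication: calling the same `@task`-decorated function more than once appends `__1`, `__2`, ... to the base task id. A rough pure-Python sketch of that numbering (illustrative assumption, not Airflow's actual source):

```python
# Sketch of TaskFlow-style task-id de-duplication (assumed behaviour,
# not Airflow's real implementation): repeated calls to the same @task
# function get suffixed ids extract, extract__1, extract__2, ...
def unique_task_id(base, existing_ids):
    """Return base, or base__N for the first unused N."""
    if base not in existing_ids:
        return base
    n = 1
    while f"{base}__{n}" in existing_ids:
        n += 1
    return f"{base}__{n}"

ids = set()
for _ in range(3):  # simulate three calls to extract(...)
    ids.add(unique_task_id("extract", ids))

print(sorted(ids))  # ['extract', 'extract__1', 'extract__2']
```

The error above means that a task with the suffixed id `Review.extract__1` was registered twice inside the group.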

My code:

import os
import re

from airflow.decorators import task
from airflow.utils.task_group import TaskGroup

@task
def extract(filename):
    ...  # some extract logic

@task
def transform(items: list):
    ...  # some transform logic

with TaskGroup('Review') as Review:
    data = []
    filenames = os.listdir(DATA_PATH)
    filtered_filenames = list(filter(lambda x: re.match(r"(^review)", x), filenames))
    for filename in filtered_filenames:
        extract_review = extract(filename)
        data.append(extract_review)
    transformed_data_review = transform(data)

The error occurs when I try this inside a TaskGroup. If I remove the TaskGroup, it works fine.

I found this issue here: https://github.com/apache/airflow/issues/8057. Is there a way to work around this error, for example by dynamically creating a custom task_id? I know this is possible with PythonOperator, but I am trying to use the TaskFlow API instead.

Thanks


1 Answer


Fixed, thanks to this video here.

So I solved this by dynamically creating a TaskGroup inside the TaskGroup.

Here is the code:

with TaskGroup('Review') as Review:
    data = []
    filenames = os.listdir(DATA_PATH)
    filtered_filenames = list(filter(lambda x: re.match(r"(^review)", x), filenames))
    for filename in filtered_filenames:
        with TaskGroup(filename):
            extract_review = extract(filename)
            data.append(extract_review)
    transformed_data_review = transform(data)
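This works because each TaskGroup prefixes its own name onto the ids of the tasks inside it, so every per-file group yields a unique qualified id like `Review.<filename>.extract` instead of a colliding `Review.extract__1`. A minimal pure-Python sketch of that prefixing behaviour (illustrative only, not Airflow's actual implementation):

```python
# Sketch of how nested TaskGroups keep task ids unique.
# This models only the id-prefixing idea, not Airflow internals.

class DuplicateTaskIdFound(Exception):
    pass

class Group:
    def __init__(self, name, parent=None):
        # Nested groups chain their names: Review -> Review.review_a.csv
        self.prefix = f"{parent.prefix}.{name}" if parent else name
        # All groups in one "DAG" share a single id registry.
        self.ids = set() if parent is None else parent.ids

    def add(self, task_id):
        qualified = f"{self.prefix}.{task_id}"
        if qualified in self.ids:
            raise DuplicateTaskIdFound(qualified)
        self.ids.add(qualified)
        return qualified

review = Group("Review")
review.add("extract")
try:
    review.add("extract")        # same id in the same group: collision
except DuplicateTaskIdFound as e:
    print("collision:", e)       # collision: Review.extract

# Nesting a per-file group makes each id unique:
for fname in ["review_a.csv", "review_b.csv"]:
    sub = Group(fname, parent=review)
    print(sub.add("extract"))    # Review.review_a.csv.extract, then ..._b...
```

On newer Airflow releases (2.3+), `extract.override(task_id=f"extract_{filename}")(filename)` may be another way to assign per-file task ids without a nested group, if your version supports it; the suffix naming here is my own assumption, not from the original post.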
answered 2021-05-26T00:05:18.493