0

尝试在 Cloud Composer 上同时完成一些任务:

arr = {}
for i in xrange(3):
    print("i: " + str(i))
    command_formatted = command_template.format(str(i))
    create_training_instance = bash_operator.BashOperator(
        task_id='create_training_instance',
        bash_command=command_formatted)
    arr[i] = create_training_instance
    start_training.set_downstream(arr[i])  

收到以下错误:

损坏的 DAG:[/home/airflow/gcs/dags/scale_simple.py] 依赖,create_training_instance 已经注册

4

2 回答 2

1

对于task_id单个任务,应该始终是唯一的。所以,你可以使用类似create_training_instance_{}.format(i)as 的东西task_id

于 2018-09-04T20:31:17.917 回答
0

You need to parameterize your task id as well, e.g., task_id='create_training_instance' --> 'create_traiing_instance-{}'.format(i)

于 2018-09-04T16:51:18.077 回答