3

我喜欢让多个培训师使用相同的 ExampleGen、Schema 和 Transform 同时运行。下面是我添加额外组件作为 trainer2 evaluator2 和 pusher2 的代码。但是我一直收到以下错误,我不确定如何解决它们。能否请您提前告知和感谢!

错误: RuntimeError:组件类型 tfx.components.trainer.component.Trainer 的重复 component_id Trainer

def create_pipeline(
    pipeline_name: Text,
    pipeline_root: Text,
    data_path: Text,
    preprocessing_fn: Text,
    run_fn: Text,
    run_fn2: Text,
    train_args: trainer_pb2.TrainArgs,
    train_args2: trainer_pb2.TrainArgs,
    eval_args: trainer_pb2.EvalArgs,
    eval_args2: trainer_pb2.EvalArgs,
    eval_accuracy_threshold: float,
    eval_accuracy_threshold2: float,
    serving_model_dir: Text,
    serving_model_dir2: Text,
    metadata_connection_config: Optional[
        metadata_store_pb2.ConnectionConfig] = None,
    beam_pipeline_args: Optional[List[Text]] = None,
    ai_platform_training_args: Optional[Dict[Text, Text]] = None,
    ai_platform_serving_args: Optional[Dict[Text, Any]] = None,
) -> pipeline.Pipeline:
  """Implements the custom pipeline with TFX."""

  components = []
  example_gen = CsvExampleGen(input=external_input(data_path))
  components.append(example_gen)

schema_gen = SchemaGen(
      statistics=statistics_gen.outputs['statistics'],
      infer_feature_shape=False)
  components.append(schema_gen)

transform = Transform(
      examples=example_gen.outputs['examples'],
      schema=schema_gen.outputs['schema'],
      preprocessing_fn=preprocessing_fn)
  components.append(transform)

trainer_args = {
      'run_fn': run_fn,
      'transformed_examples': transform.outputs['transformed_examples'],
      'schema': schema_gen.outputs['schema'],
      'transform_graph': transform.outputs['transform_graph'],
      'train_args': train_args,
      'eval_args': eval_args,
      'custom_executor_spec':
          executor_spec.ExecutorClassSpec(trainer_executor.GenericExecutor),
  }

  trainer = Trainer(**trainer_args)
  components.append(trainer)

  trainer_args2 = {
      'run_fn': run_fn2,
      'transformed_examples': transform.outputs['transformed_examples'],
      'schema': schema_gen.outputs['schema'],
      'transform_graph': transform.outputs['transform_graph'],
      'train_args': train_args2,
      'eval_args': eval_args2,
      'custom_executor_spec':
          executor_spec.ExecutorClassSpec(trainer_executor.GenericExecutor),
  }

  trainer2 = Trainer(**trainer_args2)
  components.append(trainer2)

  model_resolver = ResolverNode(
      instance_name='latest_blessed_model_resolver',
      resolver_class=latest_blessed_model_resolver.LatestBlessedModelResolver,
      model=Channel(type=Model),
      model_blessing=Channel(type=ModelBlessing))
  components.append(model_resolver)

  model_resolver2 = ResolverNode(
      instance_name='latest_blessed_model_resolver2',
      resolver_class=latest_blessed_model_resolver.LatestBlessedModelResolver,
      model=Channel(type=Model),
      model_blessing=Channel(type=ModelBlessing))
  components.append(model_resolver2)



  evaluator = Evaluator(
      examples=example_gen.outputs['examples'],
      model=trainer.outputs['model'],
      #baseline_model=model_resolver.outputs['model'],
      # Change threshold will be ignored if there is no baseline (first run).
      eval_config=eval_config)
  components.append(evaluator)

  evaluator2 = Evaluator(
      examples=example_gen.outputs['examples'],
      model=trainer2.outputs['model'],
      baseline_model=model_resolver2.outputs['model'],
      # Change threshold will be ignored if there is no baseline (first run).
      eval_config=eval_config2)
  components.append(evaluator2)

  pusher_args = {
      'model':
          trainer.outputs['model'],
      'model_blessing':
          evaluator.outputs['blessing'],
      'push_destination':
          pusher_pb2.PushDestination(
              filesystem=pusher_pb2.PushDestination.Filesystem(
                  base_directory=serving_model_dir)),
  }

  pusher = Pusher(**pusher_args)  
  components.append(pusher)

  pusher_args2 = {
      'model':
          trainer2.outputs['model'],
      'model_blessing':
          evaluator2.outputs['blessing'],
      'push_destination':
          pusher_pb2.PushDestination(
              filesystem=pusher_pb2.PushDestination.Filesystem(
                  base_directory=serving_model_dir2)),
  }

  pusher2 = Pusher(**pusher_args2)  # pylint: disable=unused-variable
  components.append(pusher2)
4

2 回答 2

3

通过在每个管道组件中添加“instance_name”以标识唯一名称,上述问题已得到解决。

于 2020-05-28T08:21:10.520 回答
0

instance_name参数已被弃用。相反,使用with_id().

例子:

model_resolver2 = ResolverNode(
    resolver_class=latest_blessed_model_resolver.LatestBlessedModelResolver,
    model=Channel(type=Model),
    model_blessing=Channel(type=ModelBlessing)
).with_id("latest_blessed_model_resolver2")
    
components.append(model_resolver2)
于 2022-02-14T21:33:58.517 回答