我喜欢让多个培训师使用相同的 ExampleGen、Schema 和 Transform 同时运行。下面是我添加额外组件作为 trainer2 evaluator2 和 pusher2 的代码。但是我一直收到以下错误,我不确定如何解决它们。能否请您提前告知和感谢!
错误: RuntimeError:组件类型 tfx.components.trainer.component.Trainer 的重复 component_id Trainer
def create_pipeline(
pipeline_name: Text,
pipeline_root: Text,
data_path: Text,
preprocessing_fn: Text,
run_fn: Text,
run_fn2: Text,
train_args: trainer_pb2.TrainArgs,
train_args2: trainer_pb2.TrainArgs,
eval_args: trainer_pb2.EvalArgs,
eval_args2: trainer_pb2.EvalArgs,
eval_accuracy_threshold: float,
eval_accuracy_threshold2: float,
serving_model_dir: Text,
serving_model_dir2: Text,
metadata_connection_config: Optional[
metadata_store_pb2.ConnectionConfig] = None,
beam_pipeline_args: Optional[List[Text]] = None,
ai_platform_training_args: Optional[Dict[Text, Text]] = None,
ai_platform_serving_args: Optional[Dict[Text, Any]] = None,
) -> pipeline.Pipeline:
"""Implements the custom pipeline with TFX."""
components = []
example_gen = CsvExampleGen(input=external_input(data_path))
components.append(example_gen)
schema_gen = SchemaGen(
statistics=statistics_gen.outputs['statistics'],
infer_feature_shape=False)
components.append(schema_gen)
transform = Transform(
examples=example_gen.outputs['examples'],
schema=schema_gen.outputs['schema'],
preprocessing_fn=preprocessing_fn)
components.append(transform)
trainer_args = {
'run_fn': run_fn,
'transformed_examples': transform.outputs['transformed_examples'],
'schema': schema_gen.outputs['schema'],
'transform_graph': transform.outputs['transform_graph'],
'train_args': train_args,
'eval_args': eval_args,
'custom_executor_spec':
executor_spec.ExecutorClassSpec(trainer_executor.GenericExecutor),
}
trainer = Trainer(**trainer_args)
components.append(trainer)
trainer_args2 = {
'run_fn': run_fn2,
'transformed_examples': transform.outputs['transformed_examples'],
'schema': schema_gen.outputs['schema'],
'transform_graph': transform.outputs['transform_graph'],
'train_args': train_args2,
'eval_args': eval_args2,
'custom_executor_spec':
executor_spec.ExecutorClassSpec(trainer_executor.GenericExecutor),
}
trainer2 = Trainer(**trainer_args2)
components.append(trainer2)
model_resolver = ResolverNode(
instance_name='latest_blessed_model_resolver',
resolver_class=latest_blessed_model_resolver.LatestBlessedModelResolver,
model=Channel(type=Model),
model_blessing=Channel(type=ModelBlessing))
components.append(model_resolver)
model_resolver2 = ResolverNode(
instance_name='latest_blessed_model_resolver2',
resolver_class=latest_blessed_model_resolver.LatestBlessedModelResolver,
model=Channel(type=Model),
model_blessing=Channel(type=ModelBlessing))
components.append(model_resolver2)
evaluator = Evaluator(
examples=example_gen.outputs['examples'],
model=trainer.outputs['model'],
#baseline_model=model_resolver.outputs['model'],
# Change threshold will be ignored if there is no baseline (first run).
eval_config=eval_config)
components.append(evaluator)
evaluator2 = Evaluator(
examples=example_gen.outputs['examples'],
model=trainer2.outputs['model'],
baseline_model=model_resolver2.outputs['model'],
# Change threshold will be ignored if there is no baseline (first run).
eval_config=eval_config2)
components.append(evaluator2)
pusher_args = {
'model':
trainer.outputs['model'],
'model_blessing':
evaluator.outputs['blessing'],
'push_destination':
pusher_pb2.PushDestination(
filesystem=pusher_pb2.PushDestination.Filesystem(
base_directory=serving_model_dir)),
}
pusher = Pusher(**pusher_args)
components.append(pusher)
pusher_args2 = {
'model':
trainer2.outputs['model'],
'model_blessing':
evaluator2.outputs['blessing'],
'push_destination':
pusher_pb2.PushDestination(
filesystem=pusher_pb2.PushDestination.Filesystem(
base_directory=serving_model_dir2)),
}
pusher2 = Pusher(**pusher_args2) # pylint: disable=unused-variable
components.append(pusher2)