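I would like to add TensorFlow's ElasticAverageOptimizer to my model training, using the parameter server approach. The model currently uses AdagradOptimizer.
I am using the TensorFlow v1.15 tf.estimator API to train and evaluate the model, building a custom estimator like this: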
run_config = tf.estimator.RunConfig(
    save_summary_steps=train_config.save_summary_steps,
    save_checkpoints_steps=train_config.save_checkpoint_steps,
    keep_checkpoint_max=keep_checkpoint_max,
)
train_input_fn = self.generate_input_fn(train_config.input_reader, run_config, False)
profiler_hook = tf.train.ProfilerHook(
    save_steps=train_config.save_profiler_tracing_steps,
    output_dir=os.path.join(job_dir, "tracing"),
    show_dataflow=True,
    show_memory=True)
# TrainSpec takes input_fn and hooks as keyword arguments, not a single dict.
train_spec = tf.estimator.TrainSpec(input_fn=train_input_fn, hooks=[profiler_hook])
exporters = []
{..some exporter code...}
eval_input_fn = self.generate_input_fn(eval_config.input_reader, run_config, True)
eval_spec = tf.estimator.EvalSpec(eval_input_fn, steps=eval_config.eval_steps, exporters=exporters)
# The estimator has to exist before train_and_evaluate is called.
model_fn = self.generate_model_fn(optimizer_config=train_config.optimizer)
estimator = tf.estimator.Estimator(model_fn=model_fn,
                                   config=run_config)
tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
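Training runs under a parameter server scheme with 10 PS nodes and roughly 100 worker nodes. I would like to modify my code to wrap the AdagradOptimizer currently used in generate_model_fn with ElasticAverageOptimizer. Here is the current relevant code from generate_model_fn: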
optimizer = build_optimizer(optimizer_config)
train_op = tf.contrib.layers.optimize_loss(
    loss=loss,
    global_step=tf.compat.v1.train.get_or_create_global_step(),
    learning_rate=None,  # set by the optimizer
    optimizer=optimizer,
    gradient_multipliers=gradient_multipliers,
    clip_gradients=clip_gradients,
    summaries=tf.contrib.layers.OPTIMIZER_SUMMARIES)
return tf.estimator.EstimatorSpec(
    mode=tf.estimator.ModeKeys.TRAIN,
    predictions=debug_predictions,
    loss=loss,
    # Do not train on a master of distributed training.
    train_op=loss if config.is_chief and config.num_ps_replicas else train_op)
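I am not sure how to set up ElasticAverageCustomGetter and ElasticAverageOptimizer so that they use my cluster_spec. The example in the documentation shows how to set up the custom getter for a single worker and parameter server, but I don't know how to translate that to my case. I tried passing a single worker to the custom getter, but that does not seem to work. Here is the code: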
cluster_spec = tf_config['cluster']
worker_device = cluster_spec['worker'][0]
ea_custom_getter = tf.contrib.opt.ElasticAverageCustomGetter(worker_device)
num_worker = len(cluster_spec['worker'])
device_setter = tf.compat.v1.train.replica_device_setter(cluster=cluster_spec, worker_device=worker_device)
with tf.device(device_setter), tf.variable_scope('', custom_getter=ea_custom_getter):
    ea_opt = tf.contrib.opt.ElasticAverageOptimizer(
        optimizer, num_worker, ea_custom_getter, communication_period=10
    )
    train_op = tf.contrib.layers.optimize_loss(
        loss=loss,
        global_step=tf.compat.v1.train.get_or_create_global_step(),
        learning_rate=None,  # set by the optimizer
        optimizer=ea_opt,
        gradient_multipliers=gradient_multipliers,
        clip_gradients=clip_gradients,
        summaries=tf.contrib.layers.OPTIMIZER_SUMMARIES)
This results in ValueError: Unknown attribute: 'ixqzvi-worker-0.test.svc' in 'ixqzvi-worker-0.test.svc:2222'
I also tried passing the whole cluster_spec['worker'] list to ElasticAverageCustomGetter, but that results in a TypeError for the list type.
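The ValueError looks like TensorFlow trying to parse the host address as a device string. As a rough sketch of what a device string for the current task could look like (an assumption, not a confirmed fix), the snippet below derives `/job:<type>/task:<index>` from a TF_CONFIG-style JSON value. The helper name `device_from_tf_config` and the example host names are hypothetical, and the sketch assumes TF_CONFIG has the usual `cluster` and `task` keys.

```python
import json


def device_from_tf_config(tf_config_json):
    """Return (device_string, num_workers) for a TF_CONFIG-style JSON string.

    Hypothetical helper: assumes the JSON has 'cluster' and 'task' keys.
    """
    tf_config = json.loads(tf_config_json)
    task = tf_config["task"]
    # Device strings name a job and task index, not a host:port address.
    device = "/job:{}/task:{}".format(task["type"], int(task["index"]))
    num_workers = len(tf_config["cluster"].get("worker", []))
    return device, num_workers


# Made-up cluster for illustration only.
example_tf_config = json.dumps({
    "cluster": {
        "ps": ["ps-0.example:2222"],
        "worker": ["worker-0.example:2222", "worker-1.example:2222"],
    },
    "task": {"type": "worker", "index": 1},
})

worker_device, num_worker = device_from_tf_config(example_tf_config)
print(worker_device, num_worker)
```

If this reading is right, the resulting string (rather than cluster_spec['worker'][0]) would be the value passed as worker_device to both ElasticAverageCustomGetter and replica_device_setter.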
Any suggestions? Thanks.