keras-rl "get_gradients" 函数使用通过组合的 actor-critic 模型计算的梯度,但您需要在动作输入中使用critic 输出的梯度来应用反转梯度功能。
我最近使用 keras-rl 在我正在研究的 RDPG 原型上实现了它。仍在测试中,代码可以优化并且肯定不是没有错误的,但是我已经通过修改一些 keras-rl 代码行来使反转渐变工作。为了修改动作输入的评论家输出的梯度,我已经按照原始公式计算演员梯度,在帕特里克·埃米米这篇很棒的帖子的帮助下:http: //pemami4911.github.io/blog /2016/08/21/ddpg-rl.html。
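Just to make the idea concrete, here is a minimal plain-Python sketch of the per-component inverting-gradients rule (Hausknecht & Stone) that the "calculate_inverted_gradient" helper in the code below builds symbolically. The function and argument names and the default (-1, 1) bounds are only illustrative; the rule is written here in terms of dQ/da, whereas the code below flips the sign because the Keras optimizer minimizes.

def invert_gradient(dq_da, action, lower=-1.0, upper=1.0):
    # If the gradient pushes the action towards the upper bound, scale it by the
    # remaining room to that bound; otherwise scale by the room to the lower bound.
    if dq_da > 0.0:
        return dq_da * (upper - action) / (upper - lower)
    else:
        return dq_da * (action - lower) / (upper - lower)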
I paste here the whole "compile" function, redefined in a class that inherits from "DDPGAgent", where the inverting-gradients feature is implemented.
def compile(self, optimizer, metrics=[]):
    metrics += [mean_q]

    if type(optimizer) in (list, tuple):
        if len(optimizer) != 2:
            raise ValueError('More than two optimizers provided. Please only provide a maximum of two optimizers, the first one for the actor and the second one for the critic.')
        actor_optimizer, critic_optimizer = optimizer
    else:
        actor_optimizer = optimizer
        critic_optimizer = clone_optimizer(optimizer)
    if type(actor_optimizer) is str:
        actor_optimizer = optimizers.get(actor_optimizer)
    if type(critic_optimizer) is str:
        critic_optimizer = optimizers.get(critic_optimizer)
    assert actor_optimizer != critic_optimizer

    if len(metrics) == 2 and hasattr(metrics[0], '__len__') and hasattr(metrics[1], '__len__'):
        actor_metrics, critic_metrics = metrics
    else:
        actor_metrics = critic_metrics = metrics

    def clipped_error(y_true, y_pred):
        return K.mean(huber_loss(y_true, y_pred, self.delta_clip), axis=-1)

    # Compile target networks. We only use them in feed-forward mode, hence we can pass any
    # optimizer and loss since we never use it anyway.
    self.target_actor = clone_model(self.actor, self.custom_model_objects)
    self.target_actor.compile(optimizer='sgd', loss='mse')
    self.target_critic = clone_model(self.critic, self.custom_model_objects)
    self.target_critic.compile(optimizer='sgd', loss='mse')

    # We also compile the actor. We never optimize the actor using Keras but instead compute
    # the policy gradient ourselves. However, we need the actor in feed-forward mode, hence
    # we also compile it with any optimizer and never use it in any way.
    self.actor.compile(optimizer='sgd', loss='mse')

    # Compile the critic.
    if self.target_model_update < 1.:
        # We use the `AdditionalUpdatesOptimizer` to efficiently soft-update the target model.
        critic_updates = get_soft_target_model_updates(self.target_critic, self.critic, self.target_model_update)
        critic_optimizer = AdditionalUpdatesOptimizer(critic_optimizer, critic_updates)
    self.critic.compile(optimizer=critic_optimizer, loss=clipped_error, metrics=critic_metrics)

    clipnorm = getattr(actor_optimizer, 'clipnorm', 0.)
    clipvalue = getattr(actor_optimizer, 'clipvalue', 0.)

    # Gradients of the critic output w.r.t. the action input.
    critic_gradients_wrt_action_input = tf.gradients(self.critic.output, self.critic_action_input)
    critic_gradients_wrt_action_input = [g / float(self.batch_size) for g in critic_gradients_wrt_action_input]  # since TF sums over the batch

    action_bounds = [(-1., 1.) for i in range(self.nb_actions)]

    def calculate_inverted_gradient():
        """
        Applies the "inverting gradients" feature to the action-value gradients.
        """
        gradient_wrt_action = -critic_gradients_wrt_action_input[0]

        inverted_gradients = []
        for n in range(self.batch_size):
            inverted_gradient = []

            for i in range(gradient_wrt_action[n].shape[0].value):
                action = self.critic_action_input[n][i]
                is_gradient_negative = K.less(gradient_wrt_action[n][i], K.constant(0, dtype='float32'))
                adjust_for_upper_bound = gradient_wrt_action[n][i] * ((action_bounds[i][1] - action) / (action_bounds[i][1] - action_bounds[i][0]))
                adjust_for_lower_bound = gradient_wrt_action[n][i] * ((action - action_bounds[i][0]) / (action_bounds[i][1] - action_bounds[i][0]))
                modified_gradient = K.switch(is_gradient_negative, adjust_for_upper_bound, adjust_for_lower_bound)
                inverted_gradient.append(modified_gradient)
            inverted_gradients.append(inverted_gradient)

        gradient_wrt_action = tf.stack(inverted_gradients)
        return gradient_wrt_action

    # Chain rule: gradients of the actor output w.r.t. its weights, weighted by the
    # (inverted) gradients of the critic output w.r.t. the action input.
    actor_gradients_wrt_weights = tf.gradients(self.actor.output, self.actor.trainable_weights, grad_ys=calculate_inverted_gradient())
    actor_gradients_wrt_weights = [g / float(self.batch_size) for g in actor_gradients_wrt_weights]  # since TF sums over the batch

    def get_gradients(loss, params):
        """ Used by the actor optimizer.
            Returns the gradients to train the actor.
            These gradients are obtained by multiplying the gradients of the actor output w.r.t. its weights
            with the gradients of the critic output w.r.t. its action input. """
        # Apply clipping if defined.
        modified_grads = [g for g in actor_gradients_wrt_weights]

        if clipnorm > 0.:
            norm = K.sqrt(sum([K.sum(K.square(g)) for g in modified_grads]))
            modified_grads = [optimizers.clip_norm(g, clipnorm, norm) for g in modified_grads]
        if clipvalue > 0.:
            modified_grads = [K.clip(g, -clipvalue, clipvalue) for g in modified_grads]
        return modified_grads

    actor_optimizer.get_gradients = get_gradients

    # get_updates is the optimizer function that changes the weights of the network.
    updates = actor_optimizer.get_updates(self.actor.trainable_weights, self.actor.constraints, None)

    if self.target_model_update < 1.:
        # Include soft target model updates.
        updates += get_soft_target_model_updates(self.target_actor, self.actor, self.target_model_update)
    updates += self.actor.updates  # include other updates of the actor, e.g. for BN

    # Finally, combine it all into a callable function.
    # The inputs will be all the necessary placeholders to compute the gradients (actor and critic inputs).
    inputs = self.actor.inputs[:] + [self.critic_action_input, self.critic_history_input]
    self.actor_train_fn = K.function(inputs, [self.actor.output], updates=updates)
    self.actor_optimizer = actor_optimizer

    self.compiled = True
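Overriding "get_gradients" works because, in the Keras version this targets, the optimizer's "get_updates" obtains its gradients by calling "self.get_gradients(loss, params)", so binding a custom function to that attribute makes the optimizer build its weight updates from the precomputed actor gradients instead. A standalone toy sketch of that idea (the SGD optimizer, the single dummy variable and the constant gradients are illustrative assumptions, not part of my code):

import keras.backend as K
from keras import optimizers

sgd = optimizers.SGD(lr=0.01)
weight = K.variable(1.0)

def constant_gradients(loss, params):
    # Ignore `loss` and hand back a precomputed gradient for every parameter.
    return [K.constant(0.5) for _ in params]

# Override the bound method, exactly as done for actor_optimizer above.
sgd.get_gradients = constant_gradients

# get_updates() now builds its parameter updates from constant_gradients
# (old-style Keras signature: params, constraints, loss).
updates = sgd.get_updates([weight], {}, None)
train_step = K.function([], [weight], updates=updates)
train_step([])  # each call applies weight <- weight - lr * 0.5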
When training the actor, you should now pass 3 inputs instead of 2: the observation inputs plus the action input (a prediction from the actor network), so you also have to modify the "backward" function. In my case:
...
if self.episode > self.nb_steps_warmup_actor:
    action = self.actor.predict_on_batch(history_batch)
    inputs = [history_batch, action, history_batch]
    actor_train_result = self.actor_train_fn(inputs)
    action_values = actor_train_result[0]
    assert action_values.shape == (self.batch_size, self.nb_actions)
...
After that, you can give your actor a linear activation in the output layer.
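Just to illustrate (the observation shape, layer sizes and number of actions here are placeholders, not the ones from my prototype), an actor with a linear output could look like the sketch below; the (-1, 1) bounds are then enforced by the inverting-gradients rule instead of a tanh/sigmoid output:

from keras.layers import Dense, Input
from keras.models import Model

nb_actions = 2            # assumed number of action dimensions
observation_shape = (3,)  # assumed observation shape

observation_input = Input(shape=observation_shape, name='observation_input')
x = Dense(32, activation='relu')(observation_input)
x = Dense(32, activation='relu')(x)
# Linear output: the action bounds are handled by inverting gradients, not by the activation.
action_output = Dense(nb_actions, activation='linear')(x)
actor = Model(inputs=observation_input, outputs=action_output)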