我一直在与 RL 代理合作,在 openai 健身房解决出租车问题。
我从 keras-rl 中选择了 DQNAgent,并按照此处的示例进行操作:
https://tiewkh.github.io/blog/deepqlearning-openaiitaxi/
import gym
from gym import wrappers, logger
import numpy as np
import pickle
import json, sys, os
from os import path
from taxi import TaxiEnv
from keras.models import Sequential
from keras.layers import Dense, Activation, Flatten, Embedding, Reshape
from keras.optimizers import Adam
action_size = env.nA
model = Sequential()
model.add(Embedding(500, 10, input_length=1))
model.add(Reshape((10,)))
model.add(Dense(50, activation='relu'))
model.add(Dense(50, activation='relu'))
model.add(Dense(50, activation='relu'))
model.add(Dense(action_size, activation='linear'))
print(model.summary())
from rl.agents.dqn import DQNAgent
from rl.policy import EpsGreedyQPolicy
from rl.memory import SequentialMemory
memory = SequentialMemory(limit=50000, window_length=1)
policy = EpsGreedyQPolicy()
dqn_only_embedding = DQNAgent(model=model, nb_actions=action_size, memory=memory, nb_steps_warmup=500, target_model_update=1e-2, policy=policy)
dqn_only_embedding.compile(Adam(lr=1e-3), metrics=['mae'])
#: I'm actually running this in jupyter so I use debug magic
%debug
dqn_only_embedding.fit(env, nb_steps=1000000, visualize=False, verbose=1, nb_max_episode_steps=99, log_interval=100000)
它似乎适用于一批的大部分。
Training for 1000000 steps ...
Interval 1 (0 steps performed)
489/100000 [..............................] - ETA: 4:42 - reward: -1.3497
然后我收到此错误消息:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-47-e017660bd2c2> in <module>
1 get_ipython().run_line_magic('debug', '')
----> 2 dqn_only_embedding.fit(env, nb_steps=1000000, visualize=False, verbose=1, nb_max_episode_steps=99, log_interval=100000)
~Projects/RL/rl/core.py in fit(self, env, nb_steps, action_repetition, callbacks, verbose, visualize, nb_max_start_steps, start_step_policy, log_interval, nb_max_episode_steps)
202 # Force a terminal state.
203 done = True
--> 204 metrics = self.backward(reward, terminal=done)
205 episode_reward += reward
206
~Projects/RL/rl/agents/dqn.py in backward(self, reward, terminal)
325 # pdb.set_trace()
326 ins = [state0_batch] if type(self.model.input) is not list else state0_batch
--> 327 metrics = self.trainable_model.train_on_batch(ins + [targets, masks], [dummy_targets, targets])
328 metrics = [metric for idx, metric in enumerate(metrics) if idx not in (1, 2)] # throw away individual losses
329 metrics += self.policy.metrics
/anaconda3/envs/openai_env/lib/python3.8/site-packages/tensorflow/python/keras/engine/training.py in train_on_batch(self, x, y, sample_weight, class_weight, reset_metrics, return_dict)
1725 class_weight)
1726 self.train_function = self.make_train_function()
-> 1727 logs = self.train_function(iterator)
1728
1729 if reset_metrics:
/anaconda3/envs/openai_env/lib/python3.8/site-packages/tensorflow/python/eager/def_function.py in __call__(self, *args, **kwds)
826 tracing_count = self.experimental_get_tracing_count()
827 with trace.Trace(self._name) as tm:
--> 828 result = self._call(*args, **kwds)
829 compiler = "xla" if self._experimental_compile else "nonXla"
830 new_tracing_count = self.experimental_get_tracing_count()
/anaconda3/envs/openai_env/lib/python3.8/site-packages/tensorflow/python/eager/def_function.py in _call(self, *args, **kwds)
853 # In this case we have created variables on the first call, so we run the
854 # defunned version which is guaranteed to never create variables.
--> 855 return self._stateless_fn(*args, **kwds) # pylint: disable=not-callable
856 elif self._stateful_fn is not None:
857 # Release the lock early so that multiple threads can perform the call
TypeError: 'NoneType' object is not callable
如果我在没有调试魔法的情况下运行,我会收到一条略有不同的错误消息:
---------------------------------------------------------------------------
NotImplementedError Traceback (most recent call last)
<ipython-input-28-d9e25a0a5e69> in <module>
1 # %debug
----> 2 dqn_only_embedding.fit(env, nb_steps=1000000, visualize=False, verbose=1, nb_max_episode_steps=99, log_interval=100000)
~Projects/RL/rl/core.py in fit(self, env, nb_steps, action_repetition, callbacks, verbose, visualize, nb_max_start_steps, start_step_policy, log_interval, nb_max_episode_steps)
202 # Force a terminal state.
203 done = True
--> 204 metrics = self.backward(reward, terminal=done)
205 episode_reward += reward
206
~Projects/RL/rl/agents/dqn.py in backward(self, reward, terminal)
325 # pdb.set_trace()
326 ins = [state0_batch] if type(self.model.input) is not list else state0_batch
--> 327 metrics = self.trainable_model.train_on_batch(ins + [targets, masks], [dummy_targets, targets])
328 metrics = [metric for idx, metric in enumerate(metrics) if idx not in (1, 2)] # throw away individual losses
329 metrics += self.policy.metrics
/anaconda3/envs/openai_env/lib/python3.8/site-packages/tensorflow/python/keras/engine/training.py in train_on_batch(self, x, y, sample_weight, class_weight, reset_metrics, return_dict)
1725 class_weight)
1726 self.train_function = self.make_train_function()
-> 1727 logs = self.train_function(iterator)
1728
1729 if reset_metrics:
/anaconda3/envs/openai_env/lib/python3.8/site-packages/tensorflow/python/eager/def_function.py in __call__(self, *args, **kwds)
826 tracing_count = self.experimental_get_tracing_count()
827 with trace.Trace(self._name) as tm:
--> 828 result = self._call(*args, **kwds)
829 compiler = "xla" if self._experimental_compile else "nonXla"
830 new_tracing_count = self.experimental_get_tracing_count()
/anaconda3/envs/openai_env/lib/python3.8/site-packages/tensorflow/python/eager/def_function.py in _call(self, *args, **kwds)
869 # This is the first call of __call__, so we have to initialize.
870 initializers = []
--> 871 self._initialize(args, kwds, add_initializers_to=initializers)
872 finally:
873 # At this point we know that the initialization is complete (or less
/anaconda3/envs/openai_env/lib/python3.8/site-packages/tensorflow/python/eager/def_function.py in _initialize(self, args, kwds, add_initializers_to)
723 self._graph_deleter = FunctionDeleter(self._lifted_initializer_graph)
724 self._concrete_stateful_fn = (
--> 725 self._stateful_fn._get_concrete_function_internal_garbage_collected( # pylint: disable=protected-access
726 *args, **kwds))
727
/anaconda3/envs/openai_env/lib/python3.8/site-packages/tensorflow/python/eager/function.py in _get_concrete_function_internal_garbage_collected(self, *args, **kwargs)
2967 args, kwargs = None, None
2968 with self._lock:
-> 2969 graph_function, _ = self._maybe_define_function(args, kwargs)
2970 return graph_function
2971
/anaconda3/envs/openai_env/lib/python3.8/site-packages/tensorflow/python/eager/function.py in _maybe_define_function(self, args, kwargs)
3359
3360 self._function_cache.missed.add(call_context_key)
-> 3361 graph_function = self._create_graph_function(args, kwargs)
3362 self._function_cache.primary[cache_key] = graph_function
3363
/anaconda3/envs/openai_env/lib/python3.8/site-packages/tensorflow/python/eager/function.py in _create_graph_function(self, args, kwargs, override_flat_arg_shapes)
3194 arg_names = base_arg_names + missing_arg_names
3195 graph_function = ConcreteFunction(
-> 3196 func_graph_module.func_graph_from_py_func(
3197 self._name,
3198 self._python_function,
/anaconda3/envs/openai_env/lib/python3.8/site-packages/tensorflow/python/framework/func_graph.py in func_graph_from_py_func(name, python_func, args, kwargs, signature, func_graph, autograph, autograph_options, add_control_dependencies, arg_names, op_return_value, collections, capture_by_value, override_flat_arg_shapes)
988 _, original_func = tf_decorator.unwrap(python_func)
989
--> 990 func_outputs = python_func(*func_args, **func_kwargs)
991
992 # invariant: `func_outputs` contains only Tensors, CompositeTensors,
/anaconda3/envs/openai_env/lib/python3.8/site-packages/tensorflow/python/eager/def_function.py in wrapped_fn(*args, **kwds)
632 xla_context.Exit()
633 else:
--> 634 out = weak_wrapped_fn().__wrapped__(*args, **kwds)
635 return out
636
/anaconda3/envs/openai_env/lib/python3.8/site-packages/tensorflow/python/framework/func_graph.py in wrapper(*args, **kwargs)
975 except Exception as e: # pylint:disable=broad-except
976 if hasattr(e, "ag_error_metadata"):
--> 977 raise e.ag_error_metadata.to_exception(e)
978 else:
979 raise
NotImplementedError: in user code:
/anaconda3/envs/openai_env/lib/python3.8/site-packages/tensorflow/python/keras/engine/training.py:805 train_function *
return step_function(self, iterator)
/anaconda3/envs/openai_env/lib/python3.8/site-packages/tensorflow/python/keras/engine/training.py:795 step_function **
outputs = model.distribute_strategy.run(run_step, args=(data,))
/anaconda3/envs/openai_env/lib/python3.8/site-packages/tensorflow/python/distribute/distribute_lib.py:1259 run
return self._extended.call_for_each_replica(fn, args=args, kwargs=kwargs)
/anaconda3/envs/openai_env/lib/python3.8/site-packages/tensorflow/python/distribute/distribute_lib.py:2730 call_for_each_replica
return self._call_for_each_replica(fn, args, kwargs)
/anaconda3/envs/openai_env/lib/python3.8/site-packages/tensorflow/python/distribute/distribute_lib.py:3417 _call_for_each_replica
return fn(*args, **kwargs)
/anaconda3/envs/openai_env/lib/python3.8/site-packages/tensorflow/python/keras/engine/training.py:788 run_step **
outputs = model.train_step(data)
/anaconda3/envs/openai_env/lib/python3.8/site-packages/tensorflow/python/keras/engine/training.py:757 train_step
self.optimizer.minimize(loss, self.trainable_variables, tape=tape)
/anaconda3/envs/openai_env/lib/python3.8/site-packages/tensorflow/python/keras/optimizer_v2/optimizer_v2.py:498 minimize
return self.apply_gradients(grads_and_vars, name=name)
/anaconda3/envs/openai_env/lib/python3.8/site-packages/tensorflow/python/keras/optimizer_v2/optimizer_v2.py:631 apply_gradients
return distribute_ctx.get_replica_context().merge_call(
/anaconda3/envs/openai_env/lib/python3.8/site-packages/tensorflow/python/distribute/distribute_lib.py:2941 merge_call
return self._merge_call(merge_fn, args, kwargs)
/anaconda3/envs/openai_env/lib/python3.8/site-packages/tensorflow/python/distribute/distribute_lib.py:2948 _merge_call
return merge_fn(self._strategy, *args, **kwargs)
/anaconda3/envs/openai_env/lib/python3.8/site-packages/tensorflow/python/keras/optimizer_v2/optimizer_v2.py:682 _distributed_apply **
update_ops.extend(distribution.extended.update(
/anaconda3/envs/openai_env/lib/python3.8/site-packages/tensorflow/python/distribute/distribute_lib.py:2494 update
return self._update(var, fn, args, kwargs, group)
/anaconda3/envs/openai_env/lib/python3.8/site-packages/tensorflow/python/distribute/distribute_lib.py:3431 _update
return self._update_non_slot(var, fn, (var,) + tuple(args), kwargs, group)
/anaconda3/envs/openai_env/lib/python3.8/site-packages/tensorflow/python/distribute/distribute_lib.py:3437 _update_non_slot
result = fn(*args, **kwargs)
/anaconda3/envs/openai_env/lib/python3.8/site-packages/tensorflow/python/keras/optimizer_v2/optimizer_v2.py:653 apply_grad_to_update_var **
return self._resource_apply_sparse_duplicate_indices(
/anaconda3/envs/openai_env/lib/python3.8/site-packages/tensorflow/python/keras/optimizer_v2/optimizer_v2.py:1214 _resource_apply_sparse_duplicate_indices
return self._resource_apply_sparse(summed_grad, handle, unique_indices,
/anaconda3/envs/openai_env/lib/python3.8/site-packages/tensorflow/python/keras/optimizer_v2/optimizer_v2.py:1236 _resource_apply_sparse
raise NotImplementedError("Must be implemented in subclasses.")
NotImplementedError: Must be implemented in subclasses.
在我的 DQNAgent 中,向后的方法被覆盖了,所以我不确定为什么我会看到这个。
class DQNAgent(AbstractDQNAgent):
"""
# Arguments
model__: A Keras model.
policy__: A Keras-rl policy that are defined in [policy](https://github.com/keras-rl/keras-rl/blob/master/rl/policy.py).
test_policy__: A Keras-rl policy.
enable_double_dqn__: A boolean which enable target network as a second network proposed by van Hasselt et al. to decrease overfitting.
enable_dueling_dqn__: A boolean which enable dueling architecture proposed by Mnih et al.
dueling_type__: If `enable_dueling_dqn` is set to `True`, a type of dueling architecture must be chosen which calculate Q(s,a) from V(s) and A(s,a) differently. Note that `avg` is recommanded in the [paper](https://arxiv.org/abs/1511.06581).
`avg`: Q(s,a;theta) = V(s;theta) + (A(s,a;theta)-Avg_a(A(s,a;theta)))
`max`: Q(s,a;theta) = V(s;theta) + (A(s,a;theta)-max_a(A(s,a;theta)))
`naive`: Q(s,a;theta) = V(s;theta) + A(s,a;theta)
"""
def __init__(self, model, policy=None, test_policy=None, enable_double_dqn=False, enable_dueling_network=False,
dueling_type='avg', *args, **kwargs):
super(DQNAgent, self).__init__(*args, **kwargs)
# Validate (important) input.
if hasattr(model.output, '__len__') and len(tuple(model.output.shape)) > 2:
raise ValueError('Model "{}" has more than one output. DQN expects a model that has a single output.'.format(model))
if tuple(model.output.shape) != (None, self.nb_actions):
raise ValueError('Model output "{}" has invalid shape. DQN expects a model that has one dimension for each action, in this case {}.'.format(model.output, self.nb_actions))
# Parameters.
self.enable_double_dqn = enable_double_dqn
self.enable_dueling_network = enable_dueling_network
self.dueling_type = dueling_type
if self.enable_dueling_network:
# get the second last layer of the model, abandon the last layer
layer = model.layers[-2]
nb_action = model.output._keras_shape[-1]
# layer y has a shape (nb_action+1,)
# y[:,0] represents V(s;theta)
# y[:,1:] represents A(s,a;theta)
y = Dense(nb_action + 1, activation='linear')(layer.output)
# caculate the Q(s,a;theta)
# dueling_type == 'avg'
# Q(s,a;theta) = V(s;theta) + (A(s,a;theta)-Avg_a(A(s,a;theta)))
# dueling_type == 'max'
# Q(s,a;theta) = V(s;theta) + (A(s,a;theta)-max_a(A(s,a;theta)))
# dueling_type == 'naive'
# Q(s,a;theta) = V(s;theta) + A(s,a;theta)
if self.dueling_type == 'avg':
outputlayer = Lambda(lambda a: K.expand_dims(a[:, 0], -1) + a[:, 1:] - K.mean(a[:, 1:], axis=1, keepdims=True), output_shape=(nb_action,))(y)
elif self.dueling_type == 'max':
outputlayer = Lambda(lambda a: K.expand_dims(a[:, 0], -1) + a[:, 1:] - K.max(a[:, 1:], axis=1, keepdims=True), output_shape=(nb_action,))(y)
elif self.dueling_type == 'naive':
outputlayer = Lambda(lambda a: K.expand_dims(a[:, 0], -1) + a[:, 1:], output_shape=(nb_action,))(y)
else:
assert False, "dueling_type must be one of {'avg','max','naive'}"
model = Model(inputs=model.input, outputs=outputlayer)
# Related objects.
self.model = model
if policy is None:
policy = EpsGreedyQPolicy()
if test_policy is None:
test_policy = GreedyQPolicy()
self.policy = policy
self.test_policy = test_policy
# State.
self.reset_states()
def get_config(self):
config = super(DQNAgent, self).get_config()
config['enable_double_dqn'] = self.enable_double_dqn
config['dueling_type'] = self.dueling_type
config['enable_dueling_network'] = self.enable_dueling_network
config['model'] = get_object_config(self.model)
config['policy'] = get_object_config(self.policy)
config['test_policy'] = get_object_config(self.test_policy)
if self.compiled:
config['target_model'] = get_object_config(self.target_model)
return config
def compile(self, optimizer, metrics=[]):
metrics += [mean_q] # register default metrics
# We never train the target model, hence we can set the optimizer and loss arbitrarily.
self.target_model = clone_model(self.model, self.custom_model_objects)
self.target_model.compile(optimizer='sgd', loss='mse')
self.model.compile(optimizer='sgd', loss='mse')
# Compile model.
if self.target_model_update < 1.:
# We use the `AdditionalUpdatesOptimizer` to efficiently soft-update the target model.
updates = get_soft_target_model_updates(self.target_model, self.model, self.target_model_update)
optimizer = AdditionalUpdatesOptimizer(optimizer, updates)
def clipped_masked_error(args):
y_true, y_pred, mask = args
loss = huber_loss(y_true, y_pred, self.delta_clip)
loss *= mask # apply element-wise mask
return K.sum(loss, axis=-1)
# Create trainable model. The problem is that we need to mask the output since we only
# ever want to update the Q values for a certain action. The way we achieve this is by
# using a custom Lambda layer that computes the loss. This gives us the necessary flexibility
# to mask out certain parameters by passing in multiple inputs to the Lambda layer.
y_pred = self.model.output
y_true = Input(name='y_true', shape=(self.nb_actions,))
mask = Input(name='mask', shape=(self.nb_actions,))
loss_out = Lambda(clipped_masked_error, output_shape=(1,), name='loss')([y_true, y_pred, mask])
ins = [self.model.input] if type(self.model.input) is not list else self.model.input
trainable_model = Model(inputs=ins + [y_true, mask], outputs=[loss_out, y_pred])
assert len(trainable_model.output_names) == 2
combined_metrics = {trainable_model.output_names[1]: metrics}
losses = [
lambda y_true, y_pred: y_pred, # loss is computed in Lambda layer
lambda y_true, y_pred: K.zeros_like(y_pred), # we only include this for the metrics
]
trainable_model.compile(optimizer=optimizer, loss=losses, metrics=combined_metrics)
self.trainable_model = trainable_model
self.compiled = True
def load_weights(self, filepath):
self.model.load_weights(filepath)
self.update_target_model_hard()
def save_weights(self, filepath, overwrite=False):
self.model.save_weights(filepath, overwrite=overwrite)
def reset_states(self):
self.recent_action = None
self.recent_observation = None
if self.compiled:
self.model.reset_states()
self.target_model.reset_states()
def update_target_model_hard(self):
self.target_model.set_weights(self.model.get_weights())
def forward(self, observation):
# Select an action.
state = self.memory.get_recent_state(observation)
q_values = self.compute_q_values(state)
if self.training:
action = self.policy.select_action(q_values=q_values)
else:
action = self.test_policy.select_action(q_values=q_values)
# Book-keeping.
self.recent_observation = observation
self.recent_action = action
return action
def backward(self, reward, terminal):
# Store most recent experience in memory.
if self.step % self.memory_interval == 0:
self.memory.append(self.recent_observation, self.recent_action, reward, terminal,
training=self.training)
metrics = [np.nan for _ in self.metrics_names]
if not self.training:
# We're done here. No need to update the experience memory since we only use the working
# memory to obtain the state over the most recent observations.
return metrics
# Train the network on a single stochastic batch.
if self.step > self.nb_steps_warmup and self.step % self.train_interval == 0:
experiences = self.memory.sample(self.batch_size)
assert len(experiences) == self.batch_size
# Start by extracting the necessary parameters (we use a vectorized implementation).
state0_batch = []
reward_batch = []
action_batch = []
terminal1_batch = []
state1_batch = []
for e in experiences:
state0_batch.append(e.state0)
state1_batch.append(e.state1)
reward_batch.append(e.reward)
action_batch.append(e.action)
terminal1_batch.append(0. if e.terminal1 else 1.)
# Prepare and validate parameters.
state0_batch = self.process_state_batch(state0_batch)
state1_batch = self.process_state_batch(state1_batch)
terminal1_batch = np.array(terminal1_batch)
reward_batch = np.array(reward_batch)
assert reward_batch.shape == (self.batch_size,)
assert terminal1_batch.shape == reward_batch.shape
assert len(action_batch) == len(reward_batch)
# Compute Q values for mini-batch update.
if self.enable_double_dqn:
# According to the paper "Deep Reinforcement Learning with Double Q-learning"
# (van Hasselt et al., 2015), in Double DQN, the online network predicts the actions
# while the target network is used to estimate the Q value.
q_values = self.model.predict_on_batch(state1_batch)
assert q_values.shape == (self.batch_size, self.nb_actions)
actions = np.argmax(q_values, axis=1)
assert actions.shape == (self.batch_size,)
# Now, estimate Q values using the target network but select the values with the
# highest Q value wrt to the online model (as computed above).
target_q_values = self.target_model.predict_on_batch(state1_batch)
assert target_q_values.shape == (self.batch_size, self.nb_actions)
q_batch = target_q_values[range(self.batch_size), actions]
else:
# Compute the q_values given state1, and extract the maximum for each sample in the batch.
# We perform this prediction on the target_model instead of the model for reasons
# outlined in Mnih (2015). In short: it makes the algorithm more stable.
target_q_values = self.target_model.predict_on_batch(state1_batch)
assert target_q_values.shape == (self.batch_size, self.nb_actions)
q_batch = np.max(target_q_values, axis=1).flatten()
assert q_batch.shape == (self.batch_size,)
targets = np.zeros((self.batch_size, self.nb_actions))
dummy_targets = np.zeros((self.batch_size,))
masks = np.zeros((self.batch_size, self.nb_actions))
# Compute r_t + gamma * max_a Q(s_t+1, a) and update the target targets accordingly,
# but only for the affected output units (as given by action_batch).
discounted_reward_batch = self.gamma * q_batch
# Set discounted reward to zero for all states that were terminal.
discounted_reward_batch *= terminal1_batch
assert discounted_reward_batch.shape == reward_batch.shape
Rs = reward_batch + discounted_reward_batch
for idx, (target, mask, R, action) in enumerate(zip(targets, masks, Rs, action_batch)):
target[action] = R # update action with estimated accumulated reward
dummy_targets[idx] = R
mask[action] = 1. # enable loss for this specific action
targets = np.array(targets).astype('float32')
masks = np.array(masks).astype('float32')
# Finally, perform a single update on the entire batch. We use a dummy target since
# the actual loss is computed in a Lambda layer that needs more complex input. However,
# it is still useful to know the actual target to compute metrics properly.
# pdb.set_trace()
ins = [state0_batch] if type(self.model.input) is not list else state0_batch
metrics = self.trainable_model.train_on_batch(ins + [targets, masks], [dummy_targets, targets])
metrics = [metric for idx, metric in enumerate(metrics) if idx not in (1, 2)] # throw away individual losses
metrics += self.policy.metrics
if self.processor is not None:
metrics += self.processor.metrics
if self.target_model_update >= 1 and self.step % self.target_model_update == 0:
self.update_target_model_hard()
return metrics
...
我已经尝试使用调试器进行挖掘,但除了这两个错误消息之外,我运气不佳。是否有其他方法需要我重写,这可能会引发这样的错误?