Update 1: I have modified the epsilon-greedy policy, because epsilon was being annealed down to a very small value after only a very small number of episodes. I have updated the code accordingly. The new problem is that once it has trained enough it should not deviate much, yet as soon as epsilon becomes small it starts picking the wrong values and diverges immediately.
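For context, this is the decay schedule the updated code now uses, pulled out as a standalone restatement of the formula in append_to_memory below, with the same hyperparameters (this is just the same arithmetic, not a proposed fix):

# Standalone sketch of the updated epsilon schedule (same formula and
# hyperparameters as in dqn.append_to_memory below).
initial_epsilon, final_epsilon, lambda1 = 1.0, 0.01, 0.001

def epsilon_at(step):
    # linear decay, clipped at final_epsilon
    return max(initial_epsilon - (step / 200) * lambda1, final_epsilon)

# The floor of 0.01 is only reached after roughly (1.0 - 0.01) / (0.001 / 200)
# = 198,000 increments of self.step.
print(epsilon_at(0), epsilon_at(100000), epsilon_at(198000))  # 1.0 0.5 0.01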
I have been working with the OpenAI Gym platform for quite a while now, since my goal is to learn more about reinforcement learning. With help from the Stack Overflow user @sajad, I managed to implement double deep Q-learning (DQN) with prioritized experience replay (PER). On the cart-pole problem it reached a very good success rate after careful hyperparameter tuning. It is the best algorithm I have learned so far, but no matter what I try I cannot get it to work on the mountain-car problem: the episode reward always stays at -200. I have gone through my code, and based on the various tutorials I followed I believe my memory implementation is correct.
None of the variants, from the basic DQN up to the DQN with PER, seem to work.
It would be very helpful to get some assistance with debugging the code, or with any other implementation change that might be causing it not to converge.
Here is my implementation; all parameters have their usual names:
# implemented using sum_tree
import os
import random
import gym
import numpy as np
import tensorflow as tf
from memory import Memory

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

env = gym.make("MountainCar-v0")
env.reset()
model_save_path = "C:/Users/sanka/codes/mountain car openai/mc_save"


class dqn(object):
    def __init__(self):
        self.flag = 0
        self.batch_size = 64
        self.episodes = 20000
        self.input_size = env.observation_space.sample().size
        self.output_size = env.action_space.n
        self.gamma = 0.99
        self.epsilon = 1.0
        self.step = 0
        self.learning_rate = 0.0001
        self.lambda1 = 0.001
        self.initial_epsilon = self.epsilon
        self.final_epsilon = 0.01
        self.weights = {}
        self.biases = {}
        self.target_weights = {}
        self.target_biases = {}
        self.create_nn()
        self.create_training_network()
        self.max_size = 10000
        self.memory = Memory(size=self.max_size)
        self.sess = tf.InteractiveSession()
        self.sess.run(tf.global_variables_initializer())
        self.saver = tf.train.Saver()

    def create_nn(self):
        s1 = {1: [self.input_size, 30], 2: [30, 100], 3: [100, 30], 4: [30, self.output_size]}
        s2 = {1: [30], 2: [100], 3: [30], 4: [self.output_size]}
        for i in s1:
            self.weights[i] = tf.Variable(tf.truncated_normal(s1[i]), name='w{0}'.format(i))
            self.biases[i] = tf.Variable(tf.truncated_normal(s2[i]), name='b{0}'.format(i))
            self.target_weights[i] = tf.Variable(tf.truncated_normal(s1[i]), name='tw{0}'.format(i))
            self.target_biases[i] = tf.Variable(tf.truncated_normal(s2[i]), name='tb{0}'.format(i))

    def feed_forward(self, z):
        q = tf.nn.relu(tf.matmul(z, self.weights[1]) + self.biases[1])
        for i in range(2, len(self.weights), 1):
            q = tf.nn.relu(tf.matmul(q, self.weights[i]) + self.biases[i])
        q = tf.matmul(q, self.weights[len(self.weights)]) + self.biases[len(self.biases)]
        return q

    def feed_forward_target(self, z):
        q = tf.nn.relu(tf.matmul(z, self.target_weights[1]) + self.target_biases[1])
        for i in range(2, len(self.weights), 1):
            q = tf.nn.relu(tf.matmul(q, self.target_weights[i]) + self.target_biases[i])
        q = tf.matmul(q, self.target_weights[len(self.weights)]) + self.target_biases[len(self.weights)]
        return q

    def create_training_network(self):
        self.x = tf.placeholder(tf.float32, [None, self.input_size])
        self.y = tf.placeholder(tf.float32, [None])
        self.a = tf.placeholder(tf.float32, [None, self.output_size])
        self.q_value = self.feed_forward(self.x)
        self.q_value_target = self.feed_forward_target(self.x)
        self.output = tf.reduce_sum(tf.multiply(self.q_value, self.a), reduction_indices=1)
        self.action = tf.argmax(self.q_value, 1)
        self.loss = tf.reduce_mean(tf.square(self.output - self.y))
        self.optimizer = tf.train.AdamOptimizer(learning_rate=self.learning_rate).minimize(self.loss)

    def append_to_memory(self, state, action, reward, next_state, done):
        one_hot_action = np.zeros(self.output_size)
        one_hot_action[action] = 1.0
        prob = (abs(reward) + .01) ** 0.6
        self.memory.append(prob, (state, one_hot_action, reward, next_state, done))
        if self.memory.current_size >= self.memory.size:
            self.step += 1
            # self.epsilon = self.final_epsilon + (self.initial_epsilon - self.final_epsilon) * np.exp(
            #     -self.lambda1 * (self.step / 200))
            self.epsilon = max(self.initial_epsilon - (self.step / 200) * self.lambda1, self.final_epsilon)
            if (self.flag == 0):
                print("started training")
                self.flag = 1
            self.train()

    def get_reward(self, q1, q2, reward, done):
        if done:
            return reward
        else:
            return reward + self.gamma * q2[np.argmax(q1)]

    def train(self):
        index, sample = self.memory.sample(self.batch_size)
        train_x = [i[0] for i in sample]
        action = [i[1] for i in sample]
        reward = [i[2] for i in sample]
        next_state = [i[3] for i in sample]
        train_y = []
        q = self.sess.run(self.q_value, feed_dict={self.x: np.array(train_x)})
        q_1 = self.sess.run(self.q_value, feed_dict={self.x: np.array(next_state)})
        q_next = self.sess.run(self.q_value_target, feed_dict={self.x: np.array(next_state)})
        for i in range(len(reward)):
            train_y.append(self.get_reward(q_1[i], q_next[i], reward[i], sample[i][4]))
        train_y = np.array(train_y)
        train_x = np.array(train_x)
        action = np.array(action)
        self.sess.run(self.optimizer, feed_dict={self.x: train_x, self.y: train_y, self.a: action})
        for i in range(self.batch_size):
            error = abs(np.max(q[i]) - train_y[i])
            self.memory.update(index[i], (error + 0.01) ** 0.6)
        # return loss

    def copy_variables(self):
        for i in range(1, len(self.weights) + 1, 1):
            self.sess.run(self.target_weights[i].assign(self.weights[i]))
            self.sess.run(self.target_biases[i].assign(self.biases[i]))

    def save(self):
        self.saver.save(self.sess, model_save_path)
        print("model saved")


def main():
    obj = dqn()
    for e in range(obj.episodes):
        p = env.reset()
        for i in range(500):
            # obj.step += 1
            ac = obj.sess.run(obj.action, feed_dict={obj.x: np.array([p])})[0]
            if np.random.rand() < obj.epsilon:
                ac = random.randint(0, obj.output_size - 1)
            obs, rew, done, _ = env.step(ac)
            obj.append_to_memory(p, ac, rew, obs, done)
            p = obs
            if done:
                break
            if obj.step % 1000 == 0 and obj.flag == 1:
                obj.copy_variables()
        # print("episode {0} completed with loss: {1}".format(e, total_loss))
        if e % 100 == 0:
            print("episodes {0} completed".format(e), )
            av = []
            for f in range(10):
                p = env.reset()
                r = 0
                for i in range(200):
                    ac = obj.sess.run(obj.action, feed_dict={obj.x: np.array([p])})[0]
                    p, rew, done, _ = env.step(ac)
                    r += rew
                    if done:
                        break
                av.append(r)
            print("average score is {0}".format(np.average(np.array(av))))
            obj.save()


if __name__ == '__main__':
    main()
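As a note on what the code above computes: get_reward builds the double-DQN target, selecting the next action with the online network (q_1 in train) and evaluating it with the target network (q_next). Here is a minimal numpy restatement of that one step, with made-up Q-values and gamma = 0.99 as in the class, just to make the target explicit:

import numpy as np

gamma = 0.99
q_online_next = np.array([1.0, 3.0, 2.0])   # Q(s', .) from the online network (q_1 in train)
q_target_next = np.array([0.5, 2.5, 4.0])   # Q(s', .) from the target network (q_next in train)
reward, done = -1.0, False

# y = r                                                    if the episode ended
# y = r + gamma * Q_target(s', argmax_a Q_online(s', a))   otherwise
y = reward if done else reward + gamma * q_target_next[np.argmax(q_online_next)]
print(y)  # -1.0 + 0.99 * 2.5 = 1.475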
For reference, here is the implementation of the memory, which lives in a separate module:
import numpy as np
import random


class Memory(object):
    def __init__(self, size):
        self.size = size
        self.data = np.zeros(size, dtype=object)
        self.tree = np.zeros(2 * size - 1, dtype=np.float32)
        self.current_size = 0
        self.last = 0

    def append(self, p, data):
        self.current_size = min(self.current_size + 1, self.size)
        cur = self.last + self.size - 1
        self.update_at_index(cur, p - self.tree[cur])
        self.data[self.last] = data
        self.last += 1
        if self.last >= self.size:
            self.last = 0

    def update(self, index, p):
        self.update_at_index(index, p - self.tree[index])

    def update_at_index(self, index, change):
        while (index >= 0):
            self.tree[index] += change
            index = (index - 1) // 2

    def get(self, index, s):
        left = index * 2 + 1
        if (left >= self.size):
            return (index, self.data[index + 1 - self.size])
        if (self.tree[left] >= s):
            return self.get(left, s)
        else:
            right = left + 1
            return self.get(right, s - self.tree[left])

    def sample(self, n):
        av_sum = self.tree[0] / n
        l = []
        m = []
        for i in range(n):
            min_sum = av_sum * i
            max_sum = av_sum * (i + 1)
            s = random.uniform(min_sum, max_sum)
            x = self.get(0, s)
            l.append(x[0])
            m.append(x[1])
        return l, m
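To make the expected interface explicit, this is the append → sample → update cycle that append_to_memory and train above drive, pulled out into a tiny standalone example (capacity of 2 rather than 10000, with made-up transitions and priorities, purely to show how the class is called):

from memory import Memory

# Tiny round-trip through the Memory class above, mirroring how the agent uses it:
# append(priority, transition), sample(n) -> (tree indices, transitions),
# then update(index, new_priority) once the TD error is known.
mem = Memory(size=2)
for t in range(2):
    prob = (abs(-1.0) + .01) ** 0.6                # same priority formula as append_to_memory
    mem.append(prob, ("state%d" % t, None, -1.0, "next%d" % t, False))

index, batch = mem.sample(2)                       # stratified draws over the sum tree
for idx, (state, action, reward, next_state, done) in zip(index, batch):
    td_error = 0.5                                 # placeholder for |max Q - target| from train()
    mem.update(idx, (td_error + 0.01) ** 0.6)      # same re-prioritisation formula as train()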
Thanks in advance.