
Update 1: I changed the epsilon-greedy policy, because epsilon was being annealed down to a very small value within only a handful of episodes. I have updated the code accordingly.

The new problem is that after enough training the agent should no longer be deviating much, yet it picks the wrong actions and immediately diverges as soon as epsilon becomes small.
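For reference, a minimal sketch of the two schedules involved (the exponential decay I used before, which is left commented out in the code below, and the linear decay I use now), using the same constants as in the class:

import numpy as np

initial_epsilon, final_epsilon, lambda1 = 1.0, 0.01, 0.001

def epsilon_linear(step):
    # current schedule: linear decay per training step, floored at final_epsilon
    return max(initial_epsilon - (step / 200) * lambda1, final_epsilon)

def epsilon_exponential(step):
    # previous schedule: exponential decay towards final_epsilon
    return final_epsilon + (initial_epsilon - final_epsilon) * np.exp(-lambda1 * step / 200)

for step in (0, 50000, 100000, 200000):
    print(step, epsilon_linear(step), epsilon_exponential(step))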


I have been working with the OpenAI Gym platform for quite a while now, with the goal of learning more about reinforcement learning. With help from Stack Overflow user @sajad, I have successfully implemented double deep Q-learning (DQN) with prioritized experience replay (PER). On the cart-pole problem it reaches a very good success rate after careful hyperparameter tuning. It is the best algorithm I have learned so far, but no matter what I do I cannot get it to work on the mountain-car problem: the episode reward always stays at -200. I have gone through my code, and from the various tutorials I have followed I believe my memory (sum-tree) implementation is correct.

Nothing seems to work, from the basic DQN all the way to the DQN with PER.

I would really appreciate some help with debugging the code, or with any implementation change that might be causing it not to converge.

Here is my implementation; all parameters have their usual names:

    # implemented using sum_tree

import os
import random

import gym
import numpy as np
import tensorflow as tf
from memory import Memory

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
env = gym.make("MountainCar-v0")
env.reset()
model_save_path = "C:/Users/sanka/codes/mountain car openai/mc_save"


class dqn(object):
    def __init__(self):
        self.flag = 0
        self.batch_size = 64
        self.episodes = 20000
        self.input_size = env.observation_space.sample().size
        self.output_size = env.action_space.n
        self.gamma = 0.99
        self.epsilon = 1.0
        self.step = 0
        self.learning_rate = 0.0001
        self.lambda1 = 0.001
        self.initial_epsilon = self.epsilon
        self.final_epsilon = 0.01
        self.weights = {}
        self.biases = {}
        self.target_weights = {}
        self.target_biases = {}
        self.create_nn()
        self.create_training_network()
        self.max_size = 10000
        self.memory = Memory(size=self.max_size)
        self.sess = tf.InteractiveSession()
        self.sess.run(tf.global_variables_initializer())
        self.saver = tf.train.Saver()

    def create_nn(self):

        s1 = {1: [self.input_size, 30], 2: [30, 100], 3: [100, 30], 4: [30, self.output_size]}
        s2 = {1: [30], 2: [100], 3: [30], 4: [self.output_size]}
        for i in s1:
            self.weights[i] = tf.Variable(tf.truncated_normal(s1[i]), name='w{0}'.format(i))
            self.biases[i] = tf.Variable(tf.truncated_normal(s2[i]), name='b{0}'.format(i))
            self.target_weights[i] = tf.Variable(tf.truncated_normal(s1[i]), name='tw{0}'.format(i))
            self.target_biases[i] = tf.Variable(tf.truncated_normal(s2[i]), name='tb{0}'.format(i))

    def feed_forward(self, z):
        q = tf.nn.relu(tf.matmul(z, self.weights[1]) + self.biases[1])
        for i in range(2, len(self.weights), 1):
            q = tf.nn.relu(tf.matmul(q, self.weights[i]) + self.biases[i])
        q = tf.matmul(q, self.weights[len(self.weights)]) + self.biases[len(self.biases)]
        return q

    def feed_forward_target(self, z):
        q = tf.nn.relu(tf.matmul(z, self.target_weights[1]) + self.target_biases[1])
        for i in range(2, len(self.weights), 1):
            q = tf.nn.relu(tf.matmul(q, self.target_weights[i]) + self.target_biases[i])
        q = tf.matmul(q, self.target_weights[len(self.weights)]) + self.target_biases[len(self.weights)]
        return q

    def create_training_network(self):
        self.x = tf.placeholder(tf.float32, [None, self.input_size])
        self.y = tf.placeholder(tf.float32, [None])
        self.a = tf.placeholder(tf.float32, [None, self.output_size])
        self.q_value = self.feed_forward(self.x)
        self.q_value_target = self.feed_forward_target(self.x)
        self.output = tf.reduce_sum(tf.multiply(self.q_value, self.a), reduction_indices=1)
        self.action = tf.argmax(self.q_value, 1)
        self.loss = tf.reduce_mean(tf.square(self.output - self.y))
        self.optimizer = tf.train.AdamOptimizer(learning_rate=self.learning_rate).minimize(self.loss)

    def append_to_memory(self, state, action, reward, next_state, done):
        one_hot_action = np.zeros(self.output_size)
        one_hot_action[action] = 1.0
        # initial priority: |reward| raised to the PER exponent 0.6, plus a small offset
        prob = (abs(reward) + .01) ** 0.6
        self.memory.append(prob, (state, one_hot_action, reward, next_state, done))
        if self.memory.current_size >= self.memory.size:
            self.step += 1
            # self.epsilon = self.final_epsilon + (self.initial_epsilon - self.final_epsilon) * np.exp(
            #    -self.lambda1 * (self.step / 200))
            self.epsilon = max(self.initial_epsilon - (self.step / 200) * self.lambda1, self.final_epsilon)
            if (self.flag == 0):
                print("started training")
                self.flag = 1
            self.train()

    def get_reward(self, q1, q2, reward, done):
        # double-DQN target: q1 (online net on the next state) selects the action,
        # q2 (target net on the next state) supplies its value
        if done:
            return reward
        else:
            return reward + self.gamma * q2[np.argmax(q1)]

    def train(self):
        index, sample = self.memory.sample(self.batch_size)
        train_x = [i[0] for i in sample]
        action = [i[1] for i in sample]
        reward = [i[2] for i in sample]
        next_state = [i[3] for i in sample]
        train_y = []
        q = self.sess.run(self.q_value, feed_dict={self.x: np.array(train_x)})
        q_1 = self.sess.run(self.q_value, feed_dict={self.x: np.array(next_state)})
        q_next = self.sess.run(self.q_value_target, feed_dict={self.x: np.array(next_state)})
        for i in range(len(reward)):
            train_y.append(self.get_reward(q_1[i], q_next[i], reward[i], sample[i][4]))
        train_y = np.array(train_y)
        train_x = np.array(train_x)
        action = np.array(action)
        self.sess.run(self.optimizer, feed_dict={self.x: train_x, self.y: train_y, self.a: action})
        for i in range(self.batch_size):
            error = abs(np.max(q[i]) - train_y[i])
            self.memory.update(index[i], (error + 0.01) ** 0.6)
            # return loss

    def copy_variables(self):
        for i in range(1, len(self.weights) + 1, 1):
            self.sess.run(self.target_weights[i].assign(self.weights[i]))
            self.sess.run(self.target_biases[i].assign(self.biases[i]))

    def save(self):
        self.saver.save(self.sess, model_save_path)
        print("model saved")


def main():
    obj = dqn()
    for e in range(obj.episodes):
        p = env.reset()
        for i in range(500):
            # obj.step += 1
            ac = obj.sess.run(obj.action, feed_dict={obj.x: np.array([p])})[0]
            if np.random.rand() < obj.epsilon:
                ac = random.randint(0, obj.output_size - 1)

            obs, rew, done, _ = env.step(ac)
            obj.append_to_memory(p, ac, rew, obs, done)
            p = obs
            if done:
                break
            if obj.step % 1000 == 0 and obj.flag == 1:
                obj.copy_variables()
        # print("episode {0} completed with loss: {1}".format(e, total_loss))

        if e % 100 == 0:
            print("episodes {0} completed".format(e), )
            av = []
            for f in range(10):
                p = env.reset()
                r = 0
                for i in range(200):
                    ac = obj.sess.run(obj.action, feed_dict={obj.x: np.array([p])})[0]
                    p, rew, done, _ = env.step(ac)
                    r += rew
                    if done:
                        break
                av.append(r)
            print("average score is {0}".format(np.average(np.array(av))))
            obj.save()


if __name__ == '__main__':
    main()

For reference, here is the memory, implemented as a separate module:

import numpy as np
import random


class Memory(object):
    def __init__(self, size):
        self.size = size
        self.data = np.zeros(size, dtype=object)
        self.tree = np.zeros(2 * size - 1, dtype=np.float32)
        self.current_size = 0
        self.last = 0

    def append(self, p, data):
        self.current_size = min(self.current_size + 1, self.size)
        cur = self.last + self.size - 1
        self.update_at_index(cur, p - self.tree[cur])
        self.data[self.last] = data
        self.last += 1
        if self.last >= self.size:
            self.last = 0

    def update(self, index, p):
        self.update_at_index(index, p - self.tree[index])

    def update_at_index(self, index, change):
        while (index >= 0):
            self.tree[index] += change
            index = (index - 1) // 2

    def get(self, index, s):
        left = index * 2 + 1
        if (left >= self.size):
            return (index, self.data[index + 1 - self.size])
        if (self.tree[left] >= s):
            return self.get(left, s)
        else:
            right = left + 1
            return self.get(right, s - self.tree[left])

    def sample(self, n):
        av_sum = self.tree[0] / n
        l = []
        m = []
        for i in range(n):
            min_sum = av_sum * i
            max_sum = av_sum * (i + 1)
            s = random.uniform(min_sum, max_sum)
            x = self.get(0, s)
            l.append(x[0])
            m.append(x[1])
        return l, m

Thanks in advance.


3 Answers

Score: 2

I worked on a continuous version of mountain car (now in OpenAI Gym) and solved it with DDPG. In my experiments I found that if the agent does not receive any reward during the first few episodes, it learns to do nothing. So this is an exploration problem: maybe you can make it take random actions for some episodes before learning starts, or find a way to reward exploration (for example, rewarding observations it has never seen before worked well for me). A rough sketch of both ideas is below.
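In your code this could look roughly like the following sketch (just an illustration, not my DDPG code: `agent` stands for the `dqn` object from the question, and the warm-up length, bin count and bonus scale are arbitrary placeholder values):

import gym
import numpy as np

env = gym.make("MountainCar-v0")
visit_counts = {}          # visit counts for coarsely discretised observations

def novelty_bonus(obs, bins=20, scale=0.1):
    # small bonus for observations in rarely visited regions of the state space
    low, high = env.observation_space.low, env.observation_space.high
    key = tuple(((obs - low) / (high - low) * bins).astype(int))
    visit_counts[key] = visit_counts.get(key, 0) + 1
    return scale / np.sqrt(visit_counts[key])

def run_episode(agent, episode, warmup_episodes=50):
    # act purely at random for the first `warmup_episodes` episodes,
    # then fall back to the usual epsilon-greedy policy
    p = env.reset()
    done = False
    while not done:
        if episode < warmup_episodes or np.random.rand() < agent.epsilon:
            ac = env.action_space.sample()
        else:
            ac = agent.sess.run(agent.action, feed_dict={agent.x: np.array([p])})[0]
        obs, rew, done, _ = env.step(ac)
        rew += novelty_bonus(obs)                    # exploration bonus on top of the env reward
        agent.append_to_memory(p, ac, rew, obs, done)
        p = obs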

Answered on 2017-07-25T13:17:17.563
Score: 0

I tried a plain DQN without prioritized experience replay on the MountainCar problem and it converged nicely. I think the problem may lie in the following two areas:

1) The raw state representation is very low-dimensional (for MountainCar, just position and velocity), which makes many different states hard for the network to tell apart. So I used the following code (taken from someone else's blog) to extract features from the state representation, which maximizes the differences between states.

import numpy as np
import sklearn.pipeline
import sklearn.preprocessing
from sklearn.kernel_approximation import RBFSampler


class Featurize_state():
    def __init__(self, env, no_change=False):
        self.no_change = no_change
        if no_change:
            self.After_featurize_state_dim = env.observation_space.shape[0]
            return

        # sample observations to estimate the statistics of the state space
        observation_examples = np.array([env.observation_space.sample() for x in range(10000)])
        # calculate empirical mean and std deviation
        self.scaler = sklearn.preprocessing.StandardScaler()
        self.scaler.fit(observation_examples)

        # Used to convert a state to a featurized representation.
        # Use RBF kernels with different variances to cover different parts of the space.
        self.featurizer = sklearn.pipeline.FeatureUnion([
            ("rbf1", RBFSampler(gamma=5.0, n_components=50)),
            ("rbf2", RBFSampler(gamma=2.0, n_components=100))
            # ("rbf3", RBFSampler(gamma=1.0, n_components=100)),
            # ("rbf4", RBFSampler(gamma=0.5, n_components=100))
        ])
        self.featurizer.fit(observation_examples)
        self.After_featurize_state_dim = 150

    def get_featurized_state_dim(self):
        return self.After_featurize_state_dim

    def transfer(self, state):
        if self.no_change:
            return state

        # scaled = self.scaler.transform([state])
        featurized = self.featurizer.transform([state])
        return featurized[0]
        # return state
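The way it plugs into the agent is roughly this (a sketch; the rest of the agent stays unchanged):

featurizer = Featurize_state(env)
state_dim = featurizer.get_featurized_state_dim()   # 150 with the two RBF samplers above

# use state_dim as the network input size, and featurize every raw observation:
state = featurizer.transfer(env.reset())
# ... inside the environment loop ...
# next_state = featurizer.transfer(obs)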

2) When the reward is sparse (there is no reward signal until you reach the goal), do not use too many layers in your neural network. The more complex the model, the more samples carrying the information it needs to learn (the reward) are required. A single hidden layer with relu, combined with the state featurization, worked very well for me (a sketch is below).
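For example, in the same TF1 style as the code in the question, a sketch of the kind of small Q-network I mean (the hidden size of 64 is an arbitrary choice):

import tensorflow as tf

def build_q_network(input_size, output_size, hidden=64):
    x = tf.placeholder(tf.float32, [None, input_size])
    w1 = tf.Variable(tf.truncated_normal([input_size, hidden], stddev=0.1))
    b1 = tf.Variable(tf.zeros([hidden]))
    w2 = tf.Variable(tf.truncated_normal([hidden, output_size], stddev=0.1))
    b2 = tf.Variable(tf.zeros([output_size]))
    h = tf.nn.relu(tf.matmul(x, w1) + b1)   # the single hidden relu layer
    q = tf.matmul(h, w2) + b2               # linear output: one Q-value per action
    return x, q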

Answered on 2018-04-13T11:37:21.400
Score: -3

In addition to what has already been said, I suggest two more things that worked for me.

  1. Create a custom reward to speed up the convergence of Q-learning. Increasing the reward to encourage the car's momentum worked for me (a sketch of both ideas follows this list).

  2. Try frame skipping. As the DeepMind DQN Nature paper describes it, with frame skipping "the agent sees and selects actions on every kth frame instead of every frame". For the mountain-car problem this means you can repeat the chosen action for the next k states. Repeating the same action for every 4 states worked for me.
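A rough sketch of both ideas for MountainCar-v0, where an observation is (position, velocity); the shaping scale of 100 is just an illustrative value, and k = 4 is the frame skip that worked for me:

def shaped_reward(obs, reward):
    # 1) custom reward: add a bonus proportional to the car's speed,
    #    which encourages building up momentum
    position, velocity = obs
    return reward + 100.0 * abs(velocity)

def step_with_frame_skip(env, action, k=4):
    # 2) frame skipping: repeat the chosen action for k consecutive frames
    #    and accumulate the (shaped) reward along the way
    total_reward = 0.0
    for _ in range(k):
        obs, reward, done, info = env.step(action)
        total_reward += shaped_reward(obs, reward)
        if done:
            break
    return obs, total_reward, done, info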

Answered on 2018-08-18T00:43:50.710