
I know that a lot of similar topics have been discussed on StackOverflow, and I did a lot of research both on StackOverflow and on the rest of the internet, but I couldn't find a solution. I am trying to implement the classic Deep Q-Learning algorithm to solve OpenAI Gym's CartPole game: OpenAI Gym Cartpole

First, I created an agent that generates random weights. The results are shown in the plot below: [plot: agent beating CartPole using random search]

Surprisingly, the agent managed to reach 200 steps (the maximum) in many episodes simply by generating 4 random uniform weights [w1, w2, w3, w4] in (-1.0, 1.0) for each episode.
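
For context, here is a minimal sketch of that random-search baseline (not my exact code; it assumes the old gym reset/step API used in the code below and a linear policy that thresholds the weighted sum of the observation):

import gym
import numpy as np

# Rough sketch of the random-search baseline:
# sample 4 uniform weights per episode and pick the action from the sign of w.s.
env = gym.make('CartPole-v0')

def run_episode(env, w):
    state = env.reset()
    steps, done = 0, False
    while not done:
        action = 0 if np.dot(w, state) < 0 else 1
        state, _, done, _ = env.step(action)
        steps += 1
    return steps

random_search_steps = []
for _ in range(100):
    w = np.random.uniform(-1.0, 1.0, size=4)   # [w1, w2, w3, w4]
    random_search_steps.append(run_episode(env, w))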

So, I decided to implement a simple DQN consisting of a single linear layer (a 4x2 weight matrix and 2 biases) and let the agent learn the game over time. The weights are initialized randomly at the start and are updated with back-propagation as the agent takes steps.

I use an Epsilon-Greedy policy so that the agent explores at the beginning and exploits the Q-values later on. However, compared to the random agent, the results are disappointing:


I have tried tuning many parameters and trying different architectures, and the results don't change much. So, my question is the following:

Question: Is there something wrong with my DQN implementation, or can a simple DQN just not beat CartPole? What is your experience? The network does reduce the loss (error), but that doesn't guarantee a good solution. Thanks in advance.

import tensorflow as tf
import gym
import numpy as np
import random as rand
import matplotlib.pyplot as plt

# Cartpole's Observation:
#   4 Inputs
#   2 Actions (LEFT | RIGHT)
input_size = 4
output_size = 2

# Deep Q Network Class
class DQN:
    def __init__(self, var_names):
        self.var_names = var_names

        self._define_placeholders()
        self._add_layers()
        self._define_loss()
        self._choose_optimizer()
        self._initialize()

    # Placeholders:
    # Inputs: The place where we feed the Observations (States).
    # Targets: Q_target = R + gamma*Q(s', a*).
    def _define_placeholders(self):
        self.inputs = tf.placeholder(tf.float32, shape=(None, input_size), name='inputs')
        self.targets = tf.placeholder( tf.float32, shape=(None, output_size), name='targets')

    # Layers:
    # 4 Input Weights.
    # 2 Biases.
    # output = softmax(inputs*weights + biases).
    # Weights and biases are initialized randomly.
    def _add_layers(self):
        w = tf.get_variable(name=self.var_names[0], shape=(input_size, output_size),
                                initializer=tf.initializers.random_uniform(minval=-1.0, maxval=1.0) )
        b = tf.get_variable(name=self.var_names[1], shape=(output_size),
                                initializer=tf.initializers.random_uniform(minval=-1.0, maxval=1.0) )
        self.outputs = tf.nn.softmax(tf.matmul(self.inputs, w) + b)
        self.prediction = tf.argmax(self.outputs, 1)

    # Loss = MSE.
    def _define_loss(self):
        self.mean_loss = tf.losses.mean_squared_error(labels=self.targets, predictions=self.outputs) / 2

    # AdamOptimizer with starting learning rate: a = 0.005.
    def _choose_optimizer(self):
        self.optimizer = tf.train.AdamOptimizer(learning_rate=0.005).minimize(loss=self.mean_loss)

    # Initializes the dqn's weights.
    def _initialize(self):
        initializer = tf.global_variables_initializer()
        self.sess = tf.InteractiveSession()
        self.sess.run(initializer)

    # Gets the current DQN's weights.
    def get_weights(self):
        return [ self.sess.run( tf.trainable_variables(var) )[0] for var in self.var_names ]
        
    # Updates the weights of DQN.
    def update_weights(self, new_weights):
        variables = [tf.trainable_variables(name)[0] for name in self.var_names]
        update = [ tf.assign(var, weight) for (var, weight) in zip(variables, new_weights) ]
        self.sess.run(update)

    # Predicts the best possible action from a state s.
    # a* = argmax( Q(s) )
    # Returns from Q(s), a*
    def predict(self, states):
        Q, actions = self.sess.run( [self.outputs, self.prediction],
                                    feed_dict={self.inputs: states} )
        return Q, actions

    # It partially fits the given observations and the targets into the network.
    def partial_fit(self, states, targets):
        _, loss = self.sess.run( [self.optimizer, self.mean_loss],
                                    feed_dict={self.inputs: states, self.targets: targets} )
        return loss

# Replay Memory Buffer
# It stores experiences as (s, a, r, s') --> (State, Action, Reward, Next_State).
# It generates random mini-batches of experiences from the memory.
# If the memory is full, the oldest experiences are deleted. Each experience is one step.
class ReplayMemory:
    def __init__(self, mem_size):
        self.mem_size = mem_size
        self.experiences = []

    def add_experience(self, xp):
        self.experiences.append(xp)
        if len(self.experiences) > self.mem_size:
            self.experiences.pop(0)

    def random_batch(self, batch_size):
        if len(self.experiences) < batch_size:
            return self.experiences
        else:
            return rand.sample(self.experiences, batch_size)

# The agent's class.
# It contains 2 DQNs: Online DQN for Predictions and Target DQN for the targets.
class Agent:
    def __init__(self, epsilon, epsilon_decay, min_epsilon, gamma, mem_size):
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.min_epsilon = min_epsilon
        self.gamma = gamma
        self.replay_mem = ReplayMemory(mem_size)
        self.online_dqn = DQN( var_names=['online_w', 'online_b'] )
        self.target_dqn = DQN( var_names=['target_w', 'target_b'] )
        self.state = None

    def set_epsilon(self, epsilon):
        self.epsilon = epsilon

    def reduce_epsilon(self):
        if self.epsilon > self.min_epsilon:
            self.epsilon -= self.epsilon_decay
    
    def update_state(self, state):
        self.state = state

    def update_memory(self, state, action, reward, next_state):
        experience = (state, action, reward, next_state)
        self.replay_mem.add_experience(experience)

    # It updates the target network after N steps.
    def update_network(self):
        self.target_dqn.update_weights( self.online_dqn.get_weights() )

    # Randomly chooses an action from the environment.
    def explore(self, env):
        action = env.action_space.sample()
        return action

    # Predicts and chooses the best possible moves from the current state.
    def exploit(self):
        _, action = self.online_dqn.predict(self.state)
        return action[0]

    # Uses Epsilon-Greedy to decide whether to explore or exploit.
    # Epsilon starts with 1 and is reduced over the time.
    # After the agent makes a move, it returns: state, action, reward, next_state.
    def take_action(self, env):
        action = None
        p = rand.uniform(0.0, 1.0)
        if p < self.epsilon:
            action = self.explore(env)
        else:
            action = self.exploit()
        next_state, reward, done, _ = env.step(action)
        if done:
            next_state = None
        else:
            next_state = np.reshape( next_state, (1, input_size) )
        return self.state, action, reward, next_state, done

    # Trains the agent.
    # A random mini-batch is generated from the memory.
    # We feed each experience into the DQN.
    # Q(s)  = Q_target(s)   (used as the base for the targets)
    # Q(s') = Q_target(s'), a* = argmax Q_target(s')

    # For each action (a), reward (r), next state (s') in the batch:
    # If s' is None, the game is over, so we set target[i][a] = r
    # If s' is not None, then target[i][a] = r + gamma*Q(s', a*)

    # Then, the online DQN computes the mean squared difference of
    # r + gamma*Q(s', a*) - Q(s, a) and uses back-propagation to update the weights.
    def train(self):
        mini_batch = self.replay_mem.random_batch(batch_size=256)
        batch_size = len(mini_batch)
        states = np.zeros( shape=(batch_size, input_size) )
        next_states = np.zeros( shape=(batch_size, input_size) )
        for i in range(batch_size):
            states[i] = mini_batch[i][0]
            # Terminal experiences store next_state = None; leave that row as zeros.
            if mini_batch[i][3] is not None:
                next_states[i] = mini_batch[i][3]

        Q, _ = self.target_dqn.predict(states)
        next_Q, next_actions = self.target_dqn.predict(next_states)
        targets = np.copy(Q)  # copy so Q is not modified in place
        for i in range(batch_size):
            action = mini_batch[i][1]
            reward = mini_batch[i][2]
            next_state = mini_batch[i][3]
            if next_state is None:
                targets[i][action] = reward
            else:
                targets[i][action] = reward + self.gamma * next_Q[i][ next_actions[i] ]
        loss = self.online_dqn.partial_fit(states, targets)
        return loss
    
def play(agent, env, episodes, N, render=False, train=True):
    ep = 0
    episode_steps = []
    steps = 0
    total_steps = 0
    loss = 0

    # Sets the current state as the initial.
    # Cartpole spawns the agent in a random state.
    agent.update_state( np.reshape( env.reset(), (1, input_size) ) )
    agent.update_network()

    while ep < episodes:
        if render:
            env.render()
    
        # The target DQN's weights are frozen.
        # The agent updates the target DQN's weights every N steps (N = 100 here).
        if train and total_steps % N == 0:
            agent.update_network()
            print('---Target network updated---')

        # Takes action.
        state, action, reward, next_state, done = agent.take_action(env)

        # Updates the memory and the current state.
        agent.update_memory(state, action, reward, next_state)
        agent.update_state(next_state)
        steps += 1
        total_steps += 1

        if train:
            loss = agent.train()

        if done:
            agent.update_state( np.reshape( env.reset(), (1, input_size) ) )
            episode_steps.append(steps)
            ep += 1
            if train:
                agent.reduce_epsilon()
                print('End of episode', ep, 'Training loss =', loss, 'Steps =', steps)
            steps = 0

    if render:
        env.close()

    return episode_steps

env = gym.make('CartPole-v0')

# Training the agent.
agent = Agent(epsilon=1, epsilon_decay = 0.01, min_epsilon = 0.05, gamma=0.9, mem_size=50000)
episodes = 1000
N = 100
episode_steps = play(agent, env, episodes, N)

# Plotting the results.
# After the training is done, the steps should be maximized (up to 200)
plt.plot(episode_steps)
plt.show()

# Testing the agent.
agent.set_epsilon(0)
episodes = 1
steps = play(agent, env, episodes, N, render=True, train=False)[0]
print('\nSteps =', steps)

1 Answer


The algorithm works fine. When I decided to plot the data, I used the following as a metric:

Rewards / Episode

Most deep reinforcement learning frameworks (e.g. tf-agents) use average rewards (e.g. the average reward over every 10 episodes), which is why their plots look so smooth. If you look at the plot above, the agent achieves a high score most of the time.
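
For example, a smoothed curve like the ones those frameworks show can be produced with a simple moving average over the per-episode steps (a sketch; the window of 10 episodes is arbitrary):

import numpy as np
import matplotlib.pyplot as plt

# Sketch: smooth the per-episode steps (rewards) with a moving average,
# similar to the averaged reward curves that tf-agents-style plots show.
def moving_average(values, window=10):
    values = np.asarray(values, dtype=np.float64)
    return np.convolve(values, np.ones(window) / window, mode='valid')

# episode_steps is the list returned by play(...) in the question's code.
plt.plot(episode_steps, alpha=0.3, label='steps per episode')
plt.plot(moving_average(episode_steps), label='10-episode average')
plt.xlabel('Episode')
plt.ylabel('Steps (reward)')
plt.legend()
plt.show()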

Also, I decided to use numpy operations instead of 'for' loops to speed up the algorithm. You can check out my implementation here:

https://github.com/kochlisGit/Deep-Reinforcement-Learning/tree/master/Custom%20DQN
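
As a rough illustration of the idea (a sketch, not the exact code from the repo), the per-sample loop in train() can be replaced with numpy indexing, assuming terminal transitions are marked with a dones mask instead of next_state = None:

import numpy as np

# Sketch: vectorized Bellman targets, replacing the Python loop in train().
# Q, next_Q: (batch, 2) arrays from the target network.
# actions, rewards, dones: (batch,) arrays describing the sampled experiences.
def build_targets(Q, next_Q, actions, rewards, dones, gamma=0.9):
    targets = np.copy(Q)                           # start from Q_target(s)
    max_next_Q = np.max(next_Q, axis=1)            # max_a' Q_target(s', a')
    # r + gamma * max_a' Q(s', a') for non-terminal states, just r for terminal ones.
    targets[np.arange(len(actions)), actions] = rewards + gamma * max_next_Q * (1.0 - dones)
    return targets

The resulting targets array can then be passed to partial_fit() exactly as before.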

Answered 2021-03-27T17:49:59.880