我有以下代码,想把每次运行(run)获得的平均总奖励与各状态的效用进行比较。我已经能够求出每个状态的效用,但不确定“每次运行的奖励”具体指的是什么。
另外,我想更改运行的起始状态,也就是不从 (0,0) 开始,而是从例如 (1,2) 开始。这应该怎样用代码实现?
感谢您的任何帮助。
from __future__ import generators
import operator
import random
from collections import defaultdict
import numpy as np
# Unit moves listed counter-clockwise: east, north, west, south.
# turn_right / turn_left rely on this ordering.
orientations = [
    (1, 0),
    (0, 1),
    (-1, 0),
    (0, -1),
]


def turn_right(orientation):
    """Return the heading one quarter-turn clockwise from ``orientation``.

    Raises ValueError when ``orientation`` is not an entry of ``orientations``.
    """
    position = orientations.index(orientation)
    # One slot back is a clockwise turn; index -1 wraps around to the last entry.
    return orientations[position - 1]


def turn_left(orientation):
    """Return the heading one quarter-turn counter-clockwise from ``orientation``.

    Raises ValueError when ``orientation`` is not an entry of ``orientations``.
    """
    successor = orientations.index(orientation) + 1
    # The modulo wraps the step past the last entry back to the first one.
    return orientations[successor % len(orientations)]
def vector_add(a, b):
    """Add two vectors element by element and return the result as a tuple.

    Pairing stops at the end of the shorter input.
    >>> vector_add((0, 1), (8, 9))
    (8, 10)
    """
    return tuple(left + right for left, right in zip(a, b))
def isnumber(x):
    """Report whether x counts as a number, i.e. whether it exposes ``__int__``."""
    try:
        getattr(x, '__int__')
    except AttributeError:
        return False
    return True
def if_(test, result, alternative):
    """Functional stand-in for the C-style (test ? result : alternative).

    Both ``result`` and ``alternative`` are evaluated by the caller before this
    function runs. Whichever one is selected is called with no arguments when
    it is callable, so an expensive branch can be deferred by wrapping it in a
    lambda.
    >>> if_(2 + 2 == 4, 'ok', lambda: expensive_computation())
    'ok'
    """
    chosen = result if test else alternative
    return chosen() if callable(chosen) else chosen
def print_table(table, header=None, sep=' ', numfmt='%g'):
    """Print a list of lists as a table, so that columns line up nicely.

    table  -- list of rows. The first data row decides each column's
              justification: numeric cells (see isnumber) are right-justified,
              everything else is left-justified.
    header -- optional row printed before the data rows.
    sep    -- separator written after every cell.
    numfmt -- %-format applied to every numeric cell; you might want e.g.
              '%6.2f'. (If you want different formats in different columns,
              don't use print_table.)

    Nothing is printed when there are neither data rows nor a header.
    """
    data = list(table)
    if not data and not header:
        return
    # Justification is taken from the first data row; with no data rows the
    # header is the only row available to decide it.
    justs = ['rjust' if isnumber(x) else 'ljust'
             for x in (data[0] if data else header)]
    rows = ([header] if header else []) + data
    rows = [[numfmt % x if isnumber(x) else x for x in row] for row in rows]
    # The widths are reused for every row, so they must be a real list: a bare
    # map() is a one-shot iterator on Python 3 and would be empty after row one.
    sizes = [max(len(str(x)) for x in column) for column in zip(*rows)]
    for row in rows:
        cells = [getattr(str(x), j)(size)
                 for (j, size, x) in zip(justs, sizes, row)]
        # One print call per row keeps all of its cells on the same line.
        print(' '.join('%s %s' % (cell, sep) for cell in cells))
class MDP:
    """A Markov Decision Process, defined by an initial state, transition model,
    and reward function. We also keep track of a gamma value, for use by
    algorithms. The transition model is represented somewhat differently from
    the text. Instead of P(s' | s, a) being a probability number for each
    state/state/action triplet, we instead have T(s, a) return a
    list of (p, s') pairs. We also keep track of the possible states,
    terminal states, and actions for each state. [Page 646]"""

    def __init__(self, init, actlist, terminals, transitions=None, reward=None, states=None, gamma=0.9):
        """Store the model.

        init        -- initial state.
        actlist     -- either one collection of actions shared by every state,
                       or a dict mapping each state to its own actions.
        terminals   -- collection of terminal states.
        transitions -- dict {state: {action: [(p, next_state), ...]}}.
        reward      -- dict {state: reward}; defaults to 0 for every state.
        states      -- collection of states; derived from ``transitions``
                       when omitted.
        gamma       -- discount factor.

        Raises ValueError unless 0 < gamma <= 1.
        """
        if not (0 < gamma <= 1):
            raise ValueError("An MDP must have 0 < gamma <= 1")
        # collect states from transitions table if not passed.
        self.states = states or self.get_states_from_transitions(transitions)
        self.init = init
        # Stored unconditionally so the attribute exists for any container
        # type (e.g. a tuple), not only for list and dict.
        self.actlist = actlist
        self.terminals = terminals
        self.transitions = transitions or {}
        if not self.transitions:
            print("Warning: Transition table is empty.")
        self.gamma = gamma
        # self.states is None when it could not be derived; fall back to an
        # empty reward table instead of failing on iteration.
        self.reward = reward or {s: 0 for s in (self.states or ())}
        # self.check_consistency()

    def R(self, state):
        """Return a numeric reward for this state."""
        return self.reward[state]

    def T(self, state, action):
        """Transition model. From a state and an action, return a list
        of (probability, result-state) pairs.

        Raises ValueError when the transition table is empty."""
        if not self.transitions:
            raise ValueError("Transition model is missing")
        return self.transitions[state][action]

    def actions(self, state):
        """Return a list of actions that can be performed in this state. By default, a
        fixed list of actions, except for terminal states, which only offer
        [None]. When ``actlist`` is a dict, the entry for ``state`` is returned.
        Override this method if you need to specialize by state."""
        if state in self.terminals:
            return [None]
        if isinstance(self.actlist, dict):
            # per-state action table
            return self.actlist[state]
        return self.actlist

    def get_states_from_transitions(self, transitions):
        """Return the set of every state that appears in ``transitions`` as a
        source or as a result. Prints a message and returns None when
        ``transitions`` is not a dict."""
        if isinstance(transitions, dict):
            sources = set(transitions.keys())
            results = set(tr[1] for actions in transitions.values()
                          for effects in actions.values()
                          for tr in effects)
            return sources.union(results)
        print('Could not retrieve states from transitions')
        return None

    def check_consistency(self):
        """Assert that states, init, rewards, terminals and transition
        probabilities agree with each other. Raises AssertionError otherwise."""
        # check that all states in transitions are valid
        assert set(self.states) == self.get_states_from_transitions(self.transitions)
        # check that init is a valid state
        assert self.init in self.states
        # check reward for each state
        assert set(self.reward.keys()) == set(self.states)
        # check that all terminals are valid states
        assert all(t in self.states for t in self.terminals)
        # check that probability distributions for all actions sum to 1
        for actions in self.transitions.values():
            for effects in actions.values():
                total = sum(outcome[0] for outcome in effects)
                assert abs(total - 1) < 0.001
class GridMDP(MDP):
    """A two-dimensional grid MDP, as in [Figure 17.1]. All you have to do is
    specify the grid as a list of lists of rewards; use None for an obstacle
    (unreachable state). Also, you should specify the terminal states.
    An action is an (x, y) unit vector; e.g. (1, 0) means move east."""

    def __init__(self, grid, terminals, init=(0, 0), gamma=.9):
        """Build states, rewards and the transition table from ``grid``.

        grid      -- rows of rewards, top row first; None marks an obstacle.
        terminals -- collection of terminal (x, y) states.
        init      -- initial (x, y) state.
        gamma     -- discount factor, validated by MDP.__init__.
        """
        # We want row 0 on the bottom, not on top. Flip a copy so the caller's
        # list is not reversed in place.
        grid = list(reversed(grid))
        reward = {}
        states = set()
        self.rows = len(grid)
        self.cols = len(grid[0]) if grid else 0
        self.grid = grid
        for x in range(self.cols):
            for y in range(self.rows):
                cell = grid[y][x]
                # Only None is an obstacle; a reward of 0 is a legitimate cell.
                if cell is not None:
                    states.add((x, y))
                    reward[(x, y)] = cell
        # go() consults self.states, so it has to be set before the
        # transition table is computed.
        self.states = states
        actlist = orientations
        transitions = {s: {a: self.calculate_T(s, a) for a in actlist}
                       for s in states}
        MDP.__init__(self, init, actlist=actlist,
                     terminals=terminals, transitions=transitions,
                     reward=reward, states=states, gamma=gamma)

    def calculate_T(self, state, action):
        """Return the (probability, result-state) list for ``action`` in
        ``state``: 0.8 for the chosen direction and 0.1 for each of the
        right and left turns. A falsy action yields [(0.0, state)]."""
        if action:
            return [(0.8, self.go(state, action)),
                    (0.1, self.go(state, turn_right(action))),
                    (0.1, self.go(state, turn_left(action)))]
        return [(0.0, state)]

    def T(self, state, action):
        """Look up the precomputed transitions; a falsy action (e.g. the None
        offered in terminal states) yields [(0.0, state)]."""
        return self.transitions[state][action] if action else [(0.0, state)]

    def go(self, state, direction):
        """Return the state that results from going in this direction; the
        agent stays put when the target cell is not a valid state."""
        state1 = vector_add(state, direction)
        return state1 if state1 in self.states else state

    def to_grid(self, mapping):
        """Convert a mapping from (x, y) to v into a [[..., v, ...]] grid,
        top row first; cells missing from the mapping become None."""
        return list(reversed([[mapping.get((x, y), None)
                               for x in range(self.cols)]
                              for y in range(self.rows)]))

    def to_arrows(self, policy):
        """Render a policy {state: action} as a grid of arrow characters,
        with '.' for the None action."""
        chars = {(1, 0): '>', (0, 1): '^', (-1, 0): '<', (0, -1): 'v', None: '.'}
        return self.to_grid({s: chars[a] for (s, a) in policy.items()})
# ______________________________________________________________________________
""" [Figure 17.1]
A 4x3 grid environment that presents the agent with a sequential decision problem.
"""
# Rows are listed top to bottom and None is the obstacle cell. The two
# terminals are the +1 cell at (3, 2) and the -1 cell at (3, 1).
sequential_decision_environment = GridMDP(
    grid=[
        [-0.04, -0.04, -0.04, +1],
        [-0.04, None, -0.04, -1],
        [-0.04, -0.04, -0.04, -0.04],
    ],
    terminals=[(3, 2), (3, 1)],
    gamma=.9,
)
# ______________________________________________________________________________
def value_iteration(mdp, epsilon=0.001):
"""Solving an MDP by value iteration. [Figure 17.4]"""
U1 = {s: 0 for s in mdp.states}
R, T, gamma = mdp.R, mdp.T, mdp.gamma
while True:
U = U1.copy()
delta = 0
for s in mdp.states:
U1[s] = R(s) + gamma * max(sum(p * U[s1] for (p, s1) in T(s, a))
for a in mdp.actions(s))
delta = max(delta, abs(U1[s] - U[s]))
if delta <= epsilon * (1 - gamma) / gamma:
return U
def best_policy(mdp, U):
    """Return the policy that is greedy with respect to the utilities U.

    The result maps every state of the MDP to the action with the highest
    expected utility in that state. [Equation 17.4]
    """
    return {
        s: max(mdp.actions(s), key=lambda a: expected_utility(a, s, U, mdp))
        for s in mdp.states
    }


def expected_utility(a, s, U, mdp):
    """Return the probability-weighted utility of the outcomes of doing a in s,
    according to the MDP's transition model and U."""
    total = 0
    for probability, successor in mdp.T(s, a):
        total += probability * U[successor]
    return total
# ______________________________________________________________________________
def policy_iteration(mdp):
    """Solve an MDP by policy iteration [Figure 17.7]

    Starts from a random policy and alternates policy evaluation with greedy
    improvement until no state changes its action. Before returning, prints
    the computed utilities: row by row via ``mdp.to_grid`` when the MDP
    provides it, otherwise as the plain state-to-utility mapping.

    Returns the final policy as a dict {state: action}.
    """
    U = {s: 0 for s in mdp.states}
    pi = {s: random.choice(mdp.actions(s)) for s in mdp.states}
    while True:
        U = policy_evaluation(pi, U, mdp)
        unchanged = True
        for s in mdp.states:
            best = max(mdp.actions(s),
                       key=lambda a: expected_utility(a, s, U, mdp))
            if best != pi[s]:
                pi[s] = best
                unchanged = False
        if unchanged:
            print('Computed utilities')
            # Only grid MDPs define to_grid; any other MDP would otherwise
            # fail here after the solution has already been computed.
            to_grid = getattr(mdp, 'to_grid', None)
            if to_grid is None:
                print(U)
            else:
                for entry in to_grid(U):
                    print(entry)
            print('\nOptimal policy: ')
            return pi
def policy_evaluation(pi, U, mdp, k=20):
    """Approximate the utilities of following policy pi (modified policy
    iteration).

    Runs k sweeps of the fixed-policy Bellman update over all states. U is
    updated in place, so later states in a sweep already see the new values
    of earlier ones. The same U object is returned.
    """
    reward, transition, discount = mdp.R, mdp.T, mdp.gamma
    for _ in range(k):
        for s in mdp.states:
            immediate = reward(s)
            backup = sum(p * U[s1] for (p, s1) in transition(s, pi[s]))
            U[s] = immediate + discount * backup
    return U
import pprint

# Solve the Figure 17.1 grid world with policy iteration, then pretty-print
# the resulting state -> action mapping.
pi = policy_iteration(sequential_decision_environment)
pp = pprint.PrettyPrinter(indent=4)
pp.pprint(pi)
...