Sarsa算法
from random import random
from gym import Env
import gym
from gridworld import *
class Agent():
def __init__(self,env: Env):
self.env = env #个体持有环境的引用
self.Q = {} #个体维护一张行为价值表
self.state = None #个体当前的观测
def performPolicy(self, state): #执行一个策略
pass
def act(self,a): #执行一个行为
return self.env.step(a)
def learning(self): #学习过程
pass
def _get_state_name(self,state): #将观测状态转换为一个字典的键
return str(state)
def _is_state_in_Q(self, s): #判断s的Q值是否存在
return self.Q.get(s) is not None
def _init_state_value(self, s_name, randomized = True): #初始化某状态的Q值
if not self._is_state_in_Q(s_name):
self.Q[s_name] = {}
for action in range(self.env.action_space.n):
default_v = random() / 10 if randomized is True else 0.0
self.Q[s_name][action] = default_v
def _assert_state_in_Q(self, s, randomized=True): #确保某状态Q的值存在
if not self._is_state_in_Q(s):
self._init_state_value(s, randomized)
def _get_Q(self, s, a): #获取Q(s,a)
self._assert_state_in_Q(s, randomized=True)
return self.Q[s][a]
def _set_Q(self, s, a, value): #设置Q(s,a)
self._assert_state_in_Q(s, randomized=True)
self.Q[s][a] = value
def _performPolicy(self, s, episode_num, use_epsilon):
epsilon = 1.00 / (episode_num+1)
Q_s = self.Q[s]
str_act = 'unknown'
rand_value = random()
action = None
if use_epsilon and rand_value < epsilon:
action = self.env.action_space.sample()
else:
str_act = max(Q_s, key=Q_s.get)
action = int(str_act)
return action
def _learning(self, gamma, alpha, max_episode_num):
total_time, time_in_episode, num_episode = 0,0,0
while num_episode < max_episode_num: #设置终止条件
self.state = self.env.reset() #环境初始化
s0 = self._get_state_name(self.state) #获取个体对于观测的命名
self._assert_state_in_Q(s0, randomized=True)
self.env.render() #显示UI界面
a0 = self._performPolicy(s0, num_episode, use_epsilon=True)
time_in_episode = 0
is_done = False
while not is_done: #针对一个Eposide
s1,r1,is_done,info = self.act(a0) #执行策略行为
self.env.render() #更新UI界面
s1 = self._get_state_name(s1) #获取个体对于新状态的命名
self._assert_state_in_Q(s1, randomized=True)
a1 = self._performPolicy(s1, num_episode, use_epsilon=True)
old_q = self._get_Q(s0, a0)
q_prime = self._get_Q(s1, a1)
td_target = r1 + gamma*q_prime
td_error = td_target - old_q
new_q = old_q + alpha*td_error
self._set_Q(s0, a0, new_q)
if num_episode == max_episode_num:
print("t:{0:>2}: s:{1}, a:{2:2}, s1:{3}".format(time_in_episode,s0,a0,s1))
s0,a0 = s1,a1
time_in_episode += 1
print("Eposide {0} takes {1} steps.".format(num_episode, time_in_episode))
total_time += time_in_episode
num_episode += 1
return
def main():
env = SimpleGridWorld()
agent = Agent(env)
print("learning...")
agent._learning(gamma=0.9, alpha=0.1, max_episode_num=800)
return
if __name__ == '__main__':
main()
格子世界类
import math
import gym
from gym import spaces
from gym.utils import seeding
import numpy as np
class Grid(object):
def __init__(self, x: int = None,
y: int = None,
type: int = 0,
reward: int = 0.0,
value: float = 0.0): # value, for possible future usage
self.x = x # coordinate x
self.y = y
self.type = value # Type (0:empty;1:obstacle or boundary)
self.reward = reward # instant reward for an agent entering this grid cell
self.value = value # the value of this grid cell, for future usage
self.name = None # name of this grid.
self._update_name()
def _update_name(self):
self.name = "X{0}-Y{1}".format(self.x, self.y)
def __str__(self):
return "name:{4}, x:{0}, y:{1}, type:{2}, value{3}".format(self.x,
self.y,
self.type,
self.reward,
self.value,
self.name
)
class GridMatrix(object):
'''格子矩阵,通过不同的设置,模拟不同的格子世界环境
'''
def __init__(self, n_width: int, # defines the number of cells horizontally
n_height: int, # vertically
default_type: int = 0, # default cell type
default_reward: float = 0.0, # default instant reward
default_value: float = 0.0 # default value
):
self.grids = None
self.n_height = n_height
self.n_width = n_width
self.len = n_width * n_height
self.default_reward = default_reward
self.default_value = default_value
self.default_type = default_type
self.reset()
def reset(self):
self.grids = []
for x in range(self.n_height):
for y in range(self.n_width):
self.grids.append(Grid(x,
y,
self.default_type,
self.default_reward,
self.default_value))
def get_grid(self, x, y=None):
'''get a grid information
args: represented by x,y or just a tuple type of x
return: grid object
'''
xx, yy = None, None
if isinstance(x, int):
xx, yy = x, y
elif isinstance(x, tuple):
xx, yy = x[0], x[1]
assert (xx >= 0 and yy >= 0 and xx < self.n_width and yy < self.n_height), \
"coordinates should be in reasonable range"
index = yy * self.n_width + xx
return self.grids[index]
def set_reward(self, x, y, reward):
grid = self.get_grid(x, y)
if grid is not None:
grid.reward = reward
else:
raise ("grid doesn't exist")
def set_value(self, x, y, value):
grid = self.get_grid(x, y)
if grid is not None:
grid.value = value
else:
raise ("grid doesn't exist")
def set_type(self, x, y, type):
grid = self.get_grid(x, y)
if grid is not None:
grid.type = type
else:
raise ("grid doesn't exist")
def get_reward(self, x, y):
grid = self.get_grid(x, y)
if grid is None:
return None
return grid.reward
def get_value(self, x, y):
grid = self.get_grid(x, y)
if grid is None:
return None
return grid.value
def get_type(self, x, y):
grid = self.get_grid(x, y)
if grid is None:
return None
return grid.type
class GridWorldEnv(gym.Env):
'''格子世界环境,可以模拟各种不同的格子世界
'''
metadata = {
'render.modes': ['human', 'rgb_array'],
'video.frames_per_second': 30
}
def __init__(self, n_width: int = 10,
n_height: int = 7,
u_size=40,
default_reward: float = 0,
default_type=0,
windy=False):
self.u_size = u_size # size for each cell (pixels)
self.n_width = n_width # width of the env calculated by number of cells.
self.n_height = n_height # height...
self.width = u_size * n_width # scenario width (pixels)
self.height = u_size * n_height # height
self.default_reward = default_reward
self.default_type = default_type
self._adjust_size()
self.grids = GridMatrix(n_width=self.n_width,
n_height=self.n_height,
default_reward=self.default_reward,
default_type=self.default_type,
default_value=0.0)
self.reward = 0 # for rendering
self.action = None # for rendering
self.windy = windy # whether this is a windy environment
# 0,1,2,3,4 represent left, right, up, down, -, five moves.
self.action_space = spaces.Discrete(4)
# 观察空间由low和high决定
self.observation_space = spaces.Discrete(self.n_height * self.n_width)
# 坐标原点为左下角,这个pyglet是一致的, left-bottom corner is the position of (0,0)
# 通过设置起始点、终止点以及特殊奖励和类型的格子可以构建各种不同类型的格子世界环境
# 比如:随机行走、汽车租赁、悬崖行走等David Silver公开课中的示例
self.ends = [(7, 3)] # 终止格子坐标,可以有多个, goal cells position list
self.start = (0, 3) # 起始格子坐标,只有一个, start cell position, only one start position
self.types = [] # 特殊种类的格子在此设置。[(3,2,1)]表示(3,2)处值为1.
# special type of cells, (x,y,z) represents in position(x,y) the cell type is z
self.rewards = [] # 特殊奖励的格子在此设置,终止格子奖励0, special reward for a cell
self.refresh_setting()
self.viewer = None # 图形接口对象
self.seed() # 产生一个随机子
self.reset()
def _adjust_size(self):
'''调整场景尺寸适合最大宽度、高度不超过800
'''
pass
def seed(self, seed=None):
# 产生一个随机化时需要的种子,同时返回一个np_random对象,支持后续的随机化生成操作
self.np_random, seed = seeding.np_random(seed)
return [seed]
def step(self, action):
assert self.action_space.contains(action), \
"%r (%s) invalid" % (action, type(action))
self.action = action # action for rendering
old_x, old_y = self._state_to_xy(self.state)
new_x, new_y = old_x, old_y
# wind effect:
# 有风效果,其数字表示个体离开(而不是进入)该格子时朝向别的方向会被吹偏离的格子数
# this effect is just used for the windy env in David Silver's youtube video.
if self.windy:
if new_x in [3, 4, 5, 8]:
new_y += 1
elif new_x in [6, 7]:
new_y += 2
if action == 0:
new_x -= 1 # left
elif action == 1:
new_x += 1 # right
elif action == 2:
new_y += 1 # up
elif action == 3:
new_y -= 1 # down
elif action == 4:
new_x, new_y = new_x - 1, new_y - 1
elif action == 5:
new_x, new_y = new_x + 1, new_y - 1
elif action == 6:
new_x, new_y = new_x + 1, new_y - 1
elif action == 7:
new_x, new_y = new_x + 1, new_y + 1
# boundary effect
if new_x < 0: new_x = 0
if new_x >= self.n_width: new_x = self.n_width - 1
if new_y < 0: new_y = 0
if new_y >= self.n_height: new_y = self.n_height - 1
# wall effect, obstacles or boundary.
# 类型为1的格子为障碍格子,不可进入
if self.grids.get_type(new_x, new_y) == 1:
new_x, new_y = old_x, old_y
self.reward = self.grids.get_reward(new_x, new_y)
done = self._is_end_state(new_x, new_y)
self.state = self._xy_to_state(new_x, new_y)
# 提供格子世界所有的信息在info内
info = {"x": new_x, "y": new_y, "grids": self.grids}
return self.state, self.reward, done, info
# 将状态变为横纵坐标, set status into an one-axis coordinate value
def _state_to_xy(self, s):
x = s % self.n_width
y = int((s - x) / self.n_width)
return x, y
def _xy_to_state(self, x, y=None):
if isinstance(x, int):
assert (isinstance(y, int)), "incomplete Position info"
return x + self.n_width * y
elif isinstance(x, tuple):
return x[0] + self.n_width * x[1]
return -1 # 未知状态, unknow status
def refresh_setting(self):
'''用户在使用该类创建格子世界后可能会修改格子世界某些格子类型或奖励值
的设置,修改设置后通过调用该方法使得设置生效。
'''
for x, y, r in self.rewards:
self.grids.set_reward(x, y, r)
for x, y, t in self.types:
self.grids.set_type(x, y, t)
def reset(self):
self.state = self._xy_to_state(self.start)
return self.state
# 判断是否是终止状态
def _is_end_state(self, x, y=None):
if y is not None:
xx, yy = x, y
elif isinstance(x, int):
xx, yy = self._state_to_xy(x)
else:
assert (isinstance(x, tuple)), "incomplete coordinate values"
xx, yy = x[0], x[1]
for end in self.ends:
if xx == end[0] and yy == end[1]:
return True
return False
# 图形化界面, Graphic UI
def render(self, mode='human', close=False):
if close:
if self.viewer is not None:
self.viewer.close()
self.viewer = None
return
zero = (0, 0)
u_size = self.u_size
m = 2 # gaps between two cells
# 如果还没有设定屏幕对象,则初始化整个屏幕具备的元素。
if self.viewer is None:
from gym.envs.classic_control import rendering
self.viewer = rendering.Viewer(self.width, self.height)
# 在Viewer里绘制一个几何图像的步骤如下:
# the following steps just tells how to render an shape in the environment.
# 1. 建立该对象需要的数据本身
# 2. 使用rendering提供的方法返回一个geom对象
# 3. 对geom对象进行一些对象颜色、线宽、线型、变换属性的设置(有些对象提供一些个
# 性化的方法来设置属性,具体请参考继承自这些Geom的对象),这其中有一个重要的
# 属性就是变换属性,
# 该属性负责对对象在屏幕中的位置、渲染、缩放进行渲染。如果某对象
# 在呈现时可能发生上述变化,则应建立关于该对象的变换属性。该属性是一个
# Transform对象,而一个Transform对象,包括translate、rotate和scale
# 三个属性,每个属性都由以np.array对象描述的矩阵决定。
# 4. 将新建立的geom对象添加至viewer的绘制对象列表里,如果在屏幕上只出现一次,
# 将其加入到add_onegeom()列表中,如果需要多次渲染,则将其加入add_geom()
# 5. 在渲染整个viewer之前,对有需要的geom的参数进行修改,修改主要基于该对象
# 的Transform对象
# 6. 调用Viewer的render()方法进行绘制
''' 绘制水平竖直格子线,由于设置了格子之间的间隙,可不用此段代码
for i in range(self.n_width+1):
line = rendering.Line(start = (i*u_size, 0),
end =(i*u_size, u_size*self.n_height))
line.set_color(0.5,0,0)
self.viewer.add_geom(line)
for i in range(self.n_height):
line = rendering.Line(start = (0, i*u_size),
end = (u_size*self.n_width, i*u_size))
line.set_color(0,0,1)
self.viewer.add_geom(line)
'''
# 绘制格子, draw cells
for x in range(self.n_width):
for y in range(self.n_height):
v = [(x * u_size + m, y * u_size + m),
((x + 1) * u_size - m, y * u_size + m),
((x + 1) * u_size - m, (y + 1) * u_size - m),
(x * u_size + m, (y + 1) * u_size - m)]
rect = rendering.FilledPolygon(v)
r = self.grids.get_reward(x, y) / 10
if r < 0:
rect.set_color(0.9 - r, 0.9 + r, 0.9 + r)
elif r > 0:
rect.set_color(0.3, 0.5 + r, 0.3)
else:
rect.set_color(0.9, 0.9, 0.9)
self.viewer.add_geom(rect)
# 绘制边框, draw frameworks
v_outline = [(x * u_size + m, y * u_size + m),
((x + 1) * u_size - m, y * u_size + m),
((x + 1) * u_size - m, (y + 1) * u_size - m),
(x * u_size + m, (y + 1) * u_size - m)]
outline = rendering.make_polygon(v_outline, False)
outline.set_linewidth(3)
if self._is_end_state(x, y):
# 给终点方格添加金黄色边框, give end state cell a yellow outline.
outline.set_color(0.9, 0.9, 0)
self.viewer.add_geom(outline)
if self.start[0] == x and self.start[1] == y:
outline.set_color(0.5, 0.5, 0.8)
self.viewer.add_geom(outline)
if self.grids.get_type(x, y) == 1: # 障碍格子用深灰色表示, obstacle cells are with gray color
rect.set_color(0.3, 0.3, 0.3)
else:
pass
# 绘制个体, draw agent
self.agent = rendering.make_circle(u_size / 4, 30, True)
self.agent.set_color(1.0, 1.0, 0.0)
self.viewer.add_geom(self.agent)
self.agent_trans = rendering.Transform()
self.agent.add_attr(self.agent_trans)
# 更新个体位置 update position of an agent
x, y = self._state_to_xy(self.state)
self.agent_trans.set_translation((x + 0.5) * u_size, (y + 0.5) * u_size)
return self.viewer.render(return_rgb_array=mode == 'rgb_array')
def LargeGridWorld():
'''10*10的一个格子世界环境,设置参照:
http://cs.stanford.edu/people/karpathy/reinforcejs/gridworld_td.html
'''
env = GridWorldEnv(n_width=10,
n_height=10,
u_size=40,
default_reward=0,
default_type=0,
windy=False)
env.start = (0, 9)
env.ends = [(5, 4)]
env.types = [(4, 2, 1), (4, 3, 1), (4, 4, 1), (4, 5, 1), (4, 6, 1), (4, 7, 1),
(1, 7, 1), (2, 7, 1), (3, 7, 1), (4, 7, 1), (6, 7, 1), (7, 7, 1),
(8, 7, 1)]
env.rewards = [(3, 2, -1), (3, 6, -1), (5, 2, -1), (6, 2, -1), (8, 3, -1),
(8, 4, -1), (5, 4, 1), (6, 4, -1), (5, 5, -1), (6, 5, -1)]
env.refresh_setting()
return env
def SimpleGridWorld():
'''无风10*7的格子,设置参照: David Silver强化学习公开课视频 第3讲
'''
env = GridWorldEnv(n_width=10,
n_height=7,
u_size=60,
default_reward=-1,
default_type=0,
windy=False)
env.start = (0, 3)
env.ends = [(7, 3)]
env.rewards = [(7, 3, 1)]
env.refresh_setting()
return env
def WindyGridWorld():
'''有风10*7的格子,设置参照: David Silver强化学习公开课视频 第5讲
'''
env = GridWorldEnv(n_width=10,
n_height=7,
u_size=60,
default_reward=-1,
default_type=0,
windy=True)
env.start = (0, 3)
env.ends = [(7, 3)]
env.rewards = [(7, 3, 1)]
env.refresh_setting()
return env
def RandomWalk():
'''随机行走示例环境
'''
env = GridWorldEnv(n_width=7,
n_height=1,
u_size=80,
default_reward=0,
default_type=0,
windy=False)
env.action_space = spaces.Discrete(2) # left or right
env.start = (3, 0)
env.ends = [(6, 0), (0, 0)]
env.rewards = [(6, 0, 1)]
env.refresh_setting()
return env
def CliffWalk():
'''悬崖行走格子世界环境
'''
env = GridWorldEnv(n_width=12,
n_height=4,
u_size=60,
default_reward=-1,
default_type=0,
windy=False)
env.action_space = spaces.Discrete(4) # left or right
env.start = (0, 0)
env.ends = [(11, 0)]
# env.rewards=[]
# env.types = [(5,1,1),(5,2,1)]
for i in range(10):
env.rewards.append((i + 1, 0, -100))
env.ends.append((i + 1, 0))
env.refresh_setting()
return env
def SkullAndTreasure():
'''骷髅与钱币示例,解释随机策略的有效性 David Silver 强化学习公开课第六讲 策略梯度
Examples of Skull and Money explained the necessity and effectiveness
'''
env = GridWorldEnv(n_width=5,
n_height=2,
u_size=60,
default_reward=-1,
default_type=0,
windy=False)
env.action_space = spaces.Discrete(4) # left or right
env.start = (0, 1)
env.ends = [(2, 0)]
env.rewards = [(0, 0, -100), (2, 0, 100), (4, 0, -100)]
env.types = [(1, 0, 1), (3, 0, 1)]
env.refresh_setting()
return env
if __name__ == "__main__":
env = GridWorldEnv()
print("hello")
env.reset()
nfs = env.observation_space
nfa = env.action_space
print("nfs:%s; nfa:%s" % (nfs, nfa))
print(env.observation_space)
print(env.action_space)
print(env.state)
env.render()
# x = input("press any key to exit")
for _ in range(20000):
env.render()
a = env.action_space.sample()
state, reward, isdone, info = env.step(a)
print("{0}, {1}, {2}, {3}".format(a, reward, isdone, info))
print("env closed")