0
点赞
收藏
分享

微信扫一扫

CS294(285) Actor Critic案例之路径工具类utils



CS294(285) Actor Critic案例之路径工具类utils

      在CS294(285) Actor Critic系列文章中,我们将跟着CS294 285的作业内容,一步一步的实现自己的演员-评论家算法。本篇实现路径工具类utils。




Actor Critic案例之路径工具类utils的几个主要方法:

sample_trajectory:抽样一条路径轨迹Path,限制每条路径步数,路径步数最大为max_path_length,每一步包括观测值、 观测图像、动作值、奖励、下一个观测值、终端状态值。




sample_trajectories:抽样多条路径,限制每条路径步数,每条路径的步数为pathLen;限制总步数,总步数求和小于每个批次的最小步数。 




sample_n_trajectories:抽样n条路径。限制每次的步数,但不限总步数。




Path:构建1条路径,Path包括路径中每一步的观测值、图像观测值、行动值、奖励、下一个观测值、终端状态。




convert_listofrollouts:分拆路径集,分别构建各个元素。


将路径集(多条路径)的每一条路径的各个元素取出来,重新按元素组合。


观测值:每一条路径中每一步观测值的合并。


动作值:每一条路径中每一步动作值的合并。


下一个观测值:每一条路径中每一步下一个观测值的合并。


终端值:每一条路径中每一步终端值的合并。


连锁奖励值:每一条路径中每一步奖励值的合并。


非连锁奖励值:各条路径奖励的列表。


import numpy as np	
import time	

	
############################################	
############################################	

	
def sample_trajectory(env, policy, max_path_length, render=False, render_mode=('rgb_array')):	

	
    # initialize env for the beginning of a new rollout	
    ob = env.reset() # HINT: should be the output of resetting the env	

	
    # init vars	
    obs, acs, rewards, next_obs, terminals, image_obs = [], [], [], [], [], []	
    steps = 0	
    while True:	

	
        # render image of the simulated env	
        if render:	
            if 'rgb_array' in render_mode:	
                if hasattr(env, 'sim'):	
                    if 'track' in env.env.model.camera_names:	
                        image_obs.append(env.sim.render(camera_name='track', height=500, width=500)[::-1])	
                    else:	
                        image_obs.append(env.sim.render(height=500, width=500)[::-1])	
                else:	
                    image_obs.append(env.render(mode=render_mode))	
            if 'human' in render_mode:	
                env.render(mode=render_mode)	
                time.sleep(env.model.opt.timestep)	

	
        # use the most recent ob to decide what to do	
        obs.append(ob)	
        ac = policy.get_action(ob) # HINT: query the policy's get_action function	
        ac = ac[0]	
        acs.append(ac)	

	
        # take that action and record results	
        ob, rew, done, _ = env.step(ac)	

	
        # record result of taking that action	
        steps += 1	
        next_obs.append(ob)	
        rewards.append(rew)	

	
        # End the rollout if the rollout ended 	
        # Note that the rollout can end due to done, or due to max_path_length	
        rollout_done = 1 if (done or (steps >= max_path_length)) else 0 # HINT: this is either 0 or 1	
        terminals.append(rollout_done)	
        	
        if rollout_done: 	
            break	

	
    return Path(obs, image_obs, acs, rewards, next_obs, terminals)	

	
def sample_trajectories(env, policy, min_timesteps_per_batch, max_path_length, render=False, render_mode=('rgb_array')):	
    """	
        Collect rollouts until we have collected min_timesteps_per_batch steps.	

	
        implement this function	
        Hint1: use sample_trajectory to get each path (i.e. rollout) that goes into paths	
        Hint2: use get_pathlength to count the timesteps collected in each path	
    """	
    timesteps_this_batch = 0	
    paths = []	
    	
    print("\n")	
    while timesteps_this_batch < min_timesteps_per_batch:	
        path = sample_trajectory(env, policy, max_path_length, render, render_mode)	
        paths.append(path)	
        pathLen = get_pathlength(path)	
        timesteps_this_batch += pathLen	
        print("Steps for batch   " + str(len(paths)) + "= " + str(pathLen) + "\n")	
    print("Num Rollouts   " + str(len(paths)) + "\n")	
    return paths, timesteps_this_batch	

	
def sample_n_trajectories(env, policy, ntraj, max_path_length, render=False, render_mode=('rgb_array')):	
    """	
        Collect ntraj rollouts.	

	
        implement this function	
        Hint1: use sample_trajectory to get each path (i.e. rollout) that goes into paths	
    """	
    paths = []	

	
    for _ in range(ntraj):	
        path = sample_trajectory(env, policy, max_path_length, render, render_mode)	
        paths.append(path)	

	
    return paths	

	
############################################	
############################################	

	
def Path(obs, image_obs, acs, rewards, next_obs, terminals):	
    """	
        Take info (separate arrays) from a single rollout	
        and return it in a single dictionary	
    """	
    if image_obs != []:	
        image_obs = np.stack(image_obs, axis=0)	
    return {"observation" : np.array(obs, dtype=np.float32),	
            "image_obs" : np.array(image_obs, dtype=np.uint8),	
            "reward" : np.array(rewards, dtype=np.float32),	
            "action" : np.array(acs, dtype=np.float32),	
            "next_observation": np.array(next_obs, dtype=np.float32),	
            "terminal": np.array(terminals, dtype=np.float32)}	

	

	
def convert_listofrollouts(paths):	
    """	
        Take a list of rollout dictionaries	
        and return separate arrays,	
        where each array is a concatenation of that array from across the rollouts	
    """	
    observations = np.concatenate([path["observation"] for path in paths])	
    actions = np.concatenate([path["action"] for path in paths])	
    next_observations = np.concatenate([path["next_observation"] for path in paths])	
    terminals = np.concatenate([path["terminal"] for path in paths])	
    concatenated_rewards = np.concatenate([path["reward"] for path in paths])	
    unconcatenated_rewards = [path["reward"] for path in paths]	
    return observations, actions, next_observations, terminals, concatenated_rewards, unconcatenated_rewards	

	
############################################	
############################################	

	
def get_pathlength(path):	
    return len(path["reward"])


CS294(285) Actor Critic案例之路径工具类utils_Network

举报

相关推荐

0 条评论