2

I have a mismatch in shapes between inputs and the model of my reinforcement learning project.

I have been closely following the AWS examples, specifically the cartpole example. However I have built my own custom environment. What I am struggling to understand is how to change my environment so that it is able to work with the prebuilt Ray RLEstimator.

Here is the code for the environment:

from enum import Enum
import math

import gym
from gym import error, spaces, utils, wrappers
from gym.utils import seeding
from gym.envs.registration import register
from gym.spaces import Discrete, Box


import numpy as np

# from float_space import FloatSpace


def sigmoid_price_fun(x, maxcust, gamma):
    return maxcust / (1 + math.exp(gamma * max(0, x)))


class Actions(Enum):
    DECREASE_PRICE = 0
    INCREASE_PRICE = 1
    HOLD = 2


PRICE_ADJUSTMENT = {
    Actions.DECREASE_PRICE: -0.25,
    Actions.INCREASE_PRICE: 0.25,
    Actions.HOLD: 0
}


class ArrivalSim(gym.Env):
    """ Simple environment for price optimising RL learner. """


    def __init__(self, price):
        """
        Parameters
        ----------
        price : float
            The initial price to use.
        """
        super().__init__()
        self.price = price
        self.revenue = 0
        self.action_space = Discrete(3)  # [0, 1, 2]  #increase or decrease
        self.observation_space = Box(np.array(0.0),np.array(1000))
#         self.observation_space = FloatSpace(price)

    def step(self, action):
        """ Enacts the specified action in the environment.

        Returns the new price, reward, whether we're finished and an empty dict for compatibility with Gym's
        interface. """

        self._take_action(Actions(action))
        next_state = self.price
#         next_state = self.observation_space.sample()
        reward = self._get_reward()
        done = False

        if next_state < 0 or reward == 0:
            done = True

        print(next_state, reward, done, {})

        return np.array(next_state), reward, done, {}

    def reset(self):
        """ Resets the environment, selecting a random initial price. Returns the price. """

#         self.observation_space.value = np.random.rand()
#         return self.observation_space.sample()
        self.price = np.random.rand()
        return self.price

    def _take_action(self, action):
#         self.observation_space.value += PRICE_ADJUSTMENT[action]
        self.price += PRICE_ADJUSTMENT[action]

    def _get_reward(self,price):
#         price = self.observation_space.value
#         return max(np.random.poisson(sigmoid_price_fun(price, 50, 0.5)) * price, 0)
        self.revenue = max(np.random.poisson(sigmoid_price_fun(self.price, 50, 0.5)) * self.price, 0)
        return max(np.random.poisson(sigmoid_price_fun(self.price, 50, 0.5)) * self.price, 0)


#     def render(self, mode='human'):
#         super().render(mode)

def testEnv():
    register(
        id='ArrivalSim-v0',
        entry_point='env:ArrivalSim',
        kwargs= {'price' : 40}
    )
    env = gym.make('ArrivalSim-v0')

    env.reset()
    for _ in range(20):
        test = env.action_space.sample()
        print(test)
        print(env.observation_space)
        env.step(test)  # take a random action
    env.close()



if __name__ =='__main__':

    testEnv()

Here is the training script

import json
import os

import gym
import ray
from ray.tune import run_experiments
from ray.tune.registry import register_env
from gym.envs.registration import register

from sagemaker_rl.ray_launcher import SageMakerRayLauncher


def create_environment(env_config):
    import gym
#     from gym.spaces import Space
    from gym.envs.registration import register

    # This import must happen inside the method so that worker processes import this code
    register(
        id='ArrivalSim-v0',
        entry_point='env:ArrivalSim',
        kwargs= {'price' : 40}
    )
    return gym.make('ArrivalSim-v0')



class MyLauncher(SageMakerRayLauncher):

    def register_env_creator(self):
        register_env("ArrivalSim-v0", create_environment)

    def get_experiment_config(self):
        return {
          "training": {
            "env": "ArrivalSim-v0",
            "run": "PPO",
            "stop": {
              "episode_reward_mean": 5000,
            },
            "config": {
              "gamma": 0.995,
              "kl_coeff": 1.0,
              "num_sgd_iter": 10,
              "lr": 0.0001,
              "sgd_minibatch_size": 32768,
              "train_batch_size": 320000,
              "monitor": False,  # Record videos.
              "model": {
                "free_log_std": False
              },
              "use_gae": False,
              "num_workers": (self.num_cpus-1),
              "num_gpus": self.num_gpus,
              "batch_mode": "complete_episodes"

            }
          }
        }

if __name__ == "__main__":
    MyLauncher().train_main()

Here is the code I run in Jupyter:

metric_definitions = RLEstimator.default_metric_definitions(RLToolkit.RAY)
environment = env = {
    'SAGEMAKER_REQUIREMENTS': 'requirements.txt', # path relative to `source_dir` below.
}

estimator = RLEstimator(entry_point="train.py",
                        source_dir='.',
                        toolkit=RLToolkit.RAY,
                        toolkit_version='0.6.5',
                        framework=RLFramework.TENSORFLOW,
                        dependencies=["sagemaker_rl"],
#                         image_name='price-response-ray-cpu',
                        role=role,
#                         train_instance_type="ml.c5.2xlarge",
                        train_instance_type='local',
                        train_instance_count=1,
#                         output_path=s3_output_path,
#                         base_job_name=job_name_prefix,
                        metric_definitions=metric_definitions
#                         hyperparameters={
                          # Attention scientists!  You can override any Ray algorithm parameter here:
                          #"rl.training.config.horizon": 5000,
                          #"rl.training.config.num_sgd_iter": 10,
                        #}
                    )

estimator.fit(wait=True)
job_name = estimator.latest_training_job.job_name
print("Training job: %s" % job_name)

The error message I have been receiving has been the following:

algo-1-dxwxx_1  | == Status ==
algo-1-dxwxx_1  | Using FIFO scheduling algorithm.
algo-1-dxwxx_1  | Resources requested: 0/3 CPUs, 0/0 GPUs
algo-1-dxwxx_1  | Memory usage on this node: 1.1/4.1 GB
algo-1-dxwxx_1  | 
algo-1-dxwxx_1  | == Status ==
algo-1-dxwxx_1  | Using FIFO scheduling algorithm.
algo-1-dxwxx_1  | Resources requested: 2/3 CPUs, 0/0 GPUs
algo-1-dxwxx_1  | Memory usage on this node: 1.4/4.1 GB
algo-1-dxwxx_1  | Result logdir: /opt/ml/output/intermediate/training
algo-1-dxwxx_1  | Number of trials: 1 ({'RUNNING': 1})
algo-1-dxwxx_1  | RUNNING trials:
algo-1-dxwxx_1  |  - PPO_ArrivalSim-v0_0:   RUNNING
algo-1-dxwxx_1  | 
algo-1-dxwxx_1  | (pid=72) 2019-08-30 09:35:13,030  WARNING ppo.py:172 -- FYI: By default, the value function will not share layers with the policy model ('vf_share_layers': False).
algo-1-dxwxx_1  | 2019-08-30 09:35:13,063   ERROR trial_runner.py:460 -- Error processing event.
algo-1-dxwxx_1  | Traceback (most recent call last):
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/ray/tune/trial_runner.py", line 409, in _process_trial
algo-1-dxwxx_1  |     result = self.trial_executor.fetch_result(trial)
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/ray/tune/ray_trial_executor.py", line 314, in fetch_result
algo-1-dxwxx_1  |     result = ray.get(trial_future[0])
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/ray/worker.py", line 2316, in get
algo-1-dxwxx_1  |     raise value
algo-1-dxwxx_1  | ray.exceptions.RayTaskError: ray_worker (pid=72, host=b9b15d495b68)
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/ray/rllib/models/model.py", line 83, in __init__
algo-1-dxwxx_1  |     restored, num_outputs, options)
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/ray/rllib/models/model.py", line 135, in _build_layers_v2
algo-1-dxwxx_1  |     raise NotImplementedError
algo-1-dxwxx_1  | NotImplementedError
algo-1-dxwxx_1  | 
algo-1-dxwxx_1  | During handling of the above exception, another exception occurred:
algo-1-dxwxx_1  | 
algo-1-dxwxx_1  | ray_worker (pid=72, host=b9b15d495b68)
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/ray/rllib/agents/agent.py", line 276, in __init__
algo-1-dxwxx_1  |     Trainable.__init__(self, config, logger_creator)
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/ray/tune/trainable.py", line 88, in __init__
algo-1-dxwxx_1  |     self._setup(copy.deepcopy(self.config))
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/ray/rllib/agents/agent.py", line 373, in _setup
algo-1-dxwxx_1  |     self._init()
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/ray/rllib/agents/ppo/ppo.py", line 77, in _init
algo-1-dxwxx_1  |     self.env_creator, self._policy_graph)
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/ray/rllib/agents/agent.py", line 506, in make_local_evaluator
algo-1-dxwxx_1  |     extra_config or {}))
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/ray/rllib/agents/agent.py", line 714, in _make_evaluator
algo-1-dxwxx_1  |     async_remote_worker_envs=config["async_remote_worker_envs"])
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/ray/rllib/evaluation/policy_evaluator.py", line 288, in __init__
algo-1-dxwxx_1  |     self._build_policy_map(policy_dict, policy_config)
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/ray/rllib/evaluation/policy_evaluator.py", line 661, in _build_policy_map
algo-1-dxwxx_1  |     policy_map[name] = cls(obs_space, act_space, merged_conf)
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/ray/rllib/agents/ppo/ppo_policy_graph.py", line 176, in __init__
algo-1-dxwxx_1  |     seq_lens=existing_seq_lens)
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/ray/rllib/models/catalog.py", line 215, in get_model
algo-1-dxwxx_1  |     seq_lens)
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/ray/rllib/models/catalog.py", line 255, in _get_model
algo-1-dxwxx_1  |     num_outputs, options)
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/ray/rllib/models/model.py", line 86, in __init__
algo-1-dxwxx_1  |     input_dict["obs"], num_outputs, options)
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/ray/rllib/models/fcnet.py", line 37, in _build_layers
algo-1-dxwxx_1  |     scope=label)
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/tensorflow/contrib/framework/python/ops/arg_scope.py", line 182, in func_with_args
algo-1-dxwxx_1  |     return func(*args, **current_args)
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/tensorflow/contrib/layers/python/layers/layers.py", line 1854, in fully_connected
algo-1-dxwxx_1  |     outputs = layer.apply(inputs)
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/tensorflow/python/keras/engine/base_layer.py", line 817, in apply
algo-1-dxwxx_1  |     return self.__call__(inputs, *args, **kwargs)
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/tensorflow/python/layers/base.py", line 374, in __call__
algo-1-dxwxx_1  |     outputs = super(Layer, self).__call__(inputs, *args, **kwargs)
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/tensorflow/python/keras/engine/base_layer.py", line 730, in __call__
algo-1-dxwxx_1  |     self._assert_input_compatibility(inputs)
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/tensorflow/python/keras/engine/base_layer.py", line 1493, in _assert_input_compatibility
algo-1-dxwxx_1  |     str(x.shape.as_list()))
algo-1-dxwxx_1  | ValueError: Input 0 of layer default/fc1 is incompatible with the layer: : expected min_ndim=2, found ndim=1. Full shape received: [None]
algo-1-dxwxx_1  | 
algo-1-dxwxx_1  | 2019-08-30 09:35:13,064   INFO ray_trial_executor.py:178 -- Destroying actor for trial PPO_ArrivalSim-v0_0. If your trainable is slow to initialize, consider setting reuse_actors=True to reduce actor creation overheads.
algo-1-dxwxx_1  | 2019-08-30 09:35:13,076   INFO trial_runner.py:497 -- Attempting to recover trial state from last checkpoint.
algo-1-dxwxx_1  | (pid=72) 2019-08-30 09:35:13,041  INFO policy_evaluator.py:278 -- Creating policy evaluation worker 0 on CPU (please ignore any CUDA init errors)

I am not sure how to change the input the environment gives to the model or the models setup itself. It seems the documentations are quite obscure. I have a hunch that problem lies with the observation and action spaces

Here is the reference to the original aws project example: https://github.com/awslabs/amazon-sagemaker-examples/tree/master/reinforcement_learning/rl_roboschool_ray

4

1 回答 1

1

Possible reason:

The error message:

ValueError: Input 0 of layer default/fc1 is incompatible with the layer: : expected min_ndim=2, found ndim=1. Full shape received: [None]

Your original environment obs space is self.observation_space = Box(np.array(0.0),np.array(1000)).

Displaying the shape of your environment obs space gives:

print(Box(np.array(0.0), np.array(1000), dtype=np.float32).shape) = ()

This could be indicated by Full shape received: [None] in the error message.

If you pass the shape (1,1) into np.zeros, you get the expected min_ndim=2:

x = np.zeros((1, 1)) print(x) [[0.]] print(x.ndim) 2

Suggested solution:

I assume that you want your environment obs space to range from 0.0 to 1000.0 as indicated by the self.price = np.random.rand() in your reset function.

Try using the following for your environment obs space:

self.observation_space = Box(0.0, 1000.0, shape=(1,1), dtype=np.float32)

I hope that by setting the Box with an explicit shape helps.

EDIT (20190903):

I have modified your training script. This modification includes new imports, custom model class, model registration & addition of registered custom model to config. For readability, only sections added are shown below. The entire modified training script is available in this gist. Please run with the proposed obs space as describe above.

New additional imports:

# new imports
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np
from ray.rllib.models import ModelCatalog
from ray.rllib.models.tf.tf_modelv2 import TFModelV2
from ray.rllib.models.tf.fcnet_v2 import FullyConnectedNetwork

from ray.rllib.utils import try_import_tf
from ray.tune import grid_search

tf = try_import_tf()
# end new imports

Custom model class:

# Custom model class (fcnet)
class CustomModel(TFModelV2):
    """Example of a custom model that just delegates to a fc-net."""

    def __init__(self, obs_space, action_space, num_outputs, model_config,
                 name):
        super(CustomModel, self).__init__(obs_space, action_space, num_outputs,
                                          model_config, name)
        self.model = FullyConnectedNetwork(obs_space, action_space,
                                           num_outputs, model_config, name)
        self.register_variables(self.model.variables())

    def forward(self, input_dict, state, seq_lens):
        return self.model.forward(input_dict, state, seq_lens)

    def value_function(self):
        return self.model.value_function()

Registered & add custom model:

    def get_experiment_config(self):


        # Register custom model
        ModelCatalog.register_custom_model("my_model", CustomModel)


        return {
          "training": {
            "env": "ArrivalSim-v0",
            "run": "PPO",
            "stop": {
              "episode_reward_mean": 5000,
            },
            "config": {


              "model": {"custom_model": "my_model"}, # Add registered custom model


              "gamma": 0.995,
              "kl_coeff": 1.0,
              "num_sgd_iter": 10,
              "lr": 0.0001,
              "sgd_minibatch_size": 32768,
              "train_batch_size": 320000,
              "monitor": False,  # Record videos.
              "model": {
                "free_log_std": False
              },
              "use_gae": False,
              "num_workers": (self.num_cpus-1),
              "num_gpus": self.num_gpus,
              "batch_mode": "complete_episodes"
            }
          }
        }

EDIT 2 (20190910):

To show that it works, truncated output from Sagemaker (Jupyter notebook instance):

.
.
.
algo-1-y2ayw_1  | price b = 0.439261780930142
algo-1-y2ayw_1  | price a = 0.439261780930142
algo-1-y2ayw_1  | (self.price).shape = (1,)
algo-1-y2ayw_1  | [0.43926178] 10.103020961393266 False {}
algo-1-y2ayw_1  | price b = 0.439261780930142
algo-1-y2ayw_1  | price a = 0.439261780930142
algo-1-y2ayw_1  | (self.price).shape = (1,)
algo-1-y2ayw_1  | [0.43926178] 9.663759180463124 False {}
algo-1-y2ayw_1  | price b = 0.439261780930142
algo-1-y2ayw_1  | price a = 0.189261780930142
algo-1-y2ayw_1  | (self.price).shape = (1,)
algo-1-y2ayw_1  | [0.18926178] 5.67785342790426 False {}
algo-1-y2ayw_1  | price b = 0.189261780930142
algo-1-y2ayw_1  | price a = -0.06073821906985799
algo-1-y2ayw_1  | (self.price).shape = (1,)
algo-1-y2ayw_1  | [-0.06073822] 0 True {}
algo-1-y2ayw_1  | Result for PPO_ArrivalSim-v0_0:
algo-1-y2ayw_1  |   date: 2019-09-10_11-51-13
algo-1-y2ayw_1  |   done: true
algo-1-y2ayw_1  |   episode_len_mean: 126.72727272727273
algo-1-y2ayw_1  |   episode_reward_max: 15772.677709596366
algo-1-y2ayw_1  |   episode_reward_mean: 2964.4609668691965
algo-1-y2ayw_1  |   episode_reward_min: 0.0
algo-1-y2ayw_1  |   episodes: 5
algo-1-y2ayw_1  |   experiment_id: 5d3b9f2988854a0db164a2e5e9a7550f
algo-1-y2ayw_1  |   hostname: 2dae585dcc65
algo-1-y2ayw_1  |   info:
algo-1-y2ayw_1  |     cur_lr: 4.999999873689376e-05
algo-1-y2ayw_1  |     entropy: 1.0670874118804932
algo-1-y2ayw_1  |     grad_time_ms: 1195.066
algo-1-y2ayw_1  |     kl: 3.391784191131592
algo-1-y2ayw_1  |     load_time_ms: 44.725
algo-1-y2ayw_1  |     num_steps_sampled: 463
algo-1-y2ayw_1  |     num_steps_trained: 463
algo-1-y2ayw_1  |     policy_loss: -0.05383850634098053
algo-1-y2ayw_1  |     sample_time_ms: 621.282
algo-1-y2ayw_1  |     total_loss: 2194493.5
algo-1-y2ayw_1  |     update_time_ms: 145.352
algo-1-y2ayw_1  |     vf_explained_var: -5.519390106201172e-05
algo-1-y2ayw_1  |     vf_loss: 2194492.5
algo-1-y2ayw_1  |   iterations_since_restore: 2
algo-1-y2ayw_1  |   node_ip: 172.18.0.2
algo-1-y2ayw_1  |   pid: 77
algo-1-y2ayw_1  |   policy_reward_mean: {}
algo-1-y2ayw_1  |   time_since_restore: 4.55129861831665
algo-1-y2ayw_1  |   time_this_iter_s: 1.3484764099121094
algo-1-y2ayw_1  |   time_total_s: 4.55129861831665
algo-1-y2ayw_1  |   timestamp: 1568116273
algo-1-y2ayw_1  |   timesteps_since_restore: 463
algo-1-y2ayw_1  |   timesteps_this_iter: 234
algo-1-y2ayw_1  |   timesteps_total: 463
algo-1-y2ayw_1  |   training_iteration: 2
algo-1-y2ayw_1  |
algo-1-y2ayw_1  | A worker died or was killed while executing task 00000000781a7b5b94a203683f8f789e593abbb1.
algo-1-y2ayw_1  | A worker died or was killed while executing task 00000000d3507bc6b41ee1c9fc36292eeae69557.
algo-1-y2ayw_1  | == Status ==
algo-1-y2ayw_1  | Using FIFO scheduling algorithm.
algo-1-y2ayw_1  | Resources requested: 0/3 CPUs, 0/0 GPUs
algo-1-y2ayw_1  | Result logdir: /opt/ml/output/intermediate/training
algo-1-y2ayw_1  | TERMINATED trials:
algo-1-y2ayw_1  |  - PPO_ArrivalSim-v0_0:   TERMINATED [pid=77], 4 s, 2 iter, 463 ts, 2.96e+03 rew
algo-1-y2ayw_1  |
algo-1-y2ayw_1  | Saved model configuration.
algo-1-y2ayw_1  | Saved the checkpoint file /opt/ml/output/intermediate/training/PPO_ArrivalSim-v0_0_2019-09-10_11-50-53vd32vlux/checkpoint-2.extra_data as /opt/ml/model/checkpoint.extra_data
algo-1-y2ayw_1  | Saved the checkpoint file /opt/ml/output/intermediate/training/PPO_ArrivalSim-v0_0_2019-09-10_11-50-53vd32vlux/checkpoint-2.tune_metadata as /opt/ml/model/checkpoint.tune_metadata
algo-1-y2ayw_1  | Created LogSyncer for /root/ray_results/PPO_ArrivalSim-v0_2019-09-10_11-51-13xdn_5i34 -> None
algo-1-y2ayw_1  | 2019-09-10 11:51:13.941718: I tensorflow/core/common_runtime/process_util.cc:71] Creating new thread pool with default inter op setting: 2. Tune using inter_op_parallelism_threads for best performance.
algo-1-y2ayw_1  | reset -> (self.price).shape =  (1,)
algo-1-y2ayw_1  | LocalMultiGPUOptimizer devices ['/cpu:0']
algo-1-y2ayw_1  | reset -> (self.price).shape =  (1,)
algo-1-y2ayw_1  | INFO:tensorflow:No assets to save.
algo-1-y2ayw_1  | No assets to save.
algo-1-y2ayw_1  | INFO:tensorflow:No assets to write.
algo-1-y2ayw_1  | No assets to write.
algo-1-y2ayw_1  | INFO:tensorflow:SavedModel written to: /opt/ml/model/1/saved_model.pb
algo-1-y2ayw_1  | SavedModel written to: /opt/ml/model/1/saved_model.pb
algo-1-y2ayw_1  | Saved TensorFlow serving model!
algo-1-y2ayw_1  | A worker died or was killed while executing task 00000000f352d985b807ca399460941fe2264899.

algo-1-y2ayw_1  | 2019-09-10 11:51:20,075 sagemaker-containers INFO

 Reporting training SUCCESS

tmpwwb4b358_algo-1-y2ayw_1 exited with code 0

Aborting on container exit...
Failed to delete: /tmp/tmpwwb4b358/algo-1-y2ayw Please remove it manually.

===== Job Complete =====

This time I make edits in all 3 files. Your environment, training script & the Jupyter notebook but it turns out that there isn't a need to define custom models for your custom environment. However, that remains viable. And you're right, the main cause of the issue is still in the obs space.

I set self.price to be a 1D numpy array to make it talk better with Ray RLlib. The creation of the custom environment in the training script was done in a simpler way as shown below. As for the notebook, I used version 0.5.3 instead of 0.6.5 for toolkit_version & the training is done in local mode (in the docker container on the Sagemaker Jupyter notebook instance, still on AWS) with CPU only. However, it will also work with any ML instance (e.g ml.m4.xlarge) with GPU.

The entire package along with all dependencies is in here.

The edited env:

# new
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
# end new


from enum import Enum
import math

import gym
from gym import error, spaces, utils, wrappers
from gym.utils import seeding
from gym.envs.registration import register
from gym.spaces import Discrete, Box

import numpy as np


def sigmoid_price_fun(x, maxcust, gamma):
    return maxcust / (1 + math.exp(gamma * max(0, x)))


class Actions(Enum):
    DECREASE_PRICE = 0
    INCREASE_PRICE = 1
    HOLD = 2


PRICE_ADJUSTMENT = {
    Actions.DECREASE_PRICE: -0.25,
    Actions.INCREASE_PRICE: 0.25,
    Actions.HOLD: 0
}


class ArrivalSim(gym.Env):
    """ Simple environment for price optimising RL learner. """

    def __init__(self, price):
        """
        Parameters
        ----------
        price : float
            The initial price to use.
        """
        super().__init__()

        self.price = price
        self.revenue = 0
        self.action_space = Discrete(3)  # [0, 1, 2]  #increase or decrease
        # original obs space:
        #self.observation_space = Box(0.0, 1000.0, shape=(1,1), dtype=np.float32)
        # obs space initially suggested:
        #self.observation_space = Box(0.0, 1000.0, shape=(1,1), dtype=np.float32)
        # obs space suggested in this edit:
        self.observation_space = spaces.Box(np.array([0.0]), np.array([1000.0]), dtype=np.float32)

    def step(self, action):
        """ Enacts the specified action in the environment.

        Returns the new price, reward, whether we're finished and an empty dict for compatibility with Gym's
        interface. """

        self._take_action(Actions(action))

        next_state = self.price
        print('(self.price).shape =', (self.price).shape)
        #next_state = self.observation_space.sample()

        reward = self._get_reward()
        done = False

        if next_state < 0 or reward == 0:
            done = True

        print(next_state, reward, done, {})

        return np.array(next_state), reward, done, {}

    def reset(self):
        """ Resets the environment, selecting a random initial price. Returns the price. """
        #self.observation_space.value = np.random.rand()
        #return self.observation_space.sample()

        self.price = np.random.rand(1)

        print('reset -> (self.price).shape = ', (self.price).shape)

        return self.price

    def _take_action(self, action):
#         self.observation_space.value += PRICE_ADJUSTMENT[action]
        #print('price b =', self.price)
        print('price b =', self.price[0])
        #print('price b =', self.price[[0]])
        #self.price += PRICE_ADJUSTMENT[action]
        self.price[0] += PRICE_ADJUSTMENT[action]
        #self.price[[0]] += PRICE_ADJUSTMENT[action]
        #print('price a =', self.price)
        print('price a =', self.price[0])
        #print('price a =', self.price[[0]])

    #def _get_reward(self, price):
    def _get_reward(self):
#         price = self.observation_space.value
#         return max(np.random.poisson(sigmoid_price_fun(price, 50, 0.5)) * price, 0)
        #self.revenue = max(np.random.poisson(sigmoid_price_fun(self.price, 50, 0.5)) * self.price, 0)
        #return max(np.random.poisson(sigmoid_price_fun(self.price, 50, 0.5)) * self.price, 0)
        self.revenue = max(np.random.poisson(sigmoid_price_fun(self.price[0], 50, 0.5)) * self.price[0], 0)
        return max(np.random.poisson(sigmoid_price_fun(self.price[0], 50, 0.5)) * self.price[0], 0)

#     def render(self, mode='human'):
#         super().render(mode)

def testEnv():
    """
    register(
        id='ArrivalSim-v0',
        entry_point='env:ArrivalSim',
        kwargs= {'price' : 40.0}
    )
    env = gym.make('ArrivalSim-v0')
    """
    env = ArrivalSim(30.0)

    val = env.reset()
    print('val.shape = ', val.shape)

    for _ in range(5):
        print('env.observation_space =', env.observation_space)
        act = env.action_space.sample()
        print('\nact =', act)
        next_state, reward, done, _ = env.step(act)  # take a random action
        print('next_state = ', next_state)
    env.close()



if __name__ =='__main__':

    testEnv()

The edited training script:

import json
import os

import gym
import ray
from ray.tune import run_experiments
import ray.rllib.agents.a3c as a3c
import ray.rllib.agents.ppo as ppo
from ray.tune.registry import register_env
from mod_op_env import ArrivalSim

from sagemaker_rl.ray_launcher import SageMakerRayLauncher

"""
def create_environment(env_config):
    import gym
#     from gym.spaces import Space
    from gym.envs.registration import register

    # This import must happen inside the method so that worker processes import this code
    register(
        id='ArrivalSim-v0',
        entry_point='env:ArrivalSim',
        kwargs= {'price' : 40}
    )
    return gym.make('ArrivalSim-v0')
"""
def create_environment(env_config):
    price = 30.0
    # This import must happen inside the method so that worker processes import this code
    from mod_op_env import ArrivalSim
    return ArrivalSim(price)


class MyLauncher(SageMakerRayLauncher):
    def __init__(self):        
        super(MyLauncher, self).__init__()
        self.num_gpus = int(os.environ.get("SM_NUM_GPUS", 0))
        self.hosts_info = json.loads(os.environ.get("SM_RESOURCE_CONFIG"))["hosts"]
        self.num_total_gpus = self.num_gpus * len(self.hosts_info)

    def register_env_creator(self):
        register_env("ArrivalSim-v0", create_environment)

    def get_experiment_config(self):
        return {
          "training": {
            "env": "ArrivalSim-v0",
            "run": "PPO",
            "stop": {
              "training_iteration": 3,
            },

            "local_dir": "/opt/ml/model/",
            "checkpoint_freq" : 3,

            "config": {                                
              #"num_workers": max(self.num_total_gpus-1, 1),
              "num_workers": max(self.num_cpus-1, 1),
              #"use_gpu_for_workers": False,
              "train_batch_size": 128, #5,
              "sample_batch_size": 32, #1,
              "gpu_fraction": 0.3,
              "optimizer": {
                "grads_per_step": 10
              },
            },
            #"trial_resources": {"cpu": 1, "gpu": 0, "extra_gpu": max(self.num_total_gpus-1, 1), "extra_cpu": 0},
            #"trial_resources": {"cpu": 1, "gpu": 0, "extra_gpu": max(self.num_total_gpus-1, 0),
            #                    "extra_cpu": max(self.num_cpus-1, 1)},
            "trial_resources": {"cpu": 1,
                                "extra_cpu": max(self.num_cpus-1, 1)},              
          }
        }

if __name__ == "__main__":
    os.environ["LC_ALL"] = "C.UTF-8"
    os.environ["LANG"] = "C.UTF-8"
    os.environ["RAY_USE_XRAY"] = "1"
    print(ppo.DEFAULT_CONFIG)
    MyLauncher().train_main()

The notebook code:

!/bin/bash ./setup.sh

from time import gmtime, strftime
import sagemaker 
role = sagemaker.get_execution_role()

sage_session = sagemaker.session.Session()
s3_bucket = sage_session.default_bucket()  
s3_output_path = 's3://{}/'.format(s3_bucket)
print("S3 bucket path: {}".format(s3_output_path))

job_name_prefix = 'ArrivalSim'

from sagemaker.rl import RLEstimator, RLToolkit, RLFramework

estimator = RLEstimator(entry_point="mod_op_train.py", # Our launcher code
                        source_dir='src', # Directory where the supporting files are at. All of this will be
                                          # copied into the container.
                        dependencies=["common/sagemaker_rl"], # some other utils files.
                        toolkit=RLToolkit.RAY, # We want to run using the Ray toolkit against the ray container image.
                        framework=RLFramework.TENSORFLOW, # The code is in tensorflow backend.
                        toolkit_version='0.5.3', # Toolkit version. This will also choose an apporpriate tf version.                                               
                        #toolkit_version='0.6.5', # Toolkit version. This will also choose an apporpriate tf version.                        
                        role=role, # The IAM role that we created at the begining.
                        #train_instance_type="ml.m4.xlarge", # Since we want to run fast, lets run on GPUs.
                        train_instance_type="local", # Since we want to run fast, lets run on GPUs.
                        train_instance_count=1, # Single instance will also work, but running distributed makes things 
                                                # fast, particularly in the case of multiple rollout training.
                        output_path=s3_output_path, # The path where we can expect our trained model.
                        base_job_name=job_name_prefix, # This is the name we setup above to be to track our job.
                        hyperparameters = {      # Some hyperparameters for Ray toolkit to operate.
                          "s3_bucket": s3_bucket,
                          "rl.training.stop.training_iteration": 2, # Number of iterations.
                          "rl.training.checkpoint_freq": 2,
                        },
                        #metric_definitions=metric_definitions, # This will bring all the logs out into the notebook.
                    )

estimator.fit()
于 2019-09-02T21:41:06.780 回答