13.2 Deep Reinforcement Learning
Import the libraries:
from collections import deque
import numpy as np
np.random.seed(123)
print("NumPy:{}".format(np.__version__))
import tensorflow as tf
tf.set_random_seed(123)
print("TensorFlow:{}".format(tf.__version__))
import keras
print("Keras:{}".format(keras.__version__))
import gym
print('OpenAI Gym:',gym.__version__)
NumPy:1.13.3
TensorFlow:1.4.0
Keras:2.0.9
OpenAI Gym: 0.9.4
Define the functions for discretizing the continuous observation values:
def discretize(val, bounds, n_states):
    # map a continuous value to one of n_states equal-width buckets within bounds
    discrete_val = 0
    if val <= bounds[0]:
        discrete_val = 0
    elif val >= bounds[1]:
        discrete_val = n_states - 1
    else:
        discrete_val = int(round((n_states - 1) *
                                 ((val - bounds[0]) /
                                  (bounds[1] - bounds[0]))
                                 ))
    return discrete_val

def discretize_state(vals, s_bounds, n_s):
    # discretize each dimension of an observation into its own bucket index
    discrete_vals = []
    for i in range(len(n_s)):
        discrete_vals.append(discretize(vals[i], s_bounds[i], n_s[i]))
    return np.array(discrete_vals, dtype=np.int)
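As a quick sanity check, the helpers can be tried on a made-up observation (the values and bounds below are purely illustrative, not taken from the text):

# a minimal sketch: discretize an illustrative observation into bucket indices
sample_obs = np.array([0.05, 0.5, -0.02, -0.3])
sample_bounds = np.array([(-2.4, 2.4), (-1.0, 1.0), (-0.21, 0.21), (-1.0, 1.0)])
print(discretize_state(sample_obs, sample_bounds, np.array([10, 10, 10, 10])))
# -> [5 7 4 3], one bucket index in 0..9 per observation dimension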
Initialize the CartPole environment:
env = gym.make('CartPole-v0')
n_a = env.action_space.n              # number of discrete actions (2: push left, push right)
n_s = np.array([10, 10, 10, 10])      # number of buckets per observation dimension
s_bounds = np.array(list(zip(env.observation_space.low, env.observation_space.high)))
# the cart-velocity and pole-velocity bounds are effectively unbounded, so clamp them
s_bounds[1] = (-1.0, 1.0)
s_bounds[3] = (-1.0, 1.0)
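The two assignments above are needed because the cart-velocity and pole-velocity dimensions (indices 1 and 3) of CartPole are effectively unbounded, which would make the bucket width infinite. A short check (a sketch, not part of the original listing) shows the raw versus clamped bounds:

# a minimal sketch: compare the raw observation bounds with the clamped ones
print(env.observation_space.low)    # velocity entries are huge negative values
print(env.observation_space.high)   # velocity entries are huge positive values
print(s_bounds)                     # indices 1 and 3 now clamped to (-1.0, 1.0)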
Q-table-based algorithm
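The table is updated with the standard Q-learning rule, which the episode() function below applies at every time step:

$$Q(s,a) \leftarrow Q(s,a) + \alpha \left[\, r + \gamma \max_{a'} Q(s',a') - Q(s,a) \,\right]$$

where $\alpha$ is learning_rate, $\gamma$ is discount_rate, $r$ is the step reward, and $s'$ is the discretized next state.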
def policy_q_table(state, env):
    # epsilon-greedy: explore with probability explore_rate, otherwise act greedily on the table
    if np.random.random() < explore_rate:
        action = env.action_space.sample()
    else:
        action = np.argmax(q_table[tuple(state)])
    return action
def episode(env, policy, r_max=0, t_max=0):
    obs = env.reset()
    state_prev = discretize_state(obs, s_bounds, n_s)
    episode_reward = 0
    done = False
    t = 0
    while not done:
        action = policy(state_prev, env)
        obs, reward, done, info = env.step(action)
        state_new = discretize_state(obs, s_bounds, n_s)
        # Q-learning update towards the Bellman target
        best_q = np.amax(q_table[tuple(state_new)])
        bellman_q = reward + discount_rate * best_q
        indices = tuple(np.append(state_prev, action))
        q_table[indices] += learning_rate * (bellman_q - q_table[indices])
        state_prev = state_new
        episode_reward += reward
        if r_max > 0 and episode_reward > r_max:
            break
        t += 1
        if t_max > 0 and t == t_max:
            break
    return episode_reward
def experiment(env, policy, n_episodes, r_max=0, t_max=0):
    rewards = np.empty(shape=[n_episodes])
    for i in range(n_episodes):
        val = episode(env, policy, r_max, t_max)
        rewards[i] = val
    print('Policy:{}, Min reward:{}, Max reward:{}, Average reward:{}'
          .format(policy.__name__,
                  np.min(rewards),
                  np.max(rewards),
                  np.mean(rewards)))
q_table = np.zeros(shape=np.append(n_s, n_a))   # shape (10, 10, 10, 10, 2)
learning_rate = 0.8
discount_rate = 0.9
explore_rate = 0.2
n_episodes = 1000

experiment(env, policy_q_table, n_episodes)
Policy:policy_q_table, Min reward:8.0, Max reward:180.0, Average reward:17.592
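One way to gauge what the table has learned (a sketch, not part of the original text) is to rerun the experiment with exploration switched off, so that policy_q_table always acts greedily on the stored values; note that episode() keeps updating the table even during this run:

# a minimal sketch: evaluate the learned Q-table greedily
explore_rate = 0.0
experiment(env, policy_q_table, 100)
explore_rate = 0.2   # restore the exploration rate used above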
Q-network-based algorithm
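Here the table lookup is replaced by a small neural network that is trained to regress the same Bellman target. Each q_nn.fit() call in the episode() function below therefore minimizes a mean-squared error over the $N$ transitions currently held in the replay memory:

$$L(\theta) = \frac{1}{N} \sum_{i=1}^{N} \left( y_i - Q(s_i, a_i; \theta) \right)^2, \qquad y_i = \begin{cases} r_i & \text{if the transition is terminal} \\ r_i + \gamma \max_{a'} Q(s'_i, a'; \theta) & \text{otherwise} \end{cases}$$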
tf.reset_default_graph()
keras.backend.clear_session()

def policy_q_nn(obs, env):
    # epsilon-greedy: explore with probability explore_rate, otherwise pick the action
    # with the highest predicted Q-value
    if np.random.random() < explore_rate:
        action = env.action_space.sample()
    else:
        action = np.argmax(q_nn.predict(np.array([obs])))
    return action
def episode(env, policy, r_max=0, t_max=0):
    obs = env.reset()
    state_prev = discretize_state(obs, s_bounds, n_s)
    episode_reward = 0
    done = False
    t = 0
    while not done:
        action = policy(state_prev, env)
        obs, reward, done, info = env.step(action)
        state_next = discretize_state(obs, s_bounds, n_s)
        # store the transition in the replay memory
        memory.append([state_prev, action, reward, state_next, done])
        # build training targets from every transition currently in memory
        states = np.array([x[0] for x in memory])
        states_next = np.array([np.zeros(4) if x[4] else x[3] for x in memory])
        q_values = q_nn.predict(states)
        q_values_next = q_nn.predict(states_next)
        for i in range(len(memory)):
            s_prev, a, r, s_next, d = memory[i]
            if d:
                # terminal transition: the target is just the reward
                q_values[i, a] = r
            else:
                best_q = np.amax(q_values_next[i])
                bellman_q = r + discount_rate * best_q
                q_values[i, a] = bellman_q
        q_nn.fit(states, q_values, epochs=1, batch_size=50, verbose=0)
        state_prev = state_next
        episode_reward += reward
        if r_max > 0 and episode_reward > r_max:
            break
        t += 1
        if t_max > 0 and t == t_max:
            break
    return episode_reward
def experiment(env, policy, n_episodes, r_max=0, t_max=0):
    rewards = np.empty(shape=[n_episodes])
    for i in range(n_episodes):
        val = episode(env, policy, r_max, t_max)
        rewards[i] = val
    print('Policy:{}, Min reward:{}, Max reward:{}, Average reward:{}'
          .format(policy.__name__,
                  np.min(rewards),
                  np.max(rewards),
                  np.mean(rewards)))
memory = deque(maxlen=1000)

from keras.models import Sequential
from keras.layers import Dense

model = Sequential()
model.add(Dense(8, input_dim=4, activation='relu'))
model.add(Dense(2, activation='linear'))      # one Q-value output per action
model.compile(loss='mse', optimizer='adam')
model.summary()

q_nn = model

learning_rate = 0.8   # not used by the network; Adam manages its own step size
discount_rate = 0.9
explore_rate = 0.2
n_episodes = 100

experiment(env, policy_q_nn, n_episodes)
_________________________________________________________________
Layer (type)                 Output Shape              Param #
=================================================================
dense_1 (Dense)              (None, 8)                 40
_________________________________________________________________
dense_2 (Dense)              (None, 2)                 18
=================================================================
Total params: 58
Trainable params: 58
Non-trainable params: 0
_________________________________________________________________
Policy:policy_q_nn, Min reward:8.0, Max reward:150.0, Average reward:41.27
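After training, the network can be queried directly for its Q-value estimates of any discretized state; a minimal sketch (the reset observation is just an example input):

# a minimal sketch: inspect the Q-value estimates for one discretized state
obs = env.reset()
state = discretize_state(obs, s_bounds, n_s)
print(q_nn.predict(np.array([state])))   # two estimates, one per action (push left, push right)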