I. Implementing the DQN algorithm with TensorFlow 2.0
The full algorithm code is as follows:
import copy

import gym
import numpy as np
import pandas as pd
import tensorflow as tf
from matplotlib import pyplot as plt
from tensorflow import keras
def DQN_run(DQN_agent=None, episode=200):
    # DQN_agent is the module containing the DQN class (see the main entry point below)
    DQN_agent = DQN_agent.DQN(n_actions=2, n_features=4, n_experience_pool=300)
    DQN_agent.net_init()
    score = []
    env = gym.make('CartPole-v1')
    for i_episode in range(episode):
        observation = env.reset()
        for t in range(1000):
            env.render()
            action = DQN_agent.choose_action(observation)
            observation_, reward, done, info = env.step(action)
            # reward shaping: penalize large pole angles to speed up learning
            x, x_dot, theta, theta_dot = observation
            r2 = -abs(theta) * 2
            DQN_agent.experience_store(s=observation, a=action, r=reward + r2, s_=observation_, done=done)
            DQN_agent.learn()
            observation = observation_
            if done:
                print("Episode finished after {} time steps".format(t + 1))
                # only record scores once the experience pool is full and learning has started
                if DQN_agent.experience_pool_is_full:
                    score.append(t + 1)
                break
    plt.plot(score, color='red')
    plt.show()
class DQN:
    def __init__(self,
                 n_actions,
                 n_features,
                 epsilon=0.1,
                 batch_size=64,
                 learning_rate=0.001,
                 gamma=0.9,
                 replace_time=300,
                 n_experience_pool=300):
        self.n_actions = n_actions
        self.n_features = n_features
        self.batch_size = batch_size
        self.learning_rate = learning_rate
        self.gamma = gamma
        self.epsilon = epsilon
        self.n_experience_pool = n_experience_pool
        # one row per transition: s (n_features) + a (1) + r (1) + s_ (n_features) + done (1)
        self.experience_pool = pd.DataFrame(np.zeros([self.n_experience_pool, self.n_features * 2 + 1 + 1 + 1]))
        self.experience_pool_index = 0
        self.experience_pool_is_full = False
        self.q_pred = None
        self.q_target = None
        self.opt = tf.keras.optimizers.Adam(self.learning_rate)
        self.replace_time = replace_time
        self.now_learn_time = 0
    def loss_f(self, y_true, y_pred):
        # currently unused; learn() calls tf.keras.losses.mse directly
        return keras.losses.mse(y_true, y_pred)

    def choose_action(self, s):
        # epsilon-greedy action selection over a single observation
        s = s.reshape(1, self.n_features)
        rand = np.random.rand(1)
        if rand < self.epsilon:
            # decay epsilon very slowly (note: only on exploration steps)
            self.epsilon = self.epsilon * 0.999999
            return np.random.randint(0, self.n_actions)
        else:
            action_value = self.q_pred.predict(np.array(s))
            return np.argmax(action_value)
    def net_init(self):
        # prediction (online) network
        input_features = tf.keras.Input(shape=(self.n_features,), name='input_features')
        dense_0 = tf.keras.layers.Dense(32, activation='relu')(input_features)
        dense_1 = tf.keras.layers.Dense(64, activation='relu')(dense_0)
        dense_2 = tf.keras.layers.Dense(32, activation='relu')(dense_1)
        out_put = tf.keras.layers.Dense(self.n_actions, name='prediction_q_pred')(dense_2)
        self.q_pred = tf.keras.Model(inputs=input_features, outputs=out_put)
        # target network, built from its own separate layers
        input_features_target = tf.keras.Input(shape=(self.n_features,), name='input_features_target')
        dense_0_target = tf.keras.layers.Dense(32, activation='relu')(input_features_target)
        dense_1_target = tf.keras.layers.Dense(64, activation='relu')(dense_0_target)
        dense_2_target = tf.keras.layers.Dense(32, activation='relu')(dense_1_target)
        out_put_target = tf.keras.layers.Dense(self.n_actions, name='prediction_q_target')(dense_2_target)
        self.q_target = tf.keras.Model(inputs=input_features_target, outputs=out_put_target)
        # start both networks from identical weights
        self.q_target.set_weights(self.q_pred.get_weights())
    def experience_store(self, s, a, r, s_, done):
        # flatten one transition into a single row: [s, a, r, s_, done]
        experience = []
        for i in range(self.n_features * 2 + 2 + 1):
            if i < self.n_features:
                experience.append(s[i])
            elif self.n_features <= i < self.n_features + 1:
                experience.append(a)
            elif self.n_features + 1 <= i < self.n_features + 2:
                experience.append(r)
            elif self.n_features + 2 <= i < self.n_features * 2 + 2:
                experience.append(s_[i - self.n_features - 2])
            else:
                experience.append(done)
        self.experience_pool.loc[self.experience_pool_index] = copy.deepcopy(experience)
        self.experience_pool_index += 1
        # once the pool is full, wrap around and overwrite the oldest rows
        if self.experience_pool_index == self.n_experience_pool:
            self.experience_pool_is_full = True
            self.experience_pool_index = 0
    def learn(self):
        # wait until the experience pool is full before training
        if not self.experience_pool_is_full:
            return
        # random minibatch to break correlation between consecutive experiences
        data_pool = self.experience_pool.sample(self.batch_size)
        s = np.array(data_pool.loc[:, 0:self.n_features - 1])
        a = np.array(data_pool.loc[:, self.n_features], dtype=np.int32)
        r = np.array(data_pool.loc[:, self.n_features + 1])
        s_ = np.array(data_pool.loc[:, self.n_features + 2:self.n_features * 2 + 1])
        done = np.array(data_pool.loc[:, self.n_features * 2 + 2])
        with tf.GradientTape() as tape:
            y_pred = self.q_pred(s)
            # build the training targets on a numpy copy so y_pred keeps its gradient
            y_target = y_pred.numpy()
            q_target = self.q_target(s_).numpy()
            index = np.arange(self.batch_size, dtype=np.int32)
            # DQN target: only the position of the stored action a is changed
            y_target[index, a] = r + (1 - done) * self.gamma * np.max(q_target, axis=1)
            loss_val = tf.keras.losses.mse(y_target, y_pred)
        gradients = tape.gradient(loss_val, self.q_pred.trainable_variables)
        self.opt.apply_gradients(zip(gradients, self.q_pred.trainable_variables))
        self.now_learn_time += 1
        # periodically copy the online weights into the target network
        if self.now_learn_time == self.replace_time:
            self.replace_param()
            self.now_learn_time = 0
    def replace_param(self):
        print("replace the param")
        self.q_target.set_weights(self.q_pred.get_weights())
Main entry point (RL_algorithm_package is the author's local package containing the DQN module above):

from RL_algorithm_package import DQN

if __name__ == '__main__':
    # pass the DQN module itself; DQN_run instantiates DQN.DQN internally
    DQN.DQN_run(DQN_agent=DQN)
II. Notes on implementing the DQN algorithm
1. Implementation walkthrough
① Design the experience pool. A pandas DataFrame works well here. The pool must store five elements per transition: s, a, r, s_, and done, so the number of columns follows from their dimensions (for CartPole, n_features = 4, giving 4 * 2 + 1 + 1 + 1 = 11 columns laid out as [s, a, r, s_, done]).
self.experience_pool = pd.DataFrame(np.zeros([self.n_experience_pool, self.n_features * 2 + 1 + 1 + 1]))
Also write a storage function that fills the pool from top to bottom and, once the pool is full, wraps around and overwrites from the beginning:
def experience_store(self, s, a, r, s_, done):
    experience = []
    for i in range(self.n_features * 2 + 2 + 1):
        if i < self.n_features:
            experience.append(s[i])
        elif self.n_features <= i < self.n_features + 1:
            experience.append(a)
        elif self.n_features + 1 <= i < self.n_features + 2:
            experience.append(r)
        elif self.n_features + 2 <= i < self.n_features * 2 + 2:
            experience.append(s_[i - self.n_features - 2])
        else:
            experience.append(done)
    self.experience_pool.loc[self.experience_pool_index] = copy.deepcopy(experience)
    self.experience_pool_index += 1
    if self.experience_pool_index == self.n_experience_pool:
        self.experience_pool_is_full = True
        self.experience_pool_index = 0
② Building the deep neural network
This uses the Keras functional API from TensorFlow:
def net_init(self):
    input_features = tf.keras.Input(shape=(self.n_features,), name='input_features')
    dense_0 = tf.keras.layers.Dense(32, activation='relu')(input_features)
    dense_1 = tf.keras.layers.Dense(64, activation='relu')(dense_0)
    dense_2 = tf.keras.layers.Dense(32, activation='relu')(dense_1)
    out_put = tf.keras.layers.Dense(self.n_actions, name='prediction_q_pred')(dense_2)
    self.q_pred = tf.keras.Model(inputs=input_features, outputs=out_put)
    input_features_target = tf.keras.Input(shape=(self.n_features,), name='input_features_target')
    dense_0_target = tf.keras.layers.Dense(32, activation='relu')(input_features_target)
    dense_1_target = tf.keras.layers.Dense(64, activation='relu')(dense_0_target)
    dense_2_target = tf.keras.layers.Dense(32, activation='relu')(dense_1_target)
    out_put_target = tf.keras.layers.Dense(self.n_actions, name='prediction_q_target')(dense_2_target)
    self.q_target = tf.keras.Model(inputs=input_features_target, outputs=out_put_target)
    self.q_target.set_weights(self.q_pred.get_weights())
The point to note here is how the weights are extracted from one network and set on the other:
self.q_target.set_weights(self.q_pred.get_weights())
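As an aside, a common alternative to this hard replacement is Polyak (soft) averaging, where the target network tracks the online network a little on every update. A minimal sketch using the same two models; tau is a hypothetical smoothing coefficient, not part of the original code:

# soft (Polyak) update as an alternative to the hard replacement above;
# tau is a hypothetical smoothing coefficient, not in the original code
tau = 0.01
new_weights = [tau * w_pred + (1 - tau) * w_targ
               for w_pred, w_targ in zip(self.q_pred.get_weights(), self.q_target.get_weights())]
self.q_target.set_weights(new_weights)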
③ DQN parameter update
Storing experiences in a DataFrame pays off here: DataFrame's built-in sample() gives a random minibatch directly, which breaks the correlation between consecutive experiences.
data_pool = self.experience_pool.sample(self.batch_size)
Then extract the individual elements from the sampled batch. Note that they must be converted to ndarray format:
s = np.array(data_pool.loc[:, 0:self.n_features - 1])
a = np.array(data_pool.loc[:, self.n_features], dtype=np.int32)
r = np.array(data_pool.loc[:, self.n_features + 1])
s_ = np.array(data_pool.loc[:, self.n_features + 2:self.n_features * 2 + 1])
done = np.array(data_pool.loc[:, self.n_features * 2 + 2])
This uses the DataFrame loc indexer, which selects by row and column label: [:, 0:self.n_features - 1] means "from all rows, take columns 0 through self.n_features - 1". Unlike ordinary Python slicing, loc includes the end label. Also note that a must be cast to int so it can be used as an index later.
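A minimal demonstration of that inclusive slicing (not from the original code):

import numpy as np
import pandas as pd

df = pd.DataFrame(np.arange(12).reshape(3, 4))  # integer column labels 0, 1, 2, 3
print(df.loc[:, 0:2])   # loc includes the end label: columns 0, 1 AND 2
print(df.iloc[:, 0:2])  # iloc slices like Python: columns 0 and 1 only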
The gradient-descent step follows the standard GradientTape pattern:
with tf.GradientTape() as tape:
    y_pred = self.q_pred(s)
    y_target = y_pred.numpy()
    q_target = self.q_target(s_)
    q_target = q_target.numpy()
    index = np.arange(self.batch_size, dtype=np.int32)
    y_target[index, a] = r + (1 - done) * self.gamma * np.max(q_target, axis=1)
    loss_val = tf.keras.losses.mse(y_target, y_pred)
gradients = tape.gradient(loss_val, self.q_pred.trainable_variables)
self.opt.apply_gradients(zip(gradients, self.q_pred.trainable_variables))
Pay particular attention here: only manipulate the target values (the numpy copy y_target); never modify y_pred itself, otherwise the gradient comes back as None.
y_target[index, a] = r + (1 - done) * self.gamma * np.max(q_target, axis=1)
This assigns to the element in column a for every row in index, in one batched operation, which keeps the code short.
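A minimal demonstration of that batched fancy indexing (not from the original code):

import numpy as np

y = np.zeros((3, 2))             # 3 samples, 2 actions
rows = np.arange(3)              # one row index per sample
acts = np.array([1, 0, 1])       # the stored action of each sample
y[rows, acts] = [10., 20., 30.]  # writes y[0, 1], y[1, 0], y[2, 1] in one step
print(y)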
Note that this line carries the entire DQN update rule: only the position of the stored action a in the Q-pred output is updated, while every other position is left untouched. In short, the steps are (formalized below):
1. Copy Q-pred's output y_pred into y_target.
2. Replace the element at position a in y_target with the value from the update formula, leaving the rest identical to y_pred.
3. Compute the MSE loss between y_target and y_pred.
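For reference, the target value written into position a is the standard DQN target, which is exactly what the (1 - done) factor in the code expresses:

$$y_j = r_j + (1 - \mathrm{done}_j)\,\gamma \max_{a'} Q_{\mathrm{target}}(s'_j, a')$$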
④ Action selection
Feed s into the Q-pred network and take its output. Note that s must first be converted into a batched array of shape (1, n_features), as sketched below.
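A minimal sketch of that conversion, assuming q_pred is the prediction network built in net_init:

# batch a single observation before the forward pass
s = np.asarray(observation, dtype=np.float32).reshape(1, -1)  # shape (1, n_features)
q_values = self.q_pred(s).numpy()     # forward pass; shape (1, n_actions)
action = int(np.argmax(q_values[0]))  # greedy action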
⑤ Environment loop
1. Initialize the environment and obtain the initial state s.
2. Pass s to the agent's action-selection function to get action a.
3. Step the environment with a to obtain s_, r, and done (r can also be shaped by hand rather than taken straight from the environment).
4. Store s, a, r, s_, done in the experience pool.
5. Once the pool is full, run a DQN learning step (or learn only every few steps; see the sketch after this list).
6. Set s = s_.
7. Go back to step 2.
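A hedged sketch of the variant mentioned in step 5, learning only every few environment steps; learn_interval is a hypothetical parameter, not present in the original code:

# learn every learn_interval steps instead of every step
learn_interval = 4
if DQN_agent.experience_pool_is_full and t % learn_interval == 0:
    DQN_agent.learn()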
2. Result plot
I did not save the result plot; training is fairly slow and not very stable.
3. Algorithm principle
DQN works like Q-learning with the lookup table replaced by a neural network; detailed derivations are easy to find online.
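For reference, the tabular Q-learning update that DQN generalizes is:

$$Q(s, a) \leftarrow Q(s, a) + \alpha\big[r + \gamma \max_{a'} Q(s', a') - Q(s, a)\big]$$

DQN replaces the table with a network Q(s, a; θ), trained by minimizing the MSE between Q(s, a; θ) and the target r + γ max_a' Q(s', a'; θ⁻), where θ⁻ denotes the periodically copied target-network weights.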
III. Improvements to DQN: DDQN and Dueling DQN
1. DDQN
Because DQN suffers from Q-value overestimation, it can be improved into DDQN (Double DQN).
The only line in the algorithm that needs to change is
y_target[index, a] = r + (1 - done) * self.gamma * np.max(q_target, axis=1)
Instead of taking np.max(q_target, axis=1), first feed s_ into the q_pred network and take the argmax action, then feed s_ into the q_target network and read off the value at that action for the update.
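In formula form, the DDQN target is:

$$y_j = r_j + (1 - \mathrm{done}_j)\,\gamma\, Q_{\mathrm{target}}\big(s'_j,\ \arg\max_{a'} Q_{\mathrm{pred}}(s'_j, a')\big)$$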
with tf.GradientTape() as tape:
    y_pred = self.q_pred(s)
    y_target = y_pred.numpy()
    # action selection with the online network ...
    q = self.q_pred(s_)
    q = q.numpy()
    arg_max_a = np.argmax(q, axis=1)
    # ... value evaluation with the target network
    q_target = self.q_target(s_)
    q_target = q_target.numpy()
    index = np.arange(self.batch_size, dtype=np.int32)
    y_target[index, a] = r + (1 - done) * self.gamma * q_target[index, arg_max_a]
    loss_val = tf.keras.losses.mse(y_target, y_pred)
gradients = tape.gradient(loss_val, self.q_pred.trainable_variables)
self.opt.apply_gradients(zip(gradients, self.q_pred.trainable_variables))
Everything else is identical to DQN. In my tests it performs better than plain DQN.
2. Dueling DQN
The dueling DQN splits the original Q network into two streams: a state-value stream and an advantage stream.
Compared with DQN, only the network architecture needs to change; everything else stays the same.
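The two streams are recombined with the standard mean-subtracted dueling aggregation, which is exactly what the code below computes:

$$Q(s, a) = V(s) + A(s, a) - \frac{1}{|\mathcal{A}|}\sum_{a'} A(s, a')$$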
def net_init(self):
    def build_net(name):
        input_features = tf.keras.Input(shape=(self.n_features,), name='input_features_' + name)
        dense_0 = tf.keras.layers.Dense(32, activation='relu')(input_features)
        dense_1 = tf.keras.layers.Dense(64, activation='relu')(dense_0)
        dense_2 = tf.keras.layers.Dense(32, activation='relu')(dense_1)
        s_value = tf.keras.layers.Dense(1, name='prediction_s_value_' + name)(dense_2)          # V(s)
        A_s_a_ori = tf.keras.layers.Dense(self.n_actions, name='prediction_A_s_a_' + name)(dense_2)  # A(s, a)
        # Q(s, a) = V(s) + A(s, a) - mean_a A(s, a)
        out_put = s_value + A_s_a_ori - tf.reduce_mean(A_s_a_ori, axis=1, keepdims=True)
        return tf.keras.Model(inputs=input_features, outputs=out_put)

    # the target network needs its own layers; building both models from the
    # same output tensor would make q_pred and q_target share weights
    self.q_pred = build_net('pred')
    self.q_target = build_net('target')
    self.q_target.set_weights(self.q_pred.get_weights())
Result plot (image not included).