DDPG tensorflow 2.0

2023-05-16

DDPG算法的tensorflow2.0实现
算法的详细解析可以看DDPG解析

import tensorflow as tf
import numpy as np
import pandas as pd
import gym
from matplotlib import pyplot as plt
import os

# 设置随机数种子
SEED = 65535

ENV = gym.make('Pendulum-v1')
# 环境的全局变量,不同环境可能会不同
action_dim = ENV.action_space.shape[0]
observation_dim = ENV.observation_space.shape[0]
action_span = ENV.action_space.high


class DDPG:
    def __init__(self,
                 n_features,
                 n_actions_dim,
                 gamma=0.9,
                 mode='train',
                 update_param_n=1,
                 actor_learning_rate=0.001,
                 critic_learning_rate=0.001,
                 soft_tau=0.1,
                 explore_span=3,
                 learning_rate=0.001,
                 experience_pool_size=1000,
                 batch_size=64,
                 ):
        # 随机数种子
        np.random.seed(SEED)
        tf.random.set_seed(SEED)
        # 强化学习超参数
        self.gamma = gamma
        self.explore_span = explore_span  # 探索的范围,越大越容易探索
        # 神经网络相关定义
        self.n_features = n_features  # 特征的维度
        self.n_actions_dim = n_actions_dim  # 动作的维度
        self.actor_learning_rate = actor_learning_rate  # 学习率,可以为不同的网络设置不同的优化函数,设置不同的学习率,这里并没有使用到这两个参数
        self.critic_learning_rate = critic_learning_rate
        self.learning_rate = learning_rate
        self.soft_tau = soft_tau  # 软更新比例,这个值要和新的值进行相乘
        self.learn_time = 0  # 学习次数
        self.update_param_n = update_param_n  # 学习n次更新一次target网络
        self.mode = mode  # 设置模式,可以为训练(train)或测试(test)
        # 建立四个网络,并且使其参数相同
        self.critic_pred_model = self.critic_init(critic_trainable=True, name='pred')
        self.critic_target_model = self.critic_init(critic_trainable=False, name='target')
        self.critic_param_replace()
        self.actor_pred_model = self.actor_init(actor_trainable=True, name='pred')
        self.actor_target_model = self.actor_init(actor_trainable=False, name='target')
        self.actor_param_replace()
        self.opt = tf.keras.optimizers.Adam(self.learning_rate)
        # 经验池相关参数
        self.experience_pool_size = experience_pool_size  # 经验池大小
        self.experience_length = self.n_features * 2 + self.n_actions_dim + 1 + 1  # 一条经验的长度
        self.experience_pool_is_full = False
        self.experience_pool_can_learn = False
        self.experience_pool = pd.DataFrame(np.zeros([self.experience_pool_size, self.experience_length]))  # 建立经验池
        self.experience_pool_index = 0  # 经验池的当前目录
        self.batch_size = batch_size  # 批大小

    def experience_pool_store(self, s, a, r, s_, done):
        """
        存储经验
        :param s: 状态
        :param a: 动作
        :param r: 回报
        :param s_: 下一个状态
        :param done: 是否完成游戏
        :return:
        """
        experience = []
        for i in range(self.experience_length):
            if i < self.n_features:
                experience.append(s[i])
            elif self.n_features <= i < self.n_features + self.n_actions_dim:
                experience.append(a[i - self.n_features])
            elif self.n_features + 1 <= i < self.n_features + self.n_actions_dim + 1:
                experience.append(r)
            elif self.n_features + 2 <= i < self.n_features * 2 + self.n_actions_dim + 1:
                experience.append(s_[i - self.n_features - self.n_actions_dim - 1])
            else:
                experience.append(done)
        self.experience_pool.loc[self.experience_pool_index] = experience
        self.experience_pool_index += 1
        # 判断能否开始训练,以及经验池是否已经满了
        if self.experience_pool_index >= self.batch_size:
            self.experience_pool_can_learn = True
        if self.experience_pool_index == self.experience_pool_size:
            self.experience_pool_is_full = True
            self.experience_pool_index = 0

    def critic_init(self, critic_trainable, name):
        """
        critic 网络定义,s,a 输入,Q(s,a)输出。这里与AC网络不同,AC网络输出的是V(s)
        :param name: 网络名称
        :param critic_trainable: 是否可以被训练,target网络是不能被训练的,设置False,预测网络设置为True
        :return: critic 网络模型
        """
        # 多输入网络的定义法
        input_s = tf.keras.Input(shape=(self.n_features,))
        input_a = tf.keras.Input(shape=(self.n_actions_dim,))
        inputs = tf.concat([input_s, input_a], axis=-1)
        dense1 = tf.keras.layers.Dense(32, activation='relu')(inputs)
        out_put = tf.keras.layers.Dense(1)(dense1)
        critic_model = tf.keras.Model(inputs=[input_s, input_a],
                                      outputs=out_put,
                                      trainable=critic_trainable,
                                      name='critic_' + name)
        return critic_model

    def actor_init(self, actor_trainable, name):
        """
        actor 网络定义,输入s,输出动作a
        :param name: 网络名称
        :param actor_trainable: 是否可以被训练,target网络是不能被训练的,设置False,预测网络设置为True
        :return: actor 网络模型
        """
        # 多输入网络的定义法
        input_s = tf.keras.Input(shape=(self.n_features,))
        dense1 = tf.keras.layers.Dense(32, activation='relu')(input_s)
        # 加入tanh的激活函数,映射到-1 1,需要再将其映射到动作空间上。
        out_put = tf.keras.layers.Dense(1, activation='tanh')(dense1)
        out_put = tf.keras.layers.Lambda(lambda x: x * np.array(action_span))(out_put)
        actor_model = tf.keras.Model(inputs=input_s, outputs=out_put, trainable=actor_trainable, name='actor_' + name)
        return actor_model

    def choose_action(self, s):
        s = s.reshape(1, self.n_features)
        a = self.actor_pred_model.predict(np.array(s))
        if self.mode == 'train':
            # 正式的测试中,可以去掉这个噪声 直接返回动作a[0]即可,这里是训练用,所以要加入噪声,使其能够充分的探索环境
            action = np.clip(np.random.normal(a[0], self.explore_span), -action_span, action_span)
            return action
        elif self.mode == 'test':
            return a[0]

    def DDPG_learn(self):
        """
        在这里进行两个网络的更新
        :return:
        """
        if not self.experience_pool_can_learn:
            return
        elif not self.experience_pool_is_full:
            data_pool = self.experience_pool.loc[:self.experience_pool_index - 1, :].sample(self.batch_size)
        else:
            data_pool = self.experience_pool.sample(self.batch_size)
        exp_s = np.array(data_pool.loc[:, :self.n_features - 1])
        exp_a = np.array(data_pool.loc[:, self.n_features - 1 + self.n_actions_dim]).reshape(self.batch_size, 1)
        exp_r = np.array(data_pool.loc[:, self.n_features - 1 + self.n_actions_dim + 1]).reshape(self.batch_size, 1)
        exp_s_ = np.array(
            data_pool.loc[:, self.n_features + self.n_actions_dim + 1:self.n_features * 2 + self.n_actions_dim])
        # done 没有用到
        exp_done = np.array(data_pool.loc[:, self.n_features * 2 + self.n_actions_dim + 1]).reshape(self.batch_size, 1)
        with tf.GradientTape() as Tape:
            # 首先更新actor 网络,注意loss是Q值
            a = self.actor_pred_model(exp_s)
            Q_pred = self.critic_pred_model([exp_s, a])
            loss_actor = - tf.reduce_mean(Q_pred)  # 负值,最大
            actor_gradients = Tape.gradient(loss_actor, self.actor_pred_model.trainable_variables)
            self.opt.apply_gradients(zip(actor_gradients, self.actor_pred_model.trainable_variables))
        with tf.GradientTape() as Tape:
            # critic网络更新
            a_ = self.actor_target_model(exp_s_)
            Q_pred_critic = self.critic_pred_model([exp_s, exp_a])
            Q_target_critic = exp_r + self.gamma * self.critic_target_model([exp_s_, a_])
            loss_critic = tf.keras.losses.mse(Q_target_critic, Q_pred_critic)
            critic_gradients = Tape.gradient(loss_critic, self.critic_pred_model.trainable_variables)
            self.opt.apply_gradients(zip(critic_gradients, self.critic_pred_model.trainable_variables))
        self.learn_time += 1
        # 参数更新,采用软更新的方式
        if self.learn_time == self.update_param_n:
            self.soft_param_update(self.critic_target_model, self.critic_pred_model)
            self.soft_param_update(self.actor_target_model, self.actor_pred_model)
            self.learn_time = 0

    def soft_param_update(self, target_model, pred_model):
        """
        采用软更新的方式进行参数的更新,不采用DQN中的直接赋值操作,也可以采用别的软更新方式来实现。
        :param pred_model: 预测网络
        :param target_model: 目标网络
        """
        param_target = target_model.get_weights()
        param_pred = pred_model.get_weights()
        for i in range(len(param_target)):
            param_target[i] = param_target[i] * (1 - self.soft_tau)
            param_pred[i] = param_pred[i] * self.soft_tau
        param = np.add(param_pred, param_target)
        target_model.set_weights(param)

    def critic_param_replace(self):
        """
        替换critic网络的参数
        """
        self.critic_target_model.set_weights(self.critic_pred_model.get_weights())

    def actor_param_replace(self):
        """
        替换actor网络的参数
        """
        self.actor_target_model.set_weights(self.actor_pred_model.get_weights())

    def save_model(self, episode):
        """
        save trained weights
        :return: None
        """
        if not os.path.exists('model'):
            os.makedirs('model')
        self.actor_pred_model.save(f'model/ddpg_actor_pred_model_{episode}_episode.h5')
        self.actor_target_model.save(f'model/ddpg_actor_target_model_{episode}_episode.h5')
        self.critic_pred_model.save(f'model/ddpg_critic_pred_model_{episode}_episode.h5')
        self.critic_target_model.save(f'model/ddpg_critic_target_model_{episode}_episode.h5')

    def load_model(self, episode):
        """
        load trained weights
        :return: None
        """
        self.actor_pred_model = tf.keras.models.load_model(f'model/ddpg_actor_pred_model_{episode}_episode.h5')
        self.actor_target_model = tf.keras.models.load_model(f'model/ddpg_actor_target_model_{episode}_episode.h5')
        self.critic_pred_model = tf.keras.models.load_model(f'model/ddpg_critic_pred_model_{episode}_episode.h5')
        self.critic_target_model = tf.keras.models.load_model(f'model/ddpg_critic_target_model_{episode}_episode.h5')


def DDPG_train(episode=300):
    DDPG_agent = DDPG(n_features=observation_dim,
                      n_actions_dim=action_dim,
                      batch_size=64,
                      mode='train',
                      experience_pool_size=640)
    ENV.seed(SEED)
    score = []
    if not os.path.exists('img'):
        os.makedirs('img')
    for i_episode in range(episode):
        # 初始化,
        observation = ENV.reset()
        score_one_episode = 0
        for t in range(500):
            # 刷新环境
            ENV.render()
            # 选择动作
            action = DDPG_agent.choose_action(observation)
            observation_, reward, done, info = ENV.step(action)
            # 存储经验
            DDPG_agent.experience_pool_store(s=observation, a=action, r=reward, s_=observation_, done=done)
            # 学习 流程与DQN相似
            DDPG_agent.DDPG_learn()
            observation = observation_
            score_one_episode += reward
            if done:
                score.append(score_one_episode)
                print(f"the game is finished,episode is {i_episode}, the score is {score_one_episode}")
                break
        if (i_episode + 1) % 100 == 0:
            plt.plot(score)  # 绘制波形
            DDPG_agent.explore_span = DDPG_agent.explore_span / 2
            # plt.draw()
            DDPG_agent.save_model(i_episode + 1)
            plt.savefig(
                f"img/DDPG_score_train_episode:{i_episode + 1}.png")


def DDPG_test(episode=300):
    DDPG_agent = DDPG(n_features=observation_dim,
                      n_actions_dim=action_dim,
                      batch_size=64,
                      mode='test',
                      experience_pool_size=640)
    DDPG_agent.load_model(episode=300)
    ENV.seed(SEED)
    score = []
    for i_episode in range(episode):
        # 初始化,
        observation = ENV.reset()
        score_one_episode = 0
        for t in range(500):
            # 刷新环境
            ENV.render()
            action = DDPG_agent.choose_action(observation)
            observation_, reward, done, info = ENV.step(action)
            observation = observation_
            score_one_episode += reward
            if done:
                score.append(score_one_episode)
                print(f"the game is finished,episode is {i_episode}, the score is {score_one_episode}")
                break
        if (i_episode + 1) % 100 == 0:
            plt.plot(score)  # 绘制波形
            # plt.draw()
            DDPG_agent.save_model(i_episode + 1)
            plt.savefig(
                f"img/DDPG_score_test:{i_episode + 1}.png")


if __name__ == '__main__':
    DDPG_train(episode=300)

训练结果如下:
在这里插入图片描述

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

DDPG tensorflow 2.0 的相关文章

随机推荐

  • 本科学完C语言、C++、python(学透点),还有必要学别的语言吗?

    原作者是一名高校的信息技术类的教师 xff0c 主教程序设计类课程 这样的问题 xff0c 作者的学生也会经常问他 本篇文章意于为各位大学生提供一些在编程上的疑惑 xff0c 希望能够对大家有帮助 作者 xff1a 悟空问答丨EXCEL进阶
  • C/C++编程笔记:C/C++中的strrchr()函数,到底该怎么用?

    在C 43 43 中 xff0c strrchr xff08 xff09 是用于字符串处理的预定义函数 cstring是字符串函数所需的头文件 此函数返回一个指针 xff0c 该指针指向字符串中最后一次出现的字符 我们想要找到的最后一个出现
  • C++编程书籍推荐:零基础入门书籍,学C++看它们就够了!

    如果你是一个没有编程经验的C 43 43 零基础小白 xff0c 或者有其它语言经验的C 43 43 初学者 xff0c 那么强烈推荐下面的十本零基础小白入门C 43 43 书籍 1 C 43 43 Primer 作者 xff1a Stan
  • 【ROS2 入门】虚拟机环境 ubuntu 18.04 ROS2 安装

    大家好 xff0c 我是虎哥 xff0c 从今天开始 xff0c 我将花一段时间 xff0c 开始将自己从ROS1切换到ROS2 xff0c 做为有别于ROS1的版本 xff0c 做了很多更新和改变 xff0c 我还是很期待自己逐步去探索R
  • 如何解压.gz的压缩文件

    如何解压 gz的压缩文件 gzip d xxx gz tar命令 root 64 linux tar cxtzjvfpPN 文件与目录 参数 xff1a c xff1a 建立一个压缩文件的参数指令 create 的意思 xff1b x xf
  • GPS经纬度坐标与XY坐标相互转换的python程序

    文章目录 前言一 说明二 函数1 import 和 常数2 GPS经纬度转XY坐标3 XY坐标转GPS经纬度 总结 前言 室外定位常用的是GPS xff0c 故编队队形 设定轨迹都是基于GPS经纬度坐标 而在仿真中我们通常会在XY坐标系下进
  • AD20 原理图设计流程

    Altium Designer 20 的原理图设计大致可以分为 9 个步骤 xff1a xff08 1 xff09 新建原理图 这是原理图设计的第一步 xff08 2 xff09 图纸设置 图纸设置就是要设置图纸的大小 xff0c 方向等信
  • JavaScript基础——DOM节点操作学习笔记

    目录 笔记 方法的使用 案例一 动态生成表格 案例二 下拉菜单 xff0c 鼠标经过和离开实现 案例全部代码 笔记 节点概述 1 网页中的任何内容都是节点 文字 标签 元素 文档等 节点至少有nodeType 节点类型 nodeName 节
  • MAVLINK包的校验方法

    这段时间做一个项目要进行MAVLINK的解包校验 xff0c 但有一个叫做 CRC EXTRA的位导致这个校验码怎么算结果都不对 xff0c 后来找了好久还是在github的论坛上看见别人讨论才找到方法的 1 先上从官网上拿的mavlink
  • 机器人工程专业课程

    1 机器人工程专业的课程主要有 xff1a 高级语言程序设计 电路分析 机械设计基础 模拟电路技术 数字电子技术 自动控制原理 微机原理及接口技术 电机与电气控制技术 单片机原理及其应用 机械制造基础 工业机器人控制系统 运动控制系统 工业
  • python获取当前执行py文件的绝对路径

    python获取当前执行py文件的绝对路径 python3 home appuser test py span class token comment 获取当前执行py文件的绝对路径 span py file path span class
  • 相机内参的标定方法

    简介 摄像机标定 Camera calibration 简单来说是从世界坐标系换到图像坐标系的过程 xff0c 也就是求最终的投影矩阵 PP 的过程 xff0c 下面相关的部分主要参考UIUC的计算机视觉的课件 xff08 网址Spring
  • python中的函数、类和对象、模块和包都是啥意思?

    python中的函数 类 对象 包都是啥意思 xff1f 1 函数 重复的事情不做两次 函数还是比较好理解的吧 xff0c 数学中就学到过函数 xff0c 就是用来解决某一些问题的过程 为啥要写函数 xff1f 首先是方便代码重用 xff0
  • E3ZG_D62传感器 STM32C8T6

    E3ZG D62传感器 在STM32C8T6的简单应用 该图便是E3ZG D62传感器的样子 第一个旋钮是灵敏度调节旋钮的 xff0c 第二个旋钮是改变模式 xff0c 在L时 xff0c 长灭 xff0c 检测到 xff0c 为亮 xff
  • Learning High-Speed Flight in the Wild 环境安装

    有许多问题可以去github项目内的issues查找一下 xff0c 里面有相当一部分问题的解决方案 也可参考论文学习 Learning High Speed Flight in the Wild 一 环境安装 论文程序github地址 x
  • AES加密算法

    密钥类型 AES 128 xff1a 128位比特 xff08 16字节 xff09 AES 192 xff1a 192位比特 xff08 24字节 xff09 AES 256 xff1a 256位比特 xff08 32字节 xff09 一
  • Ros noetic : XTDrone安装

    一 安装参考 安装过程绝大部分参考如下的文件语雀 xff1a 仿真平台基础配置 进行配置 二 出现的错误以及需要注意的问题 这里的配置如下 xff1a ROS noetic Ubuntu20 04 python3 8 2 1 依赖安装 在
  • DQN、DDQN、Dueling DQN tensorflow2.0

    一 tensorflow2 0 实现DQN算法 算法代码如下 span class token keyword import span numpy span class token keyword import span tensorflo
  • PG-REINFORCE tensorflow 2.0

    REINFORCE 算法实现 REINFORCE算法是策略梯度算法最原始的实现算法 xff0c 这里采用tensorflow2 0进行实现 span class token keyword import span tensorflow sp
  • DDPG tensorflow 2.0

    DDPG算法的tensorflow2 0实现 算法的详细解析可以看DDPG解析 span class token keyword import span tensorflow span class token keyword as span