【Spring Boot 2.0学习之旅-13】消息队列介绍和SpringBoot2.x整合RocketMQ

2023-05-16

RocketMQ消息中间件

一、RocketMQ简介

1.RocketMQ整体介绍

  • RocketMQ是一款分布式、队列模式的消息中间件;
  • 官网地址:https://rocketmq.apache.org/

2.RocketMQ的好处

  • 支持集群模型、负载均衡、水平扩展能力;
  • 亿级别的消息堆积能力;
  • 采用零拷贝的原理、顺序写盘、随机读;
  • 丰富的API使用;
  • 代码优秀,底层通信框架采用Netty NIO框架;
  • NameServer代替ZZookeeper;
  • 强调集群无单点,可扩展,任意一点高可用,水平可扩展;
  • 消息失败重试机制,消息可查询;
  • 开源社区活跃、成熟度(经过双十一考验);

3.RocketMQ相关概念

  • Producer:消费生产者,负责产生消息,一般由业务系统负责产生消息。

  • Consumer:消费消费者,负责消费消息,一般是后台系统负责异步消费。

  • Push Consumer:Consumer的一种,需要向Consumer对象注册监听。

  • Pull Consumer:Consumer的一种,需要主动请求Broker拉去消息。

  • Producer Group:生产者集合,一般用于发送一类消息。

  • Consumer Group:消费者集合,一般用于接受一类消息进行消费。

  • Broker:MQ消息服务(中转角色,用于消息存储与生产)

4.RocketMQ常见模块

  • rocketmq-broker:主要的业务逻辑,消息收发,主从同步,pagecache;
  • rocketmq-client:客户端接口,比如生产者和消费者;
  • rocketmq-example:示例,比如生产者和消费者;
  • rocketmq-common:公用数据结构等等;
  • rocketmq-distribution:编译模块,编译输出等;
  • rocketmq-filter:进行Broker过滤的不感兴趣的消息传输,减小带宽压力;
  • rocketmq-logappender、rockermq-logging:日志相关;
  • rocketmq-namesrv Namesrc服务:用于服务协调;
  • rocketmq-openmessging:对外提供 服务;
  • rocketmq-remoting:远程调用 接口,封装Netty底层通信;
  • rocketmq-srvutil:提供一些公用的工具方法,比如解析命令行参数;

二、RocketMQ安装

官网地址:http://rocketmq.apache.org/

学习资源:
1)http://jm.taobao.org/2017/01/12/rocketmq-quick-start-in-10-minutes/
2)https://www.jianshu.com/p/453c6e7ff81c

在这里插入图片描述

1.RocketMQ本地快速安装部署

官网文档

下载最新的二进制版本,并将zip文件解压缩到本地磁盘中,如:D:\rocketmq

(1)添加环境变量

  • 在桌面上,右键单击“计算机”图标。
  • 从上下文菜单中选择属性。
  • 单击高级系统设置链接。
  • 单击环境变量。
  • 然后添加或更改环境变量。
ROCKETMQ_HOME="D:\rocketmq"
NAMESRV_ADDR="localhost:9876"

或者在打开的 powershell 中,键入所需的环境变量。

$Env:ROCKETMQ_HOME="D:\rocketmq"
$Env:NAMESRV_ADDR="localhost:9876"

如果你选择powershell方式。您应该为每个新打开的 powershell 窗口执行此操作。

(2)启动nameserver

设置正确的环境变量后,打开新的 powershell 窗口。然后将目录更改为 RocketMQ 类型并运行:

.\bin\mqnamesrv.cmd

在这里插入图片描述

(3)启动broker

设置正确的环境变量后,打开新的 powershell 窗口。然后将目录更改为 RocketMQ 类型并运行:

.\bin\mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

在这里插入图片描述

(4)发送和接收消息

发送消息

设置正确的环境变量后,打开新的 powershell 窗口。然后将目录更改为 RocketMQ 类型并运行:

.\bin\tools.cmd  org.apache.rocketmq.example.quickstart.Producer

接收消息

然后您将看到生成的消息。现在我们可以尝试消费者消息。

设置正确的环境变量后,打开新的 powershell 窗口。然后将目录更改为 RocketMQ 类型并运行

.\bin\tools.cmd  org.apache.rocketmq.example.quickstart.Consumer

2.RocketMQ可视化控制台

下载地址

(1)下载源码

https://github.com/apache/rocketmq-dashboard

(2)修改配置文件

src/main/resources/application.properties文件:

server.port=8088
rocketmq.config.namesrvAddr=127.0.0.1:9876

(3) 编译打包 mvn clean package -Dmaven.test.skip=true

在这里插入图片描述

(4) 运行

java -jar target/rocketmq-dashboard-1.0.1-SNAPSHOT.jar

在这里插入图片描述

三、RocketMQ 代码演练

在这里插入图片描述

1.封装结果domain

package com.lcz.spring_demo20.domain;

import java.io.Serializable;

/**
 * 功能描述:响应结果类
 *
 * 创建时间:Apr 29, 2018 4:08:36 PM
 */
public class JsonData implements Serializable {

	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;

	private Integer code; // 状态码 0 表示成功,1表示处理中,-1表示失败
	private Object data; // 数据
	private String msg;// 描述

	public JsonData() {
	}

	public JsonData(Integer code, Object data, String msg) {
		this.code = code;
		this.data = data;
		this.msg = msg;
	}

	// 成功,传入数据
	public static JsonData buildSuccess() {
		return new JsonData(0, null, null);
	}

	// 成功,传入数据
	public static JsonData buildSuccess(Object data) {
		return new JsonData(0, data, null);
	}

	// 失败,传入描述信息
	public static JsonData buildError(String msg) {
		return new JsonData(-1, null, msg);
	}

	// 失败,传入描述信息,状态码
	public static JsonData buildError(String msg, Integer code) {
		return new JsonData(code, null, msg);
	}

	// 成功,传入数据,及描述信息
	public static JsonData buildSuccess(Object data, String msg) {
		return new JsonData(0, data, msg);
	}

	// 成功,传入数据,及状态码
	public static JsonData buildSuccess(Object data, int code) {
		return new JsonData(code, data, null);
	}

	public Integer getCode() {
		return code;
	}

	public void setCode(Integer code) {
		this.code = code;
	}

	public Object getData() {
		return data;
	}

	public void setData(Object data) {
		this.data = data;
	}

	public String getMsg() {
		return msg;
	}

	public void setMsg(String msg) {
		this.msg = msg;
	}

	@Override
	public String toString() {
		return "JsonData [code=" + code + ", data=" + data + ", msg=" + msg
				+ "]";
	}

}

2.jms/MsgProducer生产者

package com.lcz.spring_demo20.jms;

import javax.annotation.PostConstruct;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class MsgProducer {
	 /**
     * 生产者的组名
     */
    @Value("${apache.rocketmq.producer.producerGroup}")
    private String producerGroup;

    /**
     * NameServer 地址
     */
    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    private  DefaultMQProducer producer ;

    	
    public DefaultMQProducer getProducer(){
    	return this.producer;
    }
    
    
    
    
    @PostConstruct
    public void init() {
        //生产者的组名
    	producer = new DefaultMQProducer(producerGroup);
        //指定NameServer地址,多个地址以 ; 隔开
    	//如 producer.setNamesrvAddr("192.168.100.141:9876;192.168.100.142:9876;192.168.100.149:9876"); 
        producer.setNamesrvAddr(namesrvAddr);
        
        producer.setVipChannelEnabled(false);
        
        try {
            /**
             * Producer对象在使用之前必须要调用start初始化,只能初始化一次
             */
            producer.start();

        } catch (Exception e) {
            e.printStackTrace();
        } 
        
        // producer.shutdown();  一般在应用上下文,关闭的时候进行关闭,用上下文监听器

    }
    
    
    
    
    
    
}

2.jms/MsgConsumer

package com.lcz.spring_demo20.jms;

import javax.annotation.PostConstruct;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class MsgConsumer {
    /**
     * 消费者的组名
     */
    @Value("${apache.rocketmq.consumer.PushConsumer}")
    private String consumerGroup;

    /**
     * NameServer 地址
     */
    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    @PostConstruct
    public void defaultMQPushConsumer() {
        //消费者的组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);

        //指定NameServer地址,多个地址以 ; 隔开
        consumer.setNamesrvAddr(namesrvAddr);
        try {
            //设置consumer所订阅的Topic和Tag,*代表全部的Tag
            consumer.subscribe("testTopic", "*");

            //CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,跳过历史消息
            //CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);



            //MessageListenerOrderly 这个是有序的
            //MessageListenerConcurrently 这个是无序的,并行的方式处理,效率高很多
            consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
                try {
                    for (MessageExt messageExt : list) {

                        System.out.println("messageExt: " + messageExt);//输出消息内容

                        String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);

                        System.out.println("消费响应:msgId : " + messageExt.getMsgId() + ",  msgBody : " + messageBody);//输出消息内容
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
            });
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

3.控制器controller

package com.lcz.spring_demo20.controller;

import java.io.UnsupportedEncodingException;


import com.lcz.spring_demo20.domain.JsonData;
import com.lcz.spring_demo20.jms.MsgProducer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 功能描述:模拟微信支付回调
 */
@RestController
@RequestMapping("/api/v1")
public class OrderController {
	
	
	@Autowired
	private MsgProducer msgProducer;
	
	/**
	 * 功能描述:微信支付回调接口
	 * @param msg 支付信息
	 * @param tag 消息二级分类
	 * @return
	 */
	@GetMapping("order")
	public Object order(String msg, String tag) throws MQClientException, RemotingException, MQBrokerException, InterruptedException, UnsupportedEncodingException{
	  
		/**
        * 创建一个消息实例,包含 topic、tag 和 消息体           
       */
       Message message = new Message("testTopic",tag, msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
       
       SendResult result = msgProducer.getProducer().send(message);
       
       System.out.println("发送响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());
     
       return JsonData.buildSuccess();
	}
	
	
	
	
	
	
	
	
	
	
	
	
	
//	
//	/**
//	 * 功能描述:微信支付回调接口
//	 * @param msg 支付信息
//	 * @return
//	 */
//	@GetMapping("comment")
//	public Object comment(String msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException, UnsupportedEncodingException{
//	  
//		/**
//        * 创建一个消息实例,包含 topic、tag 和 消息体           
//       */
//       Message message = new Message("commentTopic","add", msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
//       
//       //同步的方式,会有返回结果,发送的是普通消息
//       SendResult result = msgProducer.getProducer().send(message);
//       
//       System.out.println("发送响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());
//     
//       return JsonData.buildSuccess();
//	}
//	
//	
//	
	
	
	
}

四、RocketMQ常见问题

问题:
1、Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <172.17.42.1:10911> failed

​ 2、com.alibaba.rocketmq.client.exception.MQClientException: Send [1] times, still failed, cost [1647]ms, Topic: TopicTest1, BrokersSent: [broker-a, null, null]

​ 3、org.apache.rocketmq.client.exception.MQClientException: Send [3] times, still failed, cost [497]ms, Topic: TopicTest, BrokersSent: [chenyaowudeMacBook-Air.local, chenyaowudeMacBook-Air.local, chenyaowudeMacBook-Air.local]
​ 解决:多网卡问题处理
​ 1、设置producer: producer.setVipChannelEnabled(false);
​ 2、编辑ROCKETMQ 配置文件:broker.conf(下列ip为自己的ip)
​ namesrvAddr = 192.168.0.101:9876
​ brokerIP1 = 192.168.0.101

​ 3、DESC: service not available now, maybe disk full, CL:
​ 解决:修改启动脚本runbroker.sh,在里面增加一句话即可:
​ JAVA_OPT="${JAVA_OPT} -Drocketmq.broker.diskSpaceWarningLevelRatio=0.98"
​ (磁盘保护的百分比设置成98%,只有磁盘空间使用率达到98%时才拒绝接收producer消息)

​ 常见问题处理:
​ https://blog.csdn.net/sqzhao/article/details/54834761
​ https://blog.csdn.net/mayifan0/article/details/67633729
​ https://blog.csdn.net/a906423355/article/details/78192828

件:broker.conf(下列ip为自己的ip)
​ namesrvAddr = 192.168.0.101:9876
​ brokerIP1 = 192.168.0.101

​ 3、DESC: service not available now, maybe disk full, CL:
​ 解决:修改启动脚本runbroker.sh,在里面增加一句话即可:
​ JAVA_OPT="${JAVA_OPT} -Drocketmq.broker.diskSpaceWarningLevelRatio=0.98"
​ (磁盘保护的百分比设置成98%,只有磁盘空间使用率达到98%时才拒绝接收producer消息)

​ 常见问题处理:
​ https://blog.csdn.net/sqzhao/article/details/54834761
​ https://blog.csdn.net/mayifan0/article/details/67633729
​ https://blog.csdn.net/a906423355/article/details/78192828

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

【Spring Boot 2.0学习之旅-13】消息队列介绍和SpringBoot2.x整合RocketMQ 的相关文章

  • Docker的asp.net core应用部署系列——进入正在运行的docker容器里面

    本系列目录请看这里 https blog csdn net michel4liu article details 80819510 我们之前已经可以通过交互或后台方式启动一个容器 xff0c 有时我们需要进入docker里面看一些log或者
  • DATAX:MongoDB增量数据写入到mysql中

    项目场景 xff1a 简述 xff1a 使用DATAX进行Mongo的数据抽取 xff0c 然后写入到mysql中 xff0c 其中会牵涉到全量数据的写入和增量数据的写入 全量 数据的写入我们只需要正常写JSON模板即可 xff0c 使用c
  • 无人机自动驾驶软件系列 E04

    无人机自动驾驶软件系列 E04 xff1a 深度估计 xff0c 八叉树地图以及路径规划 详细配置https gaas gitbook io guide software realization build your own autonom
  • Window11安装如何跳过TPM2.0这一步

    最近想体验一把window11 xff0c 怎奈自己的电脑太老了 xff0c 显示不能安装 xff0c 网上大佬们都说是tpm2 0的问题 xff0c 顺着这个角度 xff0c 找到了一个国外大神的解决办法 1 在出现 Win11 会提示
  • 修改已运行Docker容器的端口映射

    方法一 删除容器 xff0c 重新新建容器多加一个 p端口映射即可 方法二 修改容器配置文件 重启docker服务 模拟创建运行中的容器 span class token punctuation span root 64 redmine t
  • e指数函数

    使用win自带的计算器 xff0c 进行以e为底的指数函数进行运算时 xff0c 发现计算器上的EXP键只能用做表示10的多少次方 xff0c 如5e3 xff0c 指的是5乘以10的3次方 xff0c 及5000 查了一下 xff0c 也
  • xsens惯导在ROS下输出汇总

    rostopic list 分别记录下每个对应的信息 xff1a 1 rostopic echo diagnostics header seq 454 stamp secs 1572609754 nsecs 307622909 frame
  • c 编程中extern关键字 使用跨文件全局变量

    我们知道 xff0c 程序的编译单位是源程序文件 xff0c 一个源文件可以包含一个或若干个函数 在函数内定义的变量是局部变量 xff0c 而在函数之外定义的变量则称为外部变量 xff0c 外部变量也就是我们所讲的全局变量 它的存储方式为静
  • 年度回忆录(?——2011.01)

    这是在CSDN 上的第一篇总结 xff08 或者说是回忆录 xff09 xff0c 个人认为 xff1a 总结 xff0c 尤其是年度总结是十分必要的 她可以很好的映射出自己以往的不足 xff0c 并为自己下一步的学习指明方向 以前在网易上
  • 使用JMF实现java写自己的视频播放器

    JMF这个多媒体开发框架太牛了 xff0c 简单的几句代码就能实现一个视频播放器的开发 xff0c 厉害 xff0c 就是支持的格式少了一些 xff0c 没关系 xff0c 这个视频播放器可以播放mpg xff0c avi fvl等等 xf
  • 相机标定:关键点法 vs 直接法

    相机标定中最常见的方法是关键点法 xff0c 比如 OpenCV 和 MatLab 中使用棋盘格 圆阵列等二维图案进行标定 xff0c 这上面的棋盘格角点和圆心就是所谓的关键点 虽然关键点法有很多优点 xff0c 但在某些情况下容易遇到标定
  • 8本推荐游戏开发书籍

    很多刚刚接触游戏开发的朋友经常问我 xff1a 如何开始学习游戏开发 xff1f 我从事游戏开发行业很多年了 xff0c 坦率地讲 xff0c 开发游戏充满挑战性 xff0c 需要开发人员具备大量的技能与积极的创新精神 希望这篇小文能帮助朋
  • Maxwell启动停止脚本

    Maxwell启动停止脚本 进入 maxwell bin nbsp 直接运行下面的内容 生成脚本 bin bash description maxwell File maxwell Description Starts and stops
  • Epoll 的time_out参数引发的cpu占用问题

    转自 xff1a https www cnblogs com Jimmy104 p 5258205 html 针对自己写的一个服务器网络引擎Engine 文章后面附上源码 使用epoll 刚刚开始时候发现占用CPU 特别高 xff0c 但是
  • 【周志华机器学习】集成学习

    第八章 集成学习 个体与集成BoostingBagging 与随机森林Bagging随机森林 结合策略平均法投票法学习法 多样性 个体与集成 集成学习通过构建并结合多个学习器来完成学习任务 xff0c 也被称为多分类器系统 集成学习的一般结
  • 用户句柄表的遍历

    私有句柄表 HANDLE TABLE ENTRY的Object直接指向 OBJECT HEADER不用减 span class token macro property span class token directive hash spa
  • 树莓派安装后--安装必要软件(个人版)

    树莓派安装后 安装必要软件 xff08 个人版 xff09 查看是哪个版本 lsb release a 查看raspbian是哪个版本 getconf LONG BIT 查看系统位数 uname a kernel 版本 opt vc bin
  • 固定翼姿态控制流程

    固定翼控制流程 主文件夹 子文件 fw att control位于src moudle文件夹下 fw att control main c 主文件 fw att control params c 主文件参数 CMakeList attitu
  • 固定翼位置控制_Tecs

    Tecs在位置控制中主要控制纵向的高度 xff0c 因为升降舵可以控制飞机的高度 xff0c 油门可以控制飞机的速度 xff0c 但是单单通过升降舵改变高度会使速度下降或者上升 xff0c 单单通过油门改变速度会使高度改变 xff0c 所以
  • 固定翼位置控制_L1

    L1算法控制航向 飞机从现在位置到设定位置 xff0c 需要进行转弯 转弯需要一个横向的加速度来改变速度的方向 这里横向加速度的计算公式 a 61 V2R 又因为R 61 2 sin L1 可得 a 61 2 V2L1 sin L1是现在位

随机推荐