关于RocketMq消息积压问题排查

2023-10-29

1、最近生产的mq出现了一个问题:我的消费者是集群(就是双节点),现在消息积压到1亿多条,如下图所示
在这里插入图片描述
其中有两个问题:问题1,就是为什么我的消息积压这么多?问题2:我的消费者是集群,为什么只有一台消息在消费

2、接着开始排查问题,结果发现在消费端的逻辑代码里面有异常,因为我们在mq里面配置了消息重试机制,及重试机制的间隔时间,所以消息一直卡在那。但是问题来了,我们对异常代码进行捕获,在catch里面也有对mq进行返回,如下图所示

    @Override
    public OrderAction consume(final Message message, final ConsumeOrderContext context) {

        log.info("MQ消息消费者监听消息内容:{}", message);

        MqMessageRecordMo mqMessageRecordMo = new MqMessageRecordMo();
        try {
            String body = new String(message.getBody());
            String tag = message.getTag();
            mqMessageRecordMo.setMsgId(message.getMsgID());
            mqMessageRecordMo.setOrderTopic(message.getTopic());
            mqMessageRecordMo.setProducerIp(message.getBornHost());
            mqMessageRecordMo.setTag(tag);
            //mqMessageRecordMo.setMessageKey(message.getKey());
            mqMessageRecordMo.setShardingKey(message.getShardingKey());
            mqMessageRecordMo.setBodyJson(body);
            mqMessageRecordMo.setProducerTime(LocalDateLUtils.timestampToDatetime(message.getBornTimestamp()));
            mqMessageRecordMo.setCreateTime(LocalDateTime.now());
            mqMessageRecordMo.setPMsgId(message.getUserProperties("pMsgId"));

            log.info("MQ消费者消息消费成功,解析并处理相应的业务逻辑, tag = {},key = {},messageId = {}", tag, message.getShardingKey(),message.getMsgID());

            DataBaseEnum dataBase = DataBaseEnum.getEnum(message.getShardingKey());
            targetServiceImpl.process(message.getMsgID(),dataBase,body);

            log.info("MQ消息体消费监听解析结果:{}", body);
            mqMessageRecordMo.setIsSuccess(true);
            return OrderAction.Success;
        } catch (Exception e) {
            //消费失败,挂起当前队列
            // 存储错误消息,重试,记录日志
            log.error("target MQ消费者消息监听消息业务逻辑处理失败:",e);
            mqMessageRecordMo.setIsSuccess(false);
            mqMessageRecordMo.setErrorMsg(e.getMessage());
            return OrderAction.Success;
        } finally {
            mqMessageRecordDao.save(mqMessageRecordMo);
        }
    }

如上图代码所示,在catch里面有 return OrderAction.Success; 这行代码,即使消息消费失败后,重试几次后,理论上这条消息也是会消费完毕。但是事实不是这样的,消息一直在积压,没有被消费掉,所以唯一的解释就是,catch里面的 return OrderAction.Success这行代码没有生效。果然,经过排查发现
mqMessageRecordMo.setErrorMsg(e.getMessage()); 这种写法是有异常的,在catch里面他有异常,结果return OrderAction.Success;这行代码就不会往下走,所以造成mq没有收到成功确认的回执,所以异常消息一直没有消费成功,积压在mq里面。
3、解决方案:修改上述代码里面的

targetServiceImpl.process(message.getMsgID(),dataBase,body);

这个方法里面的异常代码,其次,把catch里面的保存异常的代码换一种写法,如下

		catch (Exception e) {
            //消费失败,挂起当前队列
            // 存储错误消息,重试,记录日志
            mqMessageRecordMo.setIsSuccess(false);
            StringWriter sw = new StringWriter();
            e.printStackTrace(new PrintWriter(sw,true));
            mqMessageRecordMo.setErrorMsg("event消息接收处理失败:" + sw.toString());
            log.error("target-MQ消费者消息监听消息业务逻辑处理失败:",e);
            return OrderAction.Success;
        }

4、经过上述修改后,消息正常消费。
5、但是还有一个问题,就是消息为什么都积压在同一台服务器上(我的消费者明明是两台服务器),其次,有什么办法加快消息的消费速度,如图1所示,mq的消费速度为每秒60条。
6、首先我尝试了加大消费者的消费线程数,如下图所示:

rocket:
  mq:
    dc:
      consume-thread-nums: 60
      max-reconsume-times: 3

我把 consume-thread-nums 由10改成了60,结果发现消费速率还是这么多。
7、后面我观察积压的消息的特征,发现所有的消息都是同一个
shardingKey,看到这里,我就明白了,因为我们项目所用的rocketMq是阿里巴巴的,我们在项目里面设置的是顺序消费(还有一个乱序消费,不保证顺序),而Roctmq的集群消息是根据shardingKey 分区的,在同一个区里面的消息是按照顺序来的。所以这也就能说明为啥我的1亿多条消息都发送到同一台服务器上,因为这些消息都同属于一个shardingKey,同时也能说明为啥我调消费者的线程数没有用,因为同一个shardingKey 分区里面的消息是单线程消费。

8、后续优化,把shardingKey 分区粒度调小一些,尽可能分散。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

关于RocketMq消息积压问题排查 的相关文章

  • RocketMQ 安装

    镜像方式安装 首先再把上一接中提到的 RocketMQ 部署架构图看一下 从图中可以看出 RocketMQ的服务端分为两块 Name Server 和 Broker Name Server 是一个几乎无状态节点 可集群部署 在消息队列Roc
  • rocketmq搭建2m2s踩坑版

    搭建过程中遇到了些问题百度了很久终于东拼西凑成功解决了 看看成果 console完美运行 代码生产消费也是正常的 言归正传开始搭建 0 服务器环境介绍 没有将master与对应的slave安装在同一个节点 如果在一个节点挂了master就挂
  • 本周 RocektMQ社区活动

    本周 RocektMQ社区邀请了陈厚德老师进行源码直播分享 陈老师曾就职于腾讯 盛大 斗鱼等知名互联网公司 现就职于尚德机构 分享相关信息 直播方式 钉钉群直播方式 群号 21791227 分享题目 RocketMQ同步 异步刷盘机制 分享
  • RocketMQ-Broker异常恢复后部分队列重新加载已消费消息问题解决

    问题描述 线上Broker所有在主机IO异常 导致Broker异常退出 主机问题解决后 重启Broker 发现group A的几个consumeQueue diff值有几十万 而通过查看日志信息发现该Group的consumeQueue只有
  • K8S部署rocketmq单机和集群

    K8S部署rocketmq单机和集群 版本 Rocketmq介绍 RocketMQ 的核心概念 2 1 Topic Queue tags 2 2 Producer 与 Producer Group 2 3 Consumer 与 Consum
  • MQClientException: CODE: 208  DESC: query message by key finished, but no message.

    2019 05 15 10 19 31 401 INFO closeChannel close the connection to remote address 127 0 0 1 10911 result true 2019 05 15
  • RocketMQ源码(十三)—消费者DefaultMQPushConsumer启动主要流程源码

    此前我们学习了Broker和Producer的启动源码 以及Producer发送消息源码和Broker接收存储消息的源码 现在 我们来学习Consumer的启动以及消费消息的源码 Consumer的启动源码和Producer的启动源码还是有
  • 漏洞复现-CVE-2023-33246 Apache RocketMQ RCE漏洞原理与复现

    目录 漏洞原理 漏洞描述 影响范围 Apache RocketMQ学习 文档学习 代码审计 漏洞复现 docker环境搭建 exp代码 总结 参考 漏洞原理 漏洞描述 For RocketMQ versions 5 1 0 and belo
  • rocketMq消息队列原生api使用以及rocketMq整合springboot

    rocketMq消息队列 文章目录 rocketMq消息队列 一 RocketMQ原生API使用 1 测试环境搭建 2 RocketMQ的编程模型 3 RocketMQ的消息样例 3 1 基本样例 3 2 顺序消息 3 3 广播消息 3 4
  • 报错:ImportError: rocketmq dynamic library not found解决方法

    目录 一 ImportError rocketmq dynamic library not found 二 OSError librocketmq so cannot open shared object file No such file
  • RocketMQ-名词和架构

    RocketMQ rocketMQ是做什么的我就不用解释了吧 以及他的背景 本文主要是为了让大家明白RocketMQ的工作原理 架构图 上图 双箭头代表是双向通信 ProducerGroup和ConsumerGroup以及Broker集群
  • 【RocketMQ】消息重试、重试次数设置、死信队列

    文章目录 1 死信队列 1 1 死信特性 1 2 查看死信消息 2 重试次数参数 2 1 Producer端重试 2 2 Consumer端重试 3 1 异常重试 3 2 超时重试 参考 1 死信队列 上一篇 RocketMQ 消息重试中我
  • rocketmq客户端配置

    1 客户端配置 相对于RocketMQ的Broker集群 生产者和消费者都是客户端 2 客户端寻址方式 RocketMQ可以令客户端找到Name Server 然后通过Name Server再找到Broker 如下所示有多种配置方式 优先级
  • 22道常见RocketMQ面试题以及答案

    面试宝典到手 搞定面试 不再是难题 系列文章传送地址 请点击本链接 1 RocketMQ是什么 2 RocketMQ有什么作用 3 RoctetMQ的架构 4 RoctetMQ的优缺点 8 消息过滤 如何实现 9 消息去重 如果由于网络等原
  • Failed to execute goal on project rocketmq-console-ng: Could not resolve dependencies for project

    Apache RocketMQ安装部署 Failed to execute goal on project rocketmq console ng Could not resolve dependencies for project org
  • 关于rocketmq 中日志文件路径的配置

    前些天发现了一个巨牛的人工智能学习网站 通俗易懂 风趣幽默 忍不住分享一下给大家 点击跳转到网站 rocketmq 中的数据和日志文件默认都是存储在user home路径下面的 往往我们都需要修改这些路径到指定文件夹以便管理 服务端日志 网
  • RocketMQ-高级原理

    本节讲解下当MQ消息消费失败 或者发送不成功时如何处理消息 消息发送不成功一般存在于几种情况 网络原因 服务宕机 或者broker配置 消息发送失败 如果是由于broker配置原因 可以通过报错提示排查原因 无法查到路由信息 一般考虑到ro
  • RocketMQ的消息优先级

    有些场景 需要应用程序处理几种类型的消息 不同消息的优先级不同 RocketMQ是个先入先出的队列 不支持消息级别或者Topic级别的优先级 业务中简单的优先级需求 可以通过间接的方式解决 下面列举三种优先级相关需求的具体处理方法 第一种
  • RocketMQ消费者可以手动消费但无法主动消费问题,或生成者发送超时

    1 大多数是配置问题 修改rocketmq文件夹broker conf 2 配置与集群IP或本地IPV4一样 重启 在RocketMQ独享实例中支持IPv4和IPv6双栈 主要是通过在网络层面上同时支持IPv4和IPv6协议栈来实现的 Ro
  • RocketMQ源码(26)—DefaultMQPushConsumer事务消息源码【一万字】

    事务消息是RocketMQ的一大特性 其被用来实现分布式事务 关于RocketMQ的事务消息的相关原理的介绍见这篇博客 RocketMQ的分布式事务机制 事务消息 关于事务消息的基本案例看这里 消息事务样例 本文主要介绍RocketMQ的事

随机推荐

  • centos7.0中搭建dhcp服务器

    一 dhcp 二 配置dhcp的方法 1 安装dhcp服务器 使用命令 vpm qa dhcp 查看一下dhcp有没有安装 如果没安装将不会有任何提示信息 如果安装好了 将会返回dhcp的版本号 可以看到我已经安装好了 在centos7的安
  • Swagger2介绍及使用

    项目中整合Swagger2 1 什么是swagger2 2 常用注解 3 项目中整合Swagger2 3 1 引入Swagger2依赖 3 2 编写swgger2配置类代码 3 3 在需要测试的模块中引入有swagger2的模块坐标 3 4
  • QT 智能提示设置

    关于QT的智能提示 有两点 一 默认只能在Ctrl Space或打 会自动转成 gt 的时候会出现 由于Ctrl Space默认在我电脑上是输入法的切换 所以一直以为没这个功能 敲代码时特别郁闷 于是在QT Creator中的Tool gt
  • 离散事件仿真原理DES

    参考 SYSTEM SIMULATION AND OPTIMIZATION 目录 1 系统仿真原理 1 1系统 模型和系统仿真 1 2系统仿真分类 1 2 1 蒙特卡洛仿真 Monte Carlo Simulation 1 2 2 离散系统
  • 傅里叶变换的一些总结

    傅里叶变换的一些总结 1 三角函数的正交性 三角函数系 1 c o s 0 x
  • 2023年Java面试笔试题

    Java 第一部分 1 什么叫多态 多态是同一个行为具有多个不同表现形式或形态的能力 即同一个接口采用不同的实例而执行不同的操作 2 以下哪个选 项属于多态存在的必要条件 A 继承 B 重写 C 父类引用指向子类对象 D 以上都是 多态存在
  • C语言代码规范

    1 排版 1 1 程序块要采用缩进风格编写 缩进的空格数为4个 1 2 相对独立的程序块之间 变量说明之后必须加空行 1 3 较长的语句 gt 80字符 要分成多行书写 长表达式要在低优先级操作符处划分新行 操作符放在新行之首 划分出的新行
  • 「网页开发|前端开发|Vue」06 公共组件与嵌套路由:让每一个页面都平等地拥有导航栏

    本文主要介绍在多个页面存在相同部分时 如何提取公共组件然后在多个页面中导入组件重复使用来减少重复代码 在这基础上介绍了通过嵌套路由的方式来避免页面较多或公共部分较多的情况下 避免不断手动导入公共组件的麻烦 并且加快页面跳转的速度 文章目录
  • Java集合(List、Set、Map)

    Java中的集合是用于存储和组织对象的数据结构 Java提供了许多不同的集合类 包括List Set和Map等 以满足不同的需求 下面将介绍一些常见的Java集合类及其使用方法 一 List List是一个有序的集合 它允许元素重复出现 并
  • MultipleFile转File、File转Byte

    MultipleFile转File File转Byte 工具类 file2byte param file return public static byte convertFileToByteArray File file try File
  • 多维数组np.pad函数的理解

    多维数组np pad函数的理解 原函数是 np pad array x1 y1 x2 y2 x3 y3 constant x1 y1 意思是着在a这个三维矩阵中 整个大矩阵中首尾分别添加x1 y1个和a中各个矩阵形状一样的0矩阵 效果如下图
  • 抓取鼠标动画

    今天给大家分享一个抓取鼠标的动画 嗯 真的是抓取鼠标 代码如下
  • python怎么封装函数_python怎么封装函数

    什么是封装 在程序设计中 封装 Encapsulation 是对具体对象的一种抽象 即将某些部分隐藏起来 在程序外部看不到 其含义是其他程序无法调用 要了解封装 离不开 私有化 就是将类或者是函数中的某些属性限制在某个区域之内 外部无法调用
  • 连接器出线方法分享(持续更新)

  • 基于情境化时空网络的出租车OD需求预测

    1 文章信息 Contextualized Spatial Temporal Network for Taxi Origin Destination Demand Prediction 是2019年发表在IEEE上的一篇文章 2 摘要 本文
  • 区块链之添加节点

    1 查询节点信息 gt admin nodeInfo enode enode b817560f061b1f14551f87060806847c4c6b7cf8b56b6027fd3d8400c3abb4e2a3d535dd78ab46f28
  • CAD打开字体无法选择,cad打开无字体,cad无法加载字体

    在命令栏中输入filedia 然后回车 输入1保存关闭 再重新打开即可
  • gcc编译出现:error: invalid operands to binary & (have ‘char *’ and ‘int *’)

    1 2 gt File Name ptr variable c 3 gt Author Mr Yang 4 gt Purpose 演示指向变量的指针 5 gt Created Time 2017年06月03日 星期六 08时47分33秒 6
  • JDBC PostgreSQL

    上一节 JDBC可以操作多种数据库 而且都是标准化操作 区别仅仅在使用不同的数据库连接驱动程序 及URL连接方式的书写 引用SQL包 import java sql public class JDBCTest param args publ
  • 关于RocketMq消息积压问题排查

    1 最近生产的mq出现了一个问题 我的消费者是集群 就是双节点 现在消息积压到1亿多条 如下图所示 其中有两个问题 问题1 就是为什么我的消息积压这么多 问题2 我的消费者是集群 为什么只有一台消息在消费 2 接着开始排查问题 结果发现在消