RocketMQ消费重试问题

2023-11-14

异常现象

监控日志展示如下:

[2019-10-30 14:31:23.339 INFO ] [ConsumeMessageThread_7] (com.xxx.service.mq.MQConsumerService:93) - 消费消息:msgId=0A064C3E000179C63692734B339201B0 topic=topic_xxx tag=yyy reconsumeTimes=12

reconsumeTimes 代表消费重试次数。
同时日志中频繁显示同一条msgId。

排查原因

通过debug发现在执行业务代码时,抛出的异常被mq捕获,如下:

org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService.ConsumeRequest#run
try {
    this.processQueue.getLockConsume().lock();
    if (this.processQueue.isDropped()) {
        log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
            this.messageQueue);
        break;
    }

    status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
    log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", //
        RemotingHelper.exceptionSimpleDesc(e), //
        ConsumeMessageOrderlyService.this.consumerGroup, //
        msgs, //
        messageQueue);
    hasException = true;
} finally {
    this.processQueue.getLockConsume().unlock();
}

if (null == status) {
    status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}

当发生异常时messageListener.consumeMessage方法的返回值为null
当status为null时,status会被赋值为ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT状态(即隔一段时间后重试)。

上文造成消息重试的异常信息如下

java.lang.NoSuchMethodError: com.soaclient.dto.xxx.LogQueryDto.getMqTypeIn()Ljava/util/List;

由于该异常为Error级别,而业务代码异常捕获级别为Exception,导致异常没有被捕获。

改进方法

1、修改业务代码异常
2、修改异常捕获级别为throwable级别,并输出异常日志
3、增加重试次数限制,当大于一定次数时,则不再重试。
如果不加重试,针对顺序消费的情况,可能会出现消费被阻塞的情况。
而无序消费,默认最多重试16次。

题外话

当为无序消费时,代码如下:

try {
    ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);
    if (msgs != null && !msgs.isEmpty()) {
        for (MessageExt msg : msgs) {
            MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
        }
    }
    status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
    log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
        RemotingHelper.exceptionSimpleDesc(e), //
        ConsumeMessageConcurrentlyService.this.consumerGroup,
        msgs,
        messageQueue);
    hasException = true;
}

if (null == status) {
    log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
        ConsumeMessageConcurrentlyService.this.consumerGroup,
        msgs,
        messageQueue);
    status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
}

参考

消息队列 MQ > 高级特性 > 消息重试
https://help.aliyun.com/document_detail/43490.html?spm=a2c4g.11186623.6.555.35b96c99oNQKPt

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RocketMQ消费重试问题 的相关文章

  • UC/OS-III 消息队列

    消息队列 一 消息队列基本概念讲解1 消息队列基本概念2 消息池2 1 消息池概念2 2 消息池初始化2 3 消息队列的运作机制2 4 消息队列的阻塞机制2 5 消息队列的应用场景 二 消息队列创建步骤1 定义消息队列2 创建消息队列 三
  • FreeRTOS消息队列

    FreeRTOS消息队列 队列又称消息队列 xff0c 是一种常用于任务间通信的数据结构 xff0c 队列可以在任务与任务间 中断和任务间传递信息 xff0c 实现了任务接收来自其他任务或中断的不固定长度的消息 xff0c 任务能够从队列里
  • FreeRTOS消息队列、信号量、事件组、任务通知之间的区别

    转载自 xff1a https blog csdn net p1279030826 article details 103471564 功能及区别列表 消息队列 xff08 需要传递消息时使用 xff09 在任务与任务间 中断和任务间传递信
  • Python logging将日志输出到Kafka

    Python logging用于输出Python程序运行的日志 现实中往往一个项目会部署在多台机器之上 这种情况下 为了方便对各主机运行日志进行收集 往往会使用消息队列 通过消息队列将各台机器上的日志收集并写入日志文件 本文使用Python
  • 【Linux】利用消息队列实现一个简单的进程间双向通信(两种方式)

    在实现利用消息队列的进程间通信之前 先了解一下基本的概念和所需要用到的函数 消息队列 消息队列是Linux内核地址空间中的内部链表 各个进程可以通过它来进行消息传递 进程发送的消息会顺序写入消息队列之中 且每个消息队列都有IPC标识符唯一地
  • 如何保证消息队列的顺序性?

    面试题 如何保证消息的顺序性 面试官心理分析 其实这个也是用 MQ 的时候必问的话题 第一看看你了不了解顺序这个事儿 第二看看你有没有办法保证消息是有顺序的 这是生产系统中常见的问题 面试题剖析 我举个例子 我们以前做过一个 mysql b
  • RocketMQ学习笔记 - 顺序消息

    文章目录 1 定义 2 代码示例 2 1 消息实体 2 2 生产者 2 3 消费者 2 3 测试结果 1 定义 顺序消息 FIFO 消息 是 MQ 提供的一种严格按照顺序进行发布和消费的消息类型 顺序消息由两个部分组成 顺序发布和顺序消费
  • RocketMQ安装与启动

    分享知识 传递快乐 官网 https rocketmq apache org 1 准备环境 系统 Centos7 jdk 1 8 2 环境部署 解压 rocketmq 并进入 rocketmq 下的 bin 目录 调整启动内存 vim bi
  • 13 SpringBoot整合RocketMQ实现过滤消息-根据SQL表达式过滤消息

    SQL表达式方式可以根据发送消息时输入的属性进行一些计算 RocketMQ的SQL表达式语法 只定义了一些基本的语法功能 数字比较 如 gt gt lt lt BETWEEN 字符比较 如 lt gt IN IS NULL or IS NO
  • kafka(一)kafka的基础与常用配置

    文章目录 一 kafka基础内容 二 kafka 中重要的参数配置 2 1 log dirs 2 2 unclean leader election enable 2 3 message max bytes 2 4 request requ
  • Rocketmq原理&最佳实践

    一 MQ背景 选型 消息队列作为高并发系统的核心组件之一 能够帮助业务系统解构提升开发效率和系统稳定性 主要具有以下优势 削峰填谷 主要解决瞬时写压力大于应用服务能力导致消息丢失 系统奔溃等问题 系统解耦 解决不同重要程度 不同能力级别系统
  • 延时消息队列

    目录 前言 一 延时队列实用场景 二 DelayQueue DelayQueue的实现 使用延迟队列 DelayQueue实现延时任务的优缺点 三 RocketMQ 原理 四 Kafka 原理 实现 DelayMessage定义 消息发送代
  • RabbitMQ消息队列实战(1)—— RabbitMQ的体系

    RabbitMQ是一个开源的消息代理和队列服务器 用来在不同的应用之间共享数据 1983年 被认为是RabbitMQ的雏形的Teknekron创建 首次提出了消息总线的概念 中间经历过数个阶段的发展 一直到2004年 AMQP Advanc
  • MQ消息队列的重复消费问题的通用解决办法以及幂等性的原理

    详细介绍了MQ消息队列重复消费的原因 以及通过保证幂等性来避免重复消费带来的问题 文章目录 1 至少一次 2 重复消费的原因 3 幂等性处理重复消费 1 至少一次 消息领域有一个对消息投递的QoS定义 Quality of Service
  • RabbitMQ理论+实战

    1 引出 1 1 中间件应用场景 1 跨系统数据传输 2 高并发的流量削峰 3 数据的分发与异步处理 4 大数据分析与传递 5 分布式事务 1 2 中间件常用协议 01 什么是协议 所谓协议是指 1 计算机底层操作系统和应用程序通讯时共同遵
  • php消息队列的应用

    欢迎加入 新群号码 99640845 最近打算开发一个新功能 计划应用消息队列 以前对消息队列都是简单的理论了解 真正应用之后把自己的感觉和一些理解整理下来 说正事分割线 具体的业务场景如下 用户下单 生成订单 支付 返回支付信息 就是正常
  • RocketMQ系列之架构浅谈

    RMQ的架构设计 下面我从GitHub上截取了一张RMQ的源码结构图 图中我框框出来的9大模块 基本就构成了整个RMQ的内部结构 上面9大模块的依赖层次主要如下 依赖越强的越处于底层 下面介绍下最上层的4个模块 这4个模块中工具命令行就不讲
  • RocketMQ-高级原理

    本节讲解下当MQ消息消费失败 或者发送不成功时如何处理消息 消息发送不成功一般存在于几种情况 网络原因 服务宕机 或者broker配置 消息发送失败 如果是由于broker配置原因 可以通过报错提示排查原因 无法查到路由信息 一般考虑到ro
  • 5分钟学会RocketMQ

    RocketMQ 简介 RocketMQ 是一个队列模型的消息中间件 具有高性能 高可用 高实时等特性 它并不支持JMS java消息服务 规范 但参考了JMS规范和kafak等的思想 Producer Consumer 队列都可以分布式
  • ESP32学习笔记05-串口事件方式读取数据

    串口中断方式处理数据 事件机构体 typedef struct uart event type t type lt UART event type size t size lt UART data size for UART DATA ev

随机推荐

  • CARLA仿真软件(一)【软件简介及Windows下的安装】

    CARLA基本介绍 CARLA是一个开源的自动驾驶模拟器 它是从头开始构建的 用作模块化和灵活的API 以解决自动驾驶问题中涉及的一系列任务 CARLA的主要目标之一是帮助自动驾驶研发民主化 它是一种易于用户使用和定制的工具 为此 模拟器必
  • 使用阿里云OSS对象存储搭建个人图床

    原文链接 使用阿里云OSS对象存储搭建个人图床 文章目录 一 购买阿里云OSS对象存储 二 创建Bucket 三 获取AccessKey等相关信息 一 购买阿里云OSS对象存储 1 登录阿里云后 搜索OSS 然后点击 立即购买 2 阿里云O
  • IOU、GIOU、DIOU、CIOU、EIOU、Focal EIOU、alpha IOU损失函数分析及Pytorch实现

    IOU Loss 算法作用 Iou的就是交并比 预测框和真实框相交区域面积和合并区域面积的比值 计算公式如下 Iou作为损失函数的时候只要将其对数值输出就好了 def Iou loss preds bbox eps 1e 6 reducti
  • Qt之QProgressBar

    简述 QProgressBar部件提供了一个水平或垂直进度条 进度条用于给用户操作一个进度指示 并向它们说明应用程序仍在运行 简述 详细描述 读取方向 进度方向 效果 源码 文本显示 效果 源码 繁忙指示 效果 源码 QSS 详细描述 可以
  • C++ STL使用

    文章目录 C STL使用 一 什么是STL 二 STL内容介绍 2 1 STL中六大组件 2 2 容器 2 3 迭代器 2 4 算法 2 4 1 算法分类 2 5 仿函数 2 5 1 仿函数 functor 在编程语言中的应用 2 5 2
  • QT5生成.exe文件时,出现缺少QT5core.dll文件解决方法

    在 http qt project org downloads 下载Qt SDK安装需要Qt版本 在QtCreator下 程序可以正常运行 但是当关闭QtCreator后 在DeBug目录下再运行相应的 exe程序时 会提示缺少Qt5Cor
  • 管理工作中的“七种浪费”

    管理工作中的 七种浪费 丰田生产方式中所归纳的 七种 浪费 主要发生在生产现场 但是产生这些浪费的深层次的原因是什么 如果仅仅关注现场存在的问题 而不解决被现象所掩盖的本质问题 无疑是舍本逐末 即使表面上轰轰烈烈 但实际效果也很有限 为了能
  • 刷脸支付在支付前后商家可以做无限延展

    人脸识别的技术传输则需要很强大的流量支撑 才能将人脸复杂的各类生物体特征数据传输到中控电脑里 而5G的超快速度 解决了这一棘手的问题 使得将刷脸支付应用到移动载体上 得到了实现 刷脸支付更大的想象空间在于它的引流能力和交互营销 刷脸支付是一
  • 墙内搭建Android开发环境

    本文首发在我的个人博客 https jlice top p 6s1gi 欢迎大家前去参观 么么哒 提到搭建Android开发环境 一般给出的方案是在Eclipse输入 https dl ssl google com android ecli
  • QT多线程基础

    文章目录 简介 相关名词 QT 运行方式 基础使用方法 void QObject moveToThread QThread targetThread 退出线程过程 wait 等待子线程的结束 实例 QT锁QMutex QMutexLocke
  • 对于TIS,TRP的天线有源测试参数分析

    TRP Total Radiated Power 全向辐射功率 TIS Total Isotropic Sensitivity 全向 辐射 灵敏度 通过对整个辐射球面的发射功率进行积分并取平均得到 它反映手机整机的发射功率情况 跟手机在传导
  • 【100天精通python】Day40:GUI界面编程_PyQt 从入门到实战(完)_网络编程与打包发布

    目录 8 网络编程 8 1 使用PyQt 网络模块进行网络通信 服务器端示例 客户端示例 8 2 处理网络请求和响应 9 打包和发布 9 1 创建可执行文件或安装程序 9 2 解决依赖问题 9 3 发布 PyQt 应用到不同平台 9 3 1
  • http://localhost:8050无法访问Splash主页

    早上起来 发现splash服务已经起来了 http localhost 8050怎么也打不开 后来各种百度 发现有篇文章里说 原因是服务的IP为192 168 99 100 回去看了一把 果然是 也可以用命令docker machine i
  • iPhone - 如何找到最顶层的视图控制器

    UIViewController topViewController return self topViewControllerWithRootViewController UIApplication sharedApplication k
  • org.springframework.context.annotation.ConflictingBeanDefinitionException: Annotation-specified bean

    当遇到这个错误的时候 可能是因为 spring注解的时候两个bean的注解名相同或冲突 如果有其他的问题 可发表评论一同讨论 谢谢
  • 已解决WARNING: There was an error checking the latest version of pip.

    成功解决 pip提示升级 WARNING There was an error checking the latest version of pip 文章目录 报错问题 报错翻译 报错原因 解决方法 千人全栈VIP答疑群联系博主帮忙解决报错
  • Base64是什么、应用场景、开源库(libb64)使用

    1 Base64是什么 1 1 概念 Base64是一种 将二进制转为64个可打印字符 的编码方式 Base64是网络上最常见的用于传输8Bit字节码的编码方式 一种基于64个可打印字符来表示二进制数据的方法 Base64编码原理是从二进制
  • 手机喊话应用实现思路

    手机要是动一下 就喊话 摇摇零线 摇摇零线 是不是比较酷 这里实现一下手机翻转一下 播放声音的效果 通过sensor识别到手机的运动状况 然后播放音频 public class MainActivity extends AppCompatA
  • 【第65篇】行人属性识别研究综述(二)

    文章目录 6 PAR 行人属性识别 算法综述 6 1全局基于图像的模型 6 1 1 ACN iccvw 2015 6 1 2 DeepSAR and DeepMAR ACPR 2015 6 6 1 3 MTCNN TMM 2015 7 6
  • RocketMQ消费重试问题

    异常现象 监控日志展示如下 2019 10 30 14 31 23 339 INFO ConsumeMessageThread 7 com xxx service mq MQConsumerService 93 消费消息 msgId 0A0