rabbitmq消息消费失败如何处理

2023-11-06

在介绍消息中间件 MQ 之前,我们先来简单的了解一下,为何要引用消息中间件。

例如,在电商平台中,常见的用户下单,会经历以下几个流程。

当用户下单时,创建完订单之后,会调用第三方支付平台,对用户的账户金额进行扣款,如果平台支付扣款成功,会将结果通知到对应的业务系统,接着业务系统会更新订单状态,同时调用仓库接口,进行减库存,通知物流进行发货!
在这里插入图片描述
试想一下,从订单状态更新、到扣减库存、通知物流发货都在一个方法内同步完成,假如用户支付成功、订单状态更新也成功,但是在扣减库存或者通知物流发货步骤失败了,那么就会造成一个问题,用户已经支付成功了,只是在仓库扣减库存方面失败,从而导致整个交易失败!

一单失败,老板可以假装看不见,但是如果上千个单子都因此失败,那么因系统造成的业务损失,将是巨大的,老板可能坐不住了!

因此,针对这种业务场景,架构师们引入了异步通信技术方案,从而保证服务的高可用,大体流程如下:
在这里插入图片描述
当订单系统收到支付平台发送的扣款结果之后,会将订单消息发送到 MQ 消息中间件,同时也会更新订单状态。

在另一端,由仓库系统来异步监听订单系统发送的消息,当收到订单消息之后,再操作扣减库存、通知物流公司发货等服务!

在优化后的流程下,即使扣减库存服务失败,也不会影响用户交易。

正如《人月神话》中所说的,软件工程,没有银弹!

当引入了 MQ 消息中间件之后,同样也会带来另一个问题,假如 MQ 消息中间件突然宕机了,导致消息无法发送出去,那仓库系统就无法接受到订单消息,进而也无法发货!

针对这个问题,业界主流的解决办法是采用集群部署,一主多从模式,从而实现服务的高可用,即使一台机器突然宕机了,也依然能保证服务可用,在服务器故障期间,通过运维手段,将服务重新启动,之后服务依然能正常运行!

但是还有另一个问题,假如仓库系统已经收到订单消息了,但是业务处理异常,或者服务器异常,导致当前商品库存并没有扣减,也没有发货!

这个时候又改如何处理呢?

今天我们所要介绍的正是这种场景,假如消息消费失败,我们应该如何处理?

二、解决方案
针对消息消费失败的场景,我们一般会通过如下方式进行处理:

当消息消费失败时,会对消息进行重新推送

如果重试次数超过最大值,会将异常消息存储到数据库,然后人工介入排查问题,进行手工重试
在这里插入图片描述
当消息在客户端消费失败时,我们会将异常的消息加入到一个消息重试对象中,同时设置最大重试次数,并将消息重新推送到 MQ 消息中间件里,当重试次数超过最大值时,会将异常的消息存储到 MongoDB数据库中,方便后续查询异常的信息。

基于以上系统模型,我们可以编写一个公共重试组件,话不多说,直接干!

三、代码实践
本次补偿服务采用 rabbitmq 消息中间件进行处理,其他消息中间件处理思路也类似!

3.1、创建一个消息重试实体类

@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class MessageRetryDTO implements Serializable {

private static final long serialVersionUID = 1L;

/**
 * 原始消息body
 */
private String bodyMsg;

/**
 * 消息来源ID
 */
private String sourceId;

/**
 * 消息来源描述
 */
private String sourceDesc;

/**
 * 交换器
 */
private String exchangeName;

/**
 * 路由键
 */
private String routingKey;

/**
 * 队列
 */
private String queueName;

/**
 * 状态,1:初始化,2:成功,3:失败
 */
private Integer status = 1;

/**
 * 最大重试次数
 */
private Integer maxTryCount = 3;

/**
 * 当前重试次数
 */
private Integer currentRetryCount = 0;

/**
 * 重试时间间隔(毫秒)
 */
private Long retryIntervalTime = 0L;

/**
 * 任务失败信息
 */
private String errorMsg;

/**
 * 创建时间
 */
private Date createTime;

@Override
public String toString() {
    return "MessageRetryDTO{" +
            "bodyMsg='" + bodyMsg + '\'' +
            ", sourceId='" + sourceId + '\'' +
            ", sourceDesc='" + sourceDesc + '\'' +
            ", exchangeName='" + exchangeName + '\'' +
            ", routingKey='" + routingKey + '\'' +
            ", queueName='" + queueName + '\'' +
            ", status=" + status +
            ", maxTryCount=" + maxTryCount +
            ", currentRetryCount=" + currentRetryCount +
            ", retryIntervalTime=" + retryIntervalTime +
            ", errorMsg='" + errorMsg + '\'' +
            ", createTime=" + createTime +
            '}';
}

/**
 * 检查重试次数是否超过最大值
 *
 * @return
 */
public boolean checkRetryCount() {
    retryCountCalculate();
    //检查重试次数是否超过最大值
    if (this.currentRetryCount < this.maxTryCount) {
        return true;
    }
    return false;
}

/**
 * 重新计算重试次数
 */
private void retryCountCalculate() {
    this.currentRetryCount = this.currentRetryCount + 1;
}

}
3.2、编写服务重试抽象类

public abstract class CommonMessageRetryService {

private static final Logger log = LoggerFactory.getLogger(CommonMessageRetryService.class);

@Autowired
private RabbitTemplate rabbitTemplate;

@Autowired
private MongoTemplate mongoTemplate;


/**
 * 初始化消息
 *
 * @param message
 */
public void initMessage(Message message) {
    log.info("{} 收到消息: {},业务数据:{}", this.getClass().getName(), message.toString(), new String(message.getBody()));
    try {
        //封装消息
        MessageRetryDTO messageRetryDto = buildMessageRetryInfo(message);
        if (log.isInfoEnabled()) {
            log.info("反序列化消息:{}", messageRetryDto.toString());
        }
        prepareAction(messageRetryDto);
    } catch (Exception e) {
        log.warn("处理消息异常,错误信息:", e);
    }
}

/**
 * 准备执行
 *
 * @param retryDto
 */
protected void prepareAction(MessageRetryDTO retryDto) {
    try {
        execute(retryDto);
        doSuccessCallBack(retryDto);
    } catch (Exception e) {
        log.error("当前任务执行异常,业务数据:" + retryDto.toString(), e);
        //执行失败,计算是否还需要继续重试
        if (retryDto.checkRetryCount()) {
            if (log.isInfoEnabled()) {
                log.info("重试消息:{}", retryDto.toString());
            }
            retrySend(retryDto);
        } else {
            if (log.isWarnEnabled()) {
                log.warn("当前任务重试次数已经到达最大次数,业务数据:" + retryDto.toString(), e);
            }
            doFailCallBack(retryDto.setErrorMsg(e.getMessage()));
        }
    }
}

/**
 * 任务执行成功,回调服务(根据需要进行重写)
 *
 * @param messageRetryDto
 */
private void doSuccessCallBack(MessageRetryDTO messageRetryDto) {
    try {
        successCallback(messageRetryDto);
    } catch (Exception e) {
        log.warn("执行成功回调异常,队列描述:{},错误原因:{}", messageRetryDto.getSourceDesc(), e.getMessage());
    }
}

/**
 * 任务执行失败,回调服务(根据需要进行重写)
 *
 * @param messageRetryDto
 */
private void doFailCallBack(MessageRetryDTO messageRetryDto) {
    try {
        saveMessageRetryInfo(messageRetryDto.setErrorMsg(messageRetryDto.getErrorMsg()));
        failCallback(messageRetryDto);
    } catch (Exception e) {
        log.warn("执行失败回调异常,队列描述:{},错误原因:{}", messageRetryDto.getSourceDesc(), e.getMessage());
    }
}

/**
 * 执行任务
 *
 * @param messageRetryDto
 */
protected abstract void execute(MessageRetryDTO messageRetryDto);

/**
 * 成功回调
 *
 * @param messageRetryDto
 */
protected abstract void successCallback(MessageRetryDTO messageRetryDto);

/**
 * 失败回调
 *
 * @param messageRetryDto
 */
protected abstract void failCallback(MessageRetryDTO messageRetryDto);

/**
 * 构建消息补偿实体
 * @param message
 * @return
 */
private MessageRetryDTO buildMessageRetryInfo(Message message){
    //如果头部包含补偿消息实体,直接返回
    Map<String, Object> messageHeaders = message.getMessageProperties().getHeaders();
    if(messageHeaders.containsKey("message_retry_info")){
        Object retryMsg = messageHeaders.get("message_retry_info");
        if(Objects.nonNull(retryMsg)){
            return JSONObject.parseObject(String.valueOf(retryMsg), MessageRetryDTO.class);
        }
    }
    //自动将业务消息加入补偿实体
    MessageRetryDTO messageRetryDto = new MessageRetryDTO();
    messageRetryDto.setBodyMsg(new String(message.getBody(), StandardCharsets.UTF_8));
    messageRetryDto.setExchangeName(message.getMessageProperties().getReceivedExchange());
    messageRetryDto.setRoutingKey(message.getMessageProperties().getReceivedRoutingKey());
    messageRetryDto.setQueueName(message.getMessageProperties().getConsumerQueue());
    messageRetryDto.setCreateTime(new Date());
    return messageRetryDto;
}

/**
 * 异常消息重新入库
 * @param retryDto
 */
private void retrySend(MessageRetryDTO retryDto){
    //将补偿消息实体放入头部,原始消息内容保持不变
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
    messageProperties.setHeader("message_retry_info", JSONObject.toJSON(retryDto));
    Message message = new Message(retryDto.getBodyMsg().getBytes(), messageProperties);
    rabbitTemplate.convertAndSend(retryDto.getExchangeName(), retryDto.getRoutingKey(), message);
}



/**
 * 将异常消息存储到mongodb中
 * @param retryDto
 */
private void saveMessageRetryInfo(MessageRetryDTO retryDto){
    try {
        mongoTemplate.save(retryDto, "message_retry_info");
    } catch (Exception e){
        log.error("将异常消息存储到mongodb失败,消息数据:" + retryDto.toString(), e);
    }
}

}
3.3、编写监听服务类

在消费端应用的时候,也非常简单,例如,针对扣减库存操作,我们可以通过如下方式进行处理!

@Component
public class OrderServiceListener extends CommonMessageRetryService {

private static final Logger log = LoggerFactory.getLogger(OrderServiceListener.class);

/**
 * 监听订单系统下单成功消息
 * @param message
 */
@RabbitListener(queues = "mq.order.add")
public void consume(Message message) {
    log.info("收到订单下单成功消息: {}", message.toString());
    super.initMessage(message);
}


@Override
protected void execute(MessageRetryDTO messageRetryDto) {
    //调用扣减库存服务,将业务异常抛出来
}

@Override
protected void successCallback(MessageRetryDTO messageRetryDto) {
    //业务处理成功,回调
}

@Override
protected void failCallback(MessageRetryDTO messageRetryDto) {
    //业务处理失败,回调
}

}
当消息消费失败,并超过最大次数时,会将消息存储到 mongodb 中,然后像常规数据库操作一样,可以通过 web 接口查询异常消息,并针对具体场景进行重试!

四、小结
可能有的同学会问,为啥不将异常消息存在数据库?

起初的确是存储在 MYSQL 中,但是随着业务的快速发展,订单消息数据结构越来越复杂,数据量也非常的大,甚至大到 MYSQL 中的 text 类型都无法存储,同时这种数据结构也不太适合在 MYSQL 中存储,因此将其迁移到 mongodb!

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

rabbitmq消息消费失败如何处理 的相关文章

  • 如何将本机库链接到 IntelliJ 中的 jar?

    我正在尝试在 IntelliJ 中设置 OpenCV 但是我一直在弄清楚如何告诉 IntelliJ 在哪里可以找到本机库位置 在 Eclipse 中 添加 jar 后 您可以在 Build Config 屏幕中设置 Native 库的位置
  • 如何让 BlazeDS 忽略属性?

    我有一个 java 类 它有一个带有 getter 和 setter 的字段 以及第二对 getter 和 setter 它们以另一种方式访问 该字段 public class NullAbleId private static final
  • 不同帐户上的 Spring Boot、JmsListener 和 SQS 队列

    我正在尝试开发一个 Spring Boot 1 5 应用程序 该应用程序需要侦听来自两个不同 AWS 帐户的 SQS 队列 是否可以使用 JmsListener 注解创建监听器 我已检查权限是否正确 我可以使用 getQueueUrl 获取
  • 如何通过 javaconfig 使用 SchedulerFactoryBean.schedulerContextAsMap

    我使用 Spring 4 0 并将项目从 xml 移至 java config 除了访问 Service scheduleService 带注释的类来自QuartzJobBean executeInternal 我必须让它工作的 xml 位
  • 使用 LinkedList 实现下一个和上一个按钮

    这可能是一个愚蠢的问题 但我很难思考清楚 我编写了一个使用 LinkedList 来移动加载的 MIDI 乐器的方法 我想制作一个下一个和一个上一个按钮 以便每次单击该按钮时都会遍历 LinkedList 如果我硬编码itr next or
  • .properties 中的通配符

    是否存在任何方法 我可以将通配符添加到属性文件中 并且具有所有含义 例如a b c d lalalala 或为所有以结尾的内容设置一个正则表达式a b c anything 普通的 Java 属性文件无法处理这个问题 不 请记住 它实际上是
  • Pig Udf 显示结果

    我是 Pig 的新手 我用 Java 编写了一个 udf 并且包含了一个 System out println 其中的声明 我必须知道在 Pig 中运行时该语句在哪里打印 假设你的UDF 扩展了 EvalFunc 您可以使用从返回的 Log
  • 在接口中使用默认方法是否违反接口隔离原则?

    我正在学习 SOLID 原则 ISP 指出 客户端不应被迫依赖于他们所使用的接口 不使用 在接口中使用默认方法是否违反了这个原则 我见过类似的问题 但我在这里发布了一个示例 以便更清楚地了解我的示例是否违反了 ISP 假设我有这个例子 pu
  • 来自 dll 的 Java 调用函数

    我有这个 python 脚本导入zkemkeeperdll 并连接到考勤设备 ZKTeco 这是我正在使用的脚本 from win32com client import Dispatch zk Dispatch zkemkeeper ZKE
  • Java 公历日历更改时区

    我正在尝试设置 HOUR OF DAY 字段并更改 GregorianCalendar 日期对象的时区 GregorianCalendar date new GregorianCalendar TimeZone getTimeZone GM
  • 检测并缩短字符串中的所有网址

    假设我有一条字符串消息 您应该将 file zip 上传到http google com extremelylonglink zip http google com extremelylonglink zip not https stack
  • 像 Java 这样的静态类型语言中动态方法解析背后的原因是什么

    我对 Java 中引用变量的动态 静态类型和动态方法解析的概念有点困惑 考虑 public class Types Override public boolean equals Object obj System out println i
  • 内部类的构造函数引用在运行时失败并出现VerifyError

    我正在使用 lambda 为内部类构造函数创建供应商ctx gt new SpectatorSwitcher ctx IntelliJ建议我将其更改为SpectatorSwitcher new反而 SpectatorSwitcher 是我正
  • 为什么 Java 8 不允许非公共默认方法?

    让我们举个例子 public interface Testerface default public String example return Hello public class Tester implements Testerface
  • Opencv Java 灰度

    我编写了以下程序 尝试从彩色转换为灰度 Mat newImage Imgcodecs imread q1 jpg Mat image new Mat new Size newImage cols newImage rows CvType C
  • 找不到符号 NOTIFICATION_SERVICE?

    package com test app import android app Notification import android app NotificationManager import android app PendingIn
  • 使用反射覆盖最终静态字段是否有限制?

    在我的一些单元测试中 我在最终静态字段上的反射中遇到了奇怪的行为 下面是说明我的问题的示例 我有一个基本的 Singleton 类 其中包含一个 Integer public class BasicHolder private static
  • 如何将双精度/浮点四舍五入为二进制精度?

    我正在编写对浮点数执行计算的代码的测试 不出所料 结果很少是准确的 我想在计算结果和预期结果之间设置一个容差 我已经证实 在实践中 使用双精度 在对最后两位有效小数进行四舍五入后 结果始终是正确的 但是usually四舍五入最后一位小数后
  • 使用 svn 1.8.x、subclise 1.10 的 m2e-subclipse 连接器在哪里?

    我读到 m2e 的生产商已经停止生产 svn 1 7 以外的任何版本的 m2e 连接器 Tigris 显然已经填补了维护 m2e subclipse 连接器的空缺 Q1 我的问题是 使用 svn 1 8 x 的 eclipse 更新 url
  • Spring Boot 无法更新 azure cosmos db(MongoDb) 上的分片集合

    我的数据库中存在一个集合 documentDev 其分片键为 dNumber 样本文件 id 12831221wadaee23 dNumber 115 processed false 如果我尝试使用以下命令通过任何查询工具更新此文档 db

随机推荐

  • C#学习记录——.NET的三层架构

    每一个不曾起舞的日子 都是对生命的辜负 尼采 每一个不读书的的日子 都是对时光的辜负 今天学习 零基础学C 3 0 NET的三层架构 为了实现大型应用系统后续功能的扩展性和程序的灵活性 NET编程语言借鉴了JAVA的MVC思想 产生了三层架
  • MySQL - 第9节 - MySQL内外连接

    目录 1 内连接 2 外连接 2 1 左外连接 2 2 右外连接 3 简单案例 1 内连接 表的连接分为内连接和外连接 内连接实际上就是利用where 子句对两种表形成的笛卡儿积进行筛选 我们前面学习的查询都是内连接 也是在开发过程中使用的
  • Markdown语法--Obsidian笔记

    Markdown 语法 笔记 文章目录 Markdown 语法 笔记 语法分类 文字层级类 1 标题 2 段落 3 区块引用 4 代码区块 5 列表 6 待办事项 文字格式类 1 样式 2 表格 链接引用类 1 链接 2 图片 3 脚注 4
  • Dubbo与Spring Cloud的区别

    这是个老生常谈的问题 每个技术团队在业务转型微服务化架构的时候都会纠结过这个选型问题 首先 dubbo 之前确实在 2012 年的时候发布了最后一个版本 2 5 3 并且停止维护更新 在2017年的时候又 起死回生 官方宣布重启更新 并重点
  • 2021图像检索综述

    论文地址 Deep Image Retrieval A Survey 本文是2021年最新的关于图像检索的综述 介绍了基于内容的图像检索 content based image retrieval CBIR 在深度学习技术上的进展 目录 0
  • Traceback (most recent call last): File “D:/python_workspace/hello.py“, line 3, in <module>

    错误背景 python的初学者 在学习多行语句 错误信息如下 错误原因 变量有字符串类型 有整型类型 有浮点型 在java 里面 String标识字符串类型 Int标识整型 在python里面 a yy1 就是字符串类型 a 1就是数字类型
  • 29_content 阶段的concat 模块

    文章目录 提升性能 content 阶段的 caoncat 模块 concat 模块的指令 示例配置 提升性能 content 阶段的 caoncat 模块 功能 当页面需要访问多个小文件时 把它们内容合并到一次http 响应中返回 提升性
  • 数组排序的方法?

    1 sort排序 let arr 1 2 3 4 5 6 7 8 9 0 9 8 7 6 3 4 5 5 var res console log arr 排序前 1 2 3 4 5 6 7 8 9 0 9 8 7 6 3 4 5 5 arr
  • SSD目标检测算法原理(上)

    目录 一 目标检测概述 1 1 项目演示介绍 1 2 图片识别背景 1 3 目标检测定义 二 目标检测算法原理 2 1 任务描述 2 2 目标检测算法必备基础 2 3目标检测算法模型输出 目标检测 overfeat模型 R CNN模型 候选
  • h2database源码解析-查询优化器原理

    目录 一 成本计算规则 二 单表查询 三 多表关联查询 一 成本计算规则 h2的查询优化器基于成本的 因此在执行查询前 会基于成本计算使用哪个索引 如果涉及多表关联 还会计算不同表关联顺序的成本 最终基于最小成本得出执行计划 单表查询时 遍
  • 树莓派驱动开发简单案例完整过程(动态加载驱动)

    1 下载树莓派os镜像 https www raspberrypi org downloads raspbian 2 使用命令 uname a 查看树莓派内核 Linux raspberrypi 4 19 118 v7 1311 SMP M
  • JQuery筛选器

    jQuery提供了强大的选择器让我们获取对象 在这边 我人为地将jQuery选择器分为两大部分 选择对象和筛选条件 选择对象表示要获取什么对象 筛选条件是对获取的对象进行筛选 最终留下符合某些特征的对象 1 选择对象1 基本 id根据给定的
  • VC++、MFC中最好的开源项目

    介绍一下用VC MFC写的最好的开源项目 Sourceforge net中有许多高质量的VC 开源项目 我列举了一些可以作为VC 程序员的参考 一 优秀的开源项目 7 Zip http sourceforge net projects se
  • react项目路由组件懒加载方法对比,@loadable/component和react-loadable和suspense lazy

    1 使用 loadable component方法 推荐使用这个 npm install loadable component S 先安装一下 2 在app js中引入 loadable component import Loadable
  • 随机森林回归模型--评分预测

    PS 介绍代码仅供介绍 源代码后期经过修改与介绍代码不一定完全相同 索引表 使用到的库 数据加载和预处理 划分训练集和测试集 模型选择和训练 模型评估 模型优化 结果展示 尾声 使用到的库 import pandas as pd 数据处理库
  • tomcat版本与jdk对应关系

    见tomcat官网说明 http tomcat apache org whichversion html Apache Tomcat Versions Apache Tomcat is an open source software imp
  • Mongodb系列- spring-data-mongodb使用MongoTemplate实现分页查询

    转载于 http www cnblogs com jycboy p 8969035 html 在用spring data mongodb框架开发的过程中 需要实现分页查询 就百度了下 没找到满意的又google了下 找到了思路 在sprin
  • IO进程线程day8(2023.8.6)

    一 Xmind整理 管道的原理 有名管道的特点 信号的原理 二 课上练习 练习1 pipe 功能 创建一个无名管道 同时打开无名管道的读写端 原型 include
  • 如何让Myeclipse已经关闭掉的项目不显示出来

    一 打开Package Explorer视图 在它的右上角有一个向下的三角图标 2 点击后选择Filters 在弹出的Filter配置窗口中选中 Closed Projects 转载于 https www cnblogs com rensh
  • rabbitmq消息消费失败如何处理

    在介绍消息中间件 MQ 之前 我们先来简单的了解一下 为何要引用消息中间件 例如 在电商平台中 常见的用户下单 会经历以下几个流程 当用户下单时 创建完订单之后 会调用第三方支付平台 对用户的账户金额进行扣款 如果平台支付扣款成功 会将结果