RabbitMQ保证消息的一致性解决方案

2023-10-31

RabbitMQ保证消息的一致性

一、采用confirm消息确认机制及return返回机制 确保消息发送成功

二、将队列以及消息设置持久化 保证rabbitmq突然宕机消息仍然存在

三、手动确认接收消息方式 消息处理失败拒收重回队列


1. yml配置

spring:
  rabbitmq:
    host: ip
    port: 5672
    username: guest
    password: guest
    ##消息发送确认回调
    publisher-confirms: true
    #采用confirm以及return机制 发送返回监听回调
    publisher-confirm-type: correlated
    publisher-returns: true
listener:
      type: simple
      simple:
        #手动接收消息方式
        acknowledge-mode: manual

2. RabbitMQ配置类

@Configuration
@Slf4j
@AllArgsConstructor
public class RabbitmqConfig {
    private final ConnectionFactory connectionFactory;
    private final RabbitLogsMapper rabbitLogsMapper;
 
    @Bean
    public RabbitTemplate rabbitTemplate(){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        //confirm确认
        rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> {
            String msgId = correlationData.getId();
            if (ack) {
                //发送成功
                log.info("消息成功发送 , msgId: {}," ,msgId);
                //状态更新  消息发送成功
                BiddingRabbitLogs biddingRabbitLogs = new BiddingRabbitLogs();
                biddingRabbitLogs.setStatus(SendStatus.SEND_SUCCESS.getValue());
                rabbitLogsMapper.update(biddingRabbitLogs, Wrappers.lambdaUpdate(BiddingRabbitLogs.class).eq(BiddingRabbitLogs::getId,msgId).notIn(BiddingRabbitLogs::getStatus,"4"));
            } else {
                //发送失败
                log.error("消息发送失败, {}, cause: {}, msgId: {}", correlationData, cause, msgId);
                //状态更新  消息发送失败
                BiddingRabbitLogs biddingRabbitLogs = new BiddingRabbitLogs();
                biddingRabbitLogs.setStatus(SendStatus.SEND_FAILD.getValue());
                rabbitLogsMapper.update(biddingRabbitLogs, Wrappers.lambdaUpdate(BiddingRabbitLogs.class).eq(BiddingRabbitLogs::getId,msgId).notIn(BiddingRabbitLogs::getStatus,"4"));
            }
        });
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
            //触发回调  只有交换机找不到队列时才会触发
            log.error("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);
            //状态更新 消息发送失败
            String msgId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");
            BiddingRabbitLogs biddingRabbitLogs = new BiddingRabbitLogs();
            biddingRabbitLogs.setStatus(SendStatus.SEND_FAILD.getValue());
            rabbitLogsMapper.update(biddingRabbitLogs, Wrappers.lambdaUpdate(BiddingRabbitLogs.class).eq(BiddingRabbitLogs::getId,msgId).notIn(BiddingRabbitLogs::getStatus,"4"));
        });
        return rabbitTemplate;
    }
    @Bean
    public RabbitAdmin rabbitAdmin(RabbitTemplate rabbitTemplate){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitTemplate);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
}

说明:

  • confirm机制只是确保了消息是否成功发送到交换机
  • Return机制确保了消息是否从交换机发送到指定的队列

- - ConfirmCallback则根据状态判断发送成功还是失败 进行更新日志表记录状态
  • ReturnCallback则根据收到消息就是未找到队列发送失败,未收到消息就是发送成功 进行更新日志表记录状态

3. 声明的队列一定要将队列持久化

public String createQueue(String queueName) {
        BiddingQueueConfig biddingQueueConfig = queueMapper.selectOne(Wrappers.lambdaQuery(BiddingQueueConfig.class).eq(BiddingQueueConfig::getQueue, queueName));
        if (biddingQueueConfig == null) {
            biddingQueueConfig = new BiddingQueueConfig();
            biddingQueueConfig.setCreatetime(new Date());
            biddingQueueConfig.setQueue(queueName);
            biddingQueueConfig.setStatus("1");
            int insert = queueMapper.insert(biddingQueueConfig);
            //将队列持久化
            rabbitAdmin.declareQueue(new Queue(queueName,true));
            return queueName + "队列创建成功";
        }
        return queueName + "队列创建失败";
}

4. 发送消息 将发送的消息设置为持久化

发送消息前首先将发送的数据插入数据库,状态变为发送中在这里插入图片描述

5. 消费者监听队列

  • 如果根据消息id查询日志表为空的话那么是没有发送消息,消息自动接收,发送成功消息后日志表会有数据
  • 判断是否重复消费 根据状态是否成功消费以及失败重试次数判断
  • 处理业务逻辑,如果成功消息接收 状态更新
  • 如果处理业务逻辑失败报错则会拒收,消息重回队列重新处理此条消息,当处理次数超过3次处理失败则消息改为接收
// 启动自动创建队列
@RabbitListener(queuesToDeclare = { @Queue("queue_work_dontask") })
@RabbitHandler
@SneakyThrows
public void receiveDonTask(String data, Message message, Channel channel){
    //消息id
    String msgId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");
    //根据消息id查询BiddingRabbitLogs日志表
    BiddingRabbitLogs biddingRabbitLogs = remoteLogsService.get(msgId, SecurityConstants.FROM_IN).getData();
    if (biddingRabbitLogs == null) {
        log.error("消息ID查询 biddingRabbitLogs:null");
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        return;
    }
    //状态:1.消息发送中 2.消息发送成功 3.消息发送失败 4.消费成功 5.消费失败
    if (SendStatus.CONSUME_SUCCESS.getValue().equals(biddingRabbitLogs.getStatus()) || SendStatus.SEND_FAILD.getValue() == String.valueOf(biddingRabbitLogs.getTryTimes())) {
        //重复消费
        log.info("消息ID:{},重复消费",msgId);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        return;
    }
    try {
        //处理业务逻辑
        Map map = JSON.parseObject(data, Map.class);
        String dataString = (String) map.get("data");
        String username = (String) map.get("username");
        Integer tenantId = (Integer) map.get("tenantId");
        ApproveParam approveParam = JSON.parseObject(dataString, ApproveParam.class);
        R<String> stringR = doneTask(approveParam,username,tenantId);
        //处理成功  更新状态
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        biddingRabbitLogs.setStatus(SendStatus.CONSUME_SUCCESS.getValue());
        biddingRabbitLogs.setSuccesstime(new Date());
        remoteLogsService.updateById(biddingRabbitLogs,SecurityConstants.FROM_IN);
        log.info("消费成功,消息ID:{}",msgId);
    } catch (Exception e) {
        e.printStackTrace();
        if (biddingRabbitLogs.getTryTimes() >= Integer.parseInt(SendStatus.TRY_TIMES.getValue())) {
            //多次消费不成功 自动接收
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            log.error("多次消费失败,消息ID:{}",msgId);
        } else {
            //消费失败 拒收 重回队列
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
            log.error("消费失败,消息ID:{}",msgId);
        }
        biddingRabbitLogs.setStatus(SendStatus.CONSUME_FAILD.getValue());
        biddingRabbitLogs.setTryTimes(biddingRabbitLogs.getTryTimes()+1);
        remoteLogsService.updateById(biddingRabbitLogs,SecurityConstants.FROM_IN);
    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RabbitMQ保证消息的一致性解决方案 的相关文章

  • 使用 JPA/ORM 生成数据库模式是一个坏主意吗?

    Salve Part of 另一个问题 答案 https stackoverflow com questions 7595578关于 SO 以及其他声称相同的声明 如果您通过 JPA 更新数据库架构 但通常不是一个好的做法 您是否真的不应该
  • 光学标记阅读器的开源库[关闭]

    Closed 这个问题不符合堆栈溢出指南 help closed questions 目前不接受答案 我想要一个用于光学标记读取 OMR 的开源库 有这样的库吗 我想在我的 java 应用程序中使用这个库 zxing 可能对你有用 http
  • 在 Java 中跨平台地播种随机生成器,无需时间

    我几乎同时在两个线程上初始化两个随机数生成器 并且我希望这两个生成器的行为完全不同 我会打电话Random nextInt 7 经常一个接一个地在两台发电机上运行 使用System currentTimeMillis 这不是一个好主意 因为
  • Java 多头中的斐波那契计算显示负值

    我的斐波那契计算器工作正常 但当数字增加时 结果会出现负值 就像它是一个Integer超过其最大值 它正在使用缓存java util Map
  • 是否可以使用检测重新定义核心 JDK 类?

    我想重新定义字节码StackOverflowError构造函数 因此当堆栈溢出发生时我有一个 钩子 我想要做的就是在构造函数的开头插入对我选择的静态方法的单个方法调用 是否有可能做到这一点 您应该能够使用两种方法之一来完成此操作 除非在过去
  • Spring Rest-API - 403 禁止错误响应

    我是 Spring 新手 我正在编写 REST API 我收到 403 删除 放置禁止错误 以下是我正在处理的示例 RequestMapping value noteId method RequestMethod PUT public Re
  • 使用 Hibernate 或 Spring 打印 DBMS_OUTPUT.put_line

    我想知道 Hibernate 或 Spring 或任何第 3 方库是否提供将 DBMS OUTPUT put line 消息直接打印到 system out 或日志文件的能力 目的是在控制台中同时显示 PLSQL 日志消息和 java 日志
  • 使用 spring security 找不到 AuthenticationProvider

    我一直在尝试使用 x509 证书通过 LDAP 对用户进行身份验证 但似乎无法正常工作 我声明了一个身份验证提供程序 但仍然抛出错误 提示没有提供程序 这是我的调试输出 INFO Initiating Jersey application
  • 将位于 jar 中的文件读取为 java.io.File 对象

    与此类似的问题已发布 但似乎没有一个答案对我的情况有帮助 我正在编写一个程序包 它使用 Google 的凭据来获取 Google Apps 用户 为此 我使用服务帐户 因此为了检索凭据 我需要提供 除其他外 一个 p12 签名文件 Cred
  • java“类文件包含错误的类”错误

    我正在尝试制作一个控制台应用程序来测试我的网络服务 我成功部署了一个网络服务http localhost 8080 WS myWS http localhost 8080 WS myWS我用 wsimport 制作了代理类 wsimport
  • 如何使用 JAVA 和 ADB 命令检查 Appium 中键盘是否打开

    我正在尝试检查 Android 默认键盘是否打开 我没有找到任何可以在 Appium 中使用 JAVA 和 ADB 命令检查键盘的内容 我发现这个 ADB 命令可以检查键盘是否打开 adb shell dumpsys input metho
  • 使用java读取Excel工作表的单列

    我有一张 Excel 表格 我想编写一个方法 该方法将参数作为要读取的列号 并返回一个由该列中的所有数据组成的数组 然后将该列元素放置在 xml 工作表中 我怎样才能编写一个方法来做到这一点 使用 Apache POI 您可以在他们的使用页
  • Apache HttpClient 执行时会在所有 HTTP 5XX 错误上抛出 IOException 吗?

    The Apache HttpClient 文档 http hc apache org httpcomponents client ga httpclient apidocs org apache http client HttpClien
  • 使用 Mockitos 传递参数化输入

    我正在使用 Mockito 进行单元测试 我想知道是否可以使用 Junit 测试中的方式发送参数化输入参数 e g InjectMocks MockClass mockClass new MockClass Test public void
  • 在同一台计算机上设置 JBoss 的多个实例

    我在 JBoss 社区中找到了下一页 http www jboss org community wiki ConfigurePorts http www jboss org community wiki ConfigurePorts 有下一
  • 我们可以用java定制一个垃圾收集器吗?

    我们知道java的垃圾收集器是一个低优先级线程 在java中我们可以创建任何具有高优先级的线程 那么是否有可能拥有我们自己定制的具有可变优先级的垃圾收集器线程 我们可以根据内存管理的级别进行设置 有人尝试过吗 如果是的话 您能分享一些关于如
  • 如何在jpa中共享EntityManagerFactory

    我是 jpa 的新手 这是场景 我正在开发一个 Web 应用程序 其中 多个用户可以登录 当 user1 注销时 我正在使用下面的代码 public static void closeEntityManagerFactory if enti
  • 如何使用 AEM 解析 org.apache.http.ssl?

    最终 我尝试在 Java 代码中使用 AWS S3 库来通过 AEM 启用服务器端 S3 上传 但在安装依赖项和 或由 AEM 识别时遇到了问题 每次我添加新的依赖项时 都会弹出五个问题 在我尝试构建的这个包中 这是我看到的错误 The i
  • 在测试期间调用预定方法[重复]

    这个问题在这里已经有答案了 我正在使用 Maven 开发 SpringBoot 应用程序 我有一个班级 Component有方法的注释m与 Scheduled initialDelay 1000 fixedDelay 5000 注解 这里f
  • 生成签名和加密的 JWT

    我正在尝试使用生成签名和加密的 JWT 令牌雨云智威汤逊 http connect2id com products nimbus jose jwt private void generateToken throws JOSEExceptio

随机推荐

  • 线程及线程的同步互斥

    目录 1 线程的简单介绍 2 同步互斥的概念 3 为什么要进行线程的同步互斥 4 信号量 5 互斥量 6 条件变量 1 线程的简单介绍 1 进程 在讲到线程之前 我们应该先了解一下进程的概念 进程 Process 是指计算机中已运行的程序
  • FRP内网穿透(linux->windows)

    使用背景 由于内网环境所在的电脑无法通过公网暴露访问 而使用类似于向日葵等其他代理工具 又存在一定的延迟卡顿 因此 决定待用Frp的内网穿透的功能 来实现借由公网服务器代理访问内网所在的电脑 原理 frp 主要由 客户端 frpc 和 服务
  • u盘魔术师给服务器装系统,U盘魔术师怎么装系统 U盘魔术师USM制作PE启动盘方法...

    U盘魔术师是一个很好用的装系统的工具 并且可以利用USM制作PE启动盘 很多用户都不太了解具体的方法 其实也非常的简单 下面小编就来给大家介绍一下U盘魔术师怎么装系统 赶紧来看看吧 U盘魔术师怎么装系统 U盘魔术师体积较大1G多如果是小水管
  • MyBatis学习笔记

    MyBatis MyBatis Mapper代理开发 MyBatis是一款优秀的持久层框架 用于简化JDBC MyBatis 持久层 负责把数据保存到数据库的那一层 JavaEE三层架构 表现层 页面展示 业务层 逻辑处理 持久层 对数据持
  • 在外SSH远程连接macOS服务器【cpolar内网穿透】

    文章目录 前言 1 macOS打开远程登录 2 局域网内测试ssh远程 3 公网ssh远程连接macOS 3 1 macOS安装配置cpolar 3 2 获取ssh隧道公网地址 3 3 测试公网ssh远程连接macOS 4 配置公网固定TC
  • 状态机的置位与复位

    1 状态机的异步置位与复位 异步置位与复位是与时钟无关的 当异步置位与复位到来时它们立即分别置触发器的输出为1或0 不需要等到时钟沿到来才置位或复位 把它们列入always块的事件控制括号内就能触发always块的执行 因此 当它们到来时就
  • Linux设置所有用户环境变量

    Linux中每个用户都要指定各自的环境变量 这样会比较麻烦 那么如何配置一个环境变量 所有的用户都可以使用呢 比如说我想把Linux默认语言由en US UTF 8修改为zh CN UTF 8 那么我需要设置环境变量 LANG 百度很多方法
  • Conda 配置 Python 环境

    文章目录 前言 一 Conda 是什么 二 如何获取 三 使用 Conda 命令配置多环境 1 创建新环境 2 激活新环境 3 配置新环境 4 退出新环境 5 检查所有环境 6 检查所有安装的包 7 删除某环境 8 重命名某环境 四 使用
  • Crontab配置任务定时执行

    一 每奇数周的周一执行 16 0 1 date W 2 eq 1 gt dev null sh data1 test sh 具体地 1 分钟字段 Minute field 16 2 小时字段 Hour field 0 3 日期字段 Day
  • 亚马逊首席技术官Werner Vogels:2023年及未来五大技术趋势预测

    近年来 随着我们经历的数次全球危机 如何借助技术解决人类棘手问题至关重要 如今 我们获取数据的来源比以往任何时候都多 包括可穿戴设备 医疗设备 环境传感器 视频捕获和其他联网设备 当这些数据与计算机视觉 机器学习和模拟仿真等云技术相结合时
  • OpenWrt目录之target

    target目录下主要是和平台有关的代码 最主要的是linux文件夹 linux文件夹的ramips中 ramips应该指的是对应cpu的架构 ramips文件夹下的就是不同系列的cpu对应的芯片的型号 进行试验一下 首先在根目录下运行ma
  • IDEA工具实用开发快捷键

    选中new ArrayList lt gt 或者光标放在new前面 按ctrl alt v 选中new ArrayList lt gt 或者光标放在new后边面 按ctrl alt 空格 ideal 工具没识别maven项目的话 右键pom
  • uni-app开发微信小程序,button通过数组的length判断disabled无效(数组length === 0写法无效)

    错误写法
  • caffe特征提取/C++数据格式转换

    Caffe生成的数据分为2种格式 Lmdb 和 Leveldb 它们都是键 值对 Key Value Pair 嵌入式数据库管理系统编程库 虽然lmdb的内存消耗是leveldb的1 1倍 但是lmdb的速度比leveldb快10 至15
  • 国产操作系统进入被彻底抛弃的时代

    当倪光南正在不断呼喊支持国产操作系统的时候 国产操作系统却迎来了噩梦 国产操作系统接连倒闭 国产操作系统进入一个被国家彻底抛弃的时代 红旗linux梦断国产操作系统 今年2月中科红旗linux因为缺钱倒闭解散了 一直以来做得最好的国产操作系
  • 图形学数学基础之基本蒙特卡罗尔积分(Monte Carlo Integration)

    作者 i dovelemon 日期 2017 07 29 来源 CSDN 主题 Monte Carlo Integration 引言 好久没有写博客了 最近一直在忙于工作 同时GLB库中关于PBR的渲染算法 一直卡住 无法实现下去 不过在这
  • dd大牛的《背包九讲》

    P01 01背包问题 题目 有N件物品和一个容量为V的背包 第i件物品的费用是c i 价值是w i 求解将哪些物品装入背包可使这些物品的费用总和不超过背包容量 且价值总和最大 基本思路 这是最基础的背包问题 特点是 每种物品仅有一件 可以选
  • 【HCNP路由交换学习指南】学习笔记丨第07章 BGP

    07 BGP BGP 的基本概念 BGP 对等体关系类型 IBGP 水平分割原则 路由黑洞问题及 BGP 同步规则 路由通告 Router ID 报文类型及格式 Open 报文 Update 报文 Keepalive 报文 Notifica
  • PaddleOCR使用笔记之模型训练

    目录 简介 模型训练 步骤一 文本检测模型 detection 1 准备训练数据集 2 下载预训练模型 模型介绍 下载预训练模型 3 开始训练 断点训练 4 模型评估 5 模型测试 6 训练模型转inference模型 步骤二 文本识别模型
  • RabbitMQ保证消息的一致性解决方案

    RabbitMQ保证消息的一致性 一 采用confirm消息确认机制及return返回机制 确保消息发送成功 二 将队列以及消息设置持久化 保证rabbitmq突然宕机消息仍然存在 三 手动确认接收消息方式 消息处理失败拒收重回队列 1 y