rabbitmq的发布确认和事务

2023-11-15

confirm的工作机制

       ‍ Confirms是增加的一个确认机制的类,继承自标准的AMQP。这个类只包含了两个方法:confirm.select和confirm.select-ok。另外,basic.ack方法被发送到客户端。

        

       ‍ confirm.select是在一个channel中启动发布确认。注意:一个具有事务的channel不能放入到确认模式,同样确认模式下的channel不能用事务。

        

        当confirm.select被发送/接收。发布者/broker开始计数(首先是发布然后confirm.select被记为1)。一旦channel为确认模式,发布者应该期望接收到basic.ack方法,delivery-tag属性显示确认消息的数量。

        

        当broker确认了一个消息,会通知发布者消息被成功处理;

        

       ‍ basic的规则是这样的:

        一个未被路由的具有manadatory或者immediate的消息被正确确认后触发basic.return;

        另外,一个瞬时态的消息被确认目前已经入队;

        持久化的消息在持久化到磁盘或者每个队列的消息被消费之后被确认。

        

        关于confirm会有一些问题:

        首先,broker不能保证消息会被confirm,只知道将会进行confirm

        第二,当未被确认的消息堆积时消息处理缓慢,对于确认模式下的发布,broker会做几个操作,日志记录未被确认的消息

        第三,如果发布者与broker之间的连接删除了未能得到确认,它不一定知道消息丢失,所以可能会发布重复的消息

        最后,如果在broker中发生坏事会导致消息丢失,将会basic.nack那些消息

        

        总之,Confirms给客户端一种轻量级的方式,能够跟踪哪些消息被broker处理,哪些可能因为broker宕掉或者网络失败的情况而重新发布

确认并且保证消息被送达,提供了两种方式:发布确认和事务。(两者不可同时使用)在channel为事务时,不可引入确认模式;同样channel为确认模式下,不可使用事务。

 

事务

Spring AMQP做的不仅仅是回滚事务,而且可以手动拒绝消息,如当监听容器发生异常时是否重新入队

持久化的消息是应该在broker重启前都有效。如果在消息有机会写入到磁盘之前broker宕掉,消息仍然会丢失。在某些情况下,这是不够的,发布者需要知道消息是否处理正确。简单的解决方案是使用事务,即提交每条消息

案例:

  RabbitTemplate的使用案例(同步),由调用者提供外部事务,在模板中配置了channe-transacted=true。通常是首选,因为它是非侵入性的(低耦合)

 <rabbit:template id="rabbitTemplate"  connection-factory="cachingConnectionFactory"
exchange="sslexchange" channel-transacted="true"/>
@Transactional
public void doSomething() {
     ApplicationContext context =
             new GenericXmlApplicationContext("spring-amqp-test.xml");
     RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
     String incoming = (String) rabbitTemplate.receiveAndConvert();
     // do some more database processing...
     String outgoing = processInDatabaseAndExtractReply(incoming);
     //数据库操作中如果失败了,outgoing这条消息不会被发送,incoming消息也会返回到broker服务器中,因为这是一条事务链。
    //可做XA事务,在消息传送与数据库访问中共享事务。
    rabbitTemplate.convertAndSend(outgoing);
}
private String processInDatabaseAndExtractReply(String incoming){
     return incoming;
}

 

 异步使用案例(外部事务)

<bean id="rabbitTxManage" class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
     <property name="connectionFactory" ref="cachingConnectionFactory"></property>
</bean>
<rabbit:listener-container  connection-factory="cachingConnectionFactory" transaction-manager="rabbitTxManage" channel-transacted="true">
     <rabbit:listener ref="foo" method="onMessage" queue-names="rabbit-ssl-test"/> 
</rabbit:listener-container>

 

在容器中配置事务时,如果提供了transactionManager,channelTransaction必须为true;如果为false,外部的事务仍然可以提供给监听容器,造成的影响是在回滚的业务操作中也会提交消息传输的操作

使用事务有两个问题:

Ø  一是会阻塞,发布者必须等待broker处理每个消息。如果发布者知道在broker死掉之前哪些消息没有被处理就足够了

Ø  第二个问题是事务是重量级的,每次提交都需要fsync(),需要耗费大量的时间

confirm模式下,broker将会确认消息并处理。这种模式下是异步的,生产者可以流水式的发布而不用等待broker,broker可以批量的往磁盘写入

发布确认

发布确认必须配置在CachingConnectionFactory上

<bean id="cachingConnectionFactory" 
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
     <property name="host" value="192.168.111.128"></property>
     <property name="port" value="5672"></property>
     <property name="username" value="admin"/>
     <property name="password" value="admin"/>
     <property name="publisherConfirms" value="true"/>
     <property name="publisherReturns" value="true"/>
</bean>

 

若使用confirm-callback或return-callback,必须要配置publisherConfirms或publisherReturns为true

每个rabbitTemplate只能有一个confirm-callback和return-callback

 //确认消息是否到达broker服务器,也就是只确认是否正确到达exchange中即可,只要正确的到达exchange中,broker即可确认该消息返回给客户端ack。
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){
     @Override
     public void confirm(CorrelationData correlationData, boolean ack, String cause) {
         if (ack) {
             System.out.println("消息确认成功");
         } else {
             //处理丢失的消息(nack)
            System.out.println("消息确认失败");
         }
     }
});

 

使用return-callback时必须设置mandatory为true,或者在配置中设置mandatory-expression的值为true,可针对每次请求的消息去确定’mandatory’的boolean值,只能在提供’return -callback’时使用,与mandatory互斥

 rabbitTemplate.setMandatory(true);
//确认消息是否到达broker服务器,也就是只确认是否正确到达exchange中即可,只要正确的到达exchange中,broker即可确认该消息返回给客户端ack。
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
     @Override
     public void returnedMessage(Message message, int replyCode, String replyText,
                                 String exchange, String routingKey) {
         //重新发布
         RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(errorTemplate,"errorExchange", "errorRoutingKey");
         Throwable cause = new Exception(new Exception("route_fail_and_republish"));
         recoverer.recover(message,cause);
         System.out.println("Returned Message:"+replyText);
     }
});

errorTemplate配置:
<rabbit:queue id="errorQueue" name="errorQueue" auto-delete="false" durable="true">
        <rabbit:queue-arguments>
            <entry key="x-ha-policy" value="all"/>
            <entry key="ha-params" value="1"/>
            <entry key="ha-sync-mode" value="automatic"/>
        </rabbit:queue-arguments>
    </rabbit:queue>
    <rabbit:direct-exchange id="errorExchange" name="errorExchange" auto-delete="false" durable="true">
        <rabbit:bindings>
            <rabbit:binding queue="errorQueue" key="errorRoutingKey"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>
 <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
        <property name="backOffPolicy">
            <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
                <property name="initialInterval" value="200" />
                <property name="maxInterval" value="30000" />
            </bean>
        </property>
        <property name="retryPolicy">
            <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
                <property name="maxAttempts" value="5"/>
            </bean>
        </property>
    </bean>
<rabbit:template  id="errorTemplate" connection-factory="cachingConnectionFactory" exchange="errorExchange" queue="errorQueue" routing-key="errorRoutingKey" retry-template="retryTemplate" />

 同一个连接不同channel使用事务和发布确认

 

private RabbitTemplate rabbitTemplate;

private TransactionTemplate transactionTemplate;
 @Before
public void init() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setHost("192.168.111.128");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("admin");
    connectionFactory.setPassword("admin");
    template = new RabbitTemplate(connectionFactory);
    template.setChannelTransacted(true);
    RabbitTransactionManager transactionManager = new RabbitTransactionManager(connectionFactory);
    transactionTemplate = new TransactionTemplate(transactionManager);
    connectionFactory.setPublisherConfirms(true);
    rabbitTemplate = new RabbitTemplate(connectionFactory);
}
//发布确认测试
@Test
public void testPublishConfirm(){
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if(ack){
                System.out.println("消息确认成功");
            }else{
                System.out.println("消息确认失败");
            }
        }
    });
    //发送到一个不存在的exchange,则会触发发布确认
    rabbitTemplate.convertAndSend("asd","aaa","message");
    String message = (String) rabbitTemplate.receiveAndConvert(ROUTE);
    assertEquals("message",message);
}
//事务测试
@Test
public void testSendAndReceiveInTransaction() throws Exception {
    //由于有spring的事务参与,而发送操作在提交事务时,是不允许除template的事务有其他事务的参与,所以这里不会提交
    //队列中就没有消息,所以在channel.basicGet时命令返回的是basic.get-empty(队列中没有消息时),而有消息时,返回basic.get-ok
    String result = transactionTemplate.execute(new TransactionCallback<String>() {
        @Override
        public String doInTransaction(TransactionStatus status) {
            template.convertAndSend(ROUTE, "message");
            return (String) template.receiveAndConvert(ROUTE);
        }
    });
    //spring事务完成,对其中的操作需要提交,发送与接收操作被认为是一个事务链而提交
    assertEquals(null, result);
    //这里的执行不受spring事务的影响
    result = (String) template.receiveAndConvert(ROUTE);
    assertEquals("message", result);
}

 

转载于:https://my.oschina.net/lzhaoqiang/blog/670749

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

rabbitmq的发布确认和事务 的相关文章

随机推荐

  • MarkDown超级教程 Obsidian版_11.4

    date 2021 11 3 18 01 aliases Markdown教程 MD教程 tags Markdown 什么是 Markdown Markdown 是一款轻量级标记语言 不同于HTML Hypertext Markup Lan
  • 修改notebook的默认路径_Anaconda3修改jupyter_notebook打开的默认路径

    1 windows下 找到jupyter notebook配置文件jupyter notebook config py 默认安装在 C Users Administrator jupyter jupyter notebook config
  • GB28181协议EasyGBS国标视频云平台无法正常启动的排查步骤与解决方法

    EasyGBS国标视频云服务是基于国标GB T28181协议的视频能力平台 可实现的视频功能包括 实时监控直播 录像 检索与回看 语音对讲 云存储 告警 平台级联等功能 平台部署简单 可拓展性强 支持将接入的视频流进行全终端 全平台分发 分
  • upload-labs

    pass 01 先发一个后缀名为PHP的文件 发现不能上传 然后禁用js 说明 js就是所谓的客户端脚本语言 是一种在互联网浏览器 浏览器也称为Web客户端 因为它连接到Web服务器上 以下载页面 内部运行的计算机编程语言 普通网页内都会插
  • React新出来两个钩子函数是什么?和删掉的will系列有什么区别?

    React新出来两个钩子函数是什么 和删掉的will系列有什么区别 react16废弃的生命周期有3个will componentWillMount componentWillReceiveProps componentWillUpdate
  • SolidWorks二次开发语法技巧及基础

    语法 变量 HRESULT 接口返回值 用于异常调用时判断 本质 typedef LONG HRESULT 32位 S OK S FALSE OLECHAR 特定平台上表示文本数据 win32内 定义为 wchar t 16 或32 位 c
  • 正大国际:做期货交易的方法

    以多为例 空则反之 1 顺势交易 这话估计听的耳朵都起茧子了 但是还是要强调 顺势 顺势 不要抄底 不要摸顶 2 势的判断 做明晃晃的趋势 一眼就看出在上涨的 你就是找个小学生 老太太等完全不懂交易的 让他看走势图 他都知道在上涨 这就是明
  • 区块链知识转载博文1: 共识算法之争(PBFT,Raft,PoW,PoS,DPoS,Ripple)

    注 这是本人读到的关于共识算法最全和最好的分享博文 系统的介绍了拜占庭容错技术以及共识算法的原理和常用共识算法 原文链接请见后 目录 一 拜占庭容错技术 Byzantine Fault Tolerance BFT 二 PBFT Practi
  • 智慧“昆明”在路上 未来充满精彩

    智慧城市是运用物联网 云计算 大数据 移动互联网 空间地理信息集成等新一代信息技术 促进城市规划 建设 管理和服务智慧化的新理念和新模式 近年来 昆明市全面加快智慧城市建设 力争通过三年的努力 打造区域信息辐射中心的核心区 生态 融合发展的
  • Spring Boot系列四 Spring @Value 属性注入使用总结一

    Value注入 不通过配置文件的注入属性的情况 通过 Value将外部的值动态注入到Bean中 使用的情况有 注入普通字符串 注入操作系统属性 注入表达式结果 注入其他Bean属性 注入beanInject对象的属性another 注入文件
  • C语言if选择练习题

    C语言实现输出对应成绩的等级 include
  • hdu 5756:Boss Bo

    题目链接如下 Problem 5756 先用dfs确定每个节点的序号编号 并且可以获得每个节点可以包括的子树节点区间范围 再用线段树建立一棵树 在第一次建立的时候我们记录每个节点的深度 然后再进行一次dfs 这次dfs用来更新以不同节点为根
  • 【实验九】【使用触发器实现数据完整性】

    文章目录 触发器 一 实现域完整性 二 实现参照完整性 三 比较约束与触发器的执行顺序 Reference 触发器 触发器 trigger 是用户定义在关系表上的一类由事件驱动的特殊过程 触发器又叫做事件 条件 动作 event condi
  • ThinkPHP6.0 多应用模式 部署 Layuiadmin 单页版

    QQ 23426945 PHP技术群 159789818 个人技术博客 https www itqaq com TP6 0中的路由省略应用名只能用入口文件绑定应用 和 域名绑定应用 经过测试 最后得出域名绑定应用是最合适的部署方式 如果有更
  • React源码分析3-render阶段(穿插scheduler和reconciler)

    本章将讲解 react 的核心阶段之一 render阶段 我们将探究以下部分内容的源码 更新任务的触发 更新任务的创建 reconciler 过程同步和异步遍历及执行任务 scheduler 是如何实现帧空闲时间调度任务以及中断任务的 触发
  • SpringBoot 如何实现多数据源

    SpringBoot 如何实现多数据源 第一步 配置yml spring datasource type com alibaba druid pool DruidDataSource datasource1 url jdbc mysql 1
  • vue毕业设计分享 100例 (五)

    文章目录 前言 题目1 基于SSM的婚纱摄影业务系统 br 题目2 基于SSM的家教网课学习平台 br 题目3 基于SSM的家庭美食食谱管理系统 br 题目4 基于SSM的驾校预约培训管理系统 br 题目5 基于SSM的教师评价考核管理系统
  • 远程登陆之SSH的简单用法及命令

    SSH简单使用 SSH的安装 启动服务器的SSH服务 SSH远程登陆 口令登陆 公钥登陆 配置别名 传输文件 SSH只是一种协议 存在多种实现 既有商业实现 也有开源实现 OpenSSH是一种免费开源实现 OpenSSH是用于使用SSH协议
  • zabbix监控硬件

    一 通过snmp监控 1 idrac上开启snmp服务 2 然后到zabbix server服务器上测试一下是否能get到数据 snmpget v 2c c 1 3 6 1 4 1 674 10892 2 1 1 2 0 3 添加主机 二
  • rabbitmq的发布确认和事务

    2019独角兽企业重金招聘Python工程师标准 gt gt gt confirm的工作机制 Confirms是增加的一个确认机制的类 继承自标准的AMQP 这个类只包含了两个方法 confirm select和confirm select