【RabbitMQ】Consumer之消费模式、消息确认与拒绝 - 基于AMQP 0-9-1

2023-11-11

这篇文章主要和大家分享RabbitMQ Consumer端的知识点,主要包括Consumer的消费模式,消息是如何确认以及如何拒绝的,当消息拒绝之后,如何让消息重新进入队列。

推模式

RabbitMQ支持推和拉两种消费模式,推模式就是由Broker向Consumer端推送消息。

下面是示例代码,可以比较直观的看到使用方式。

String queueName = "";
boolean autoAck = false; // 关闭自动确认
String consumerTag = "myConsumerTag"; // 消费者标签,区分同一Channel中的不同消费者
channel.basicQos(64); // 消费端能保持的最大未确认消息数

Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(Thread.currentThread().getName() + " -> consumer : " + consumerTag + " , receive message : " + new String(body));


        // 手动确认消息,envelope.getDeliveryTag()表示消息的编号,确认哪条消息
        // 第二参数:false确认单挑消息,true确认当前deliveryTag之前所有的消息
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
};


channel.basicConsume(queueName, autoAck, consumerTag, consumer);

上面的代码将autoAck设置为false,即手动确认,这样设置很重要,因为可以有效防止消息丢失的问题发生。

手动ack的逻辑是在DefaultConsumer的handleDelivery方法中,当然,在这个方法中还可以处理一些其它逻辑,比如拒绝消息,将消息重新入队等。 这个方法还有几个重载方法,使用方式一样,支持参数不同,但都很简单,小伙伴看一下接口就明白了。

推模式的优点就是消费消息的实时性比较高,但是需要评估消费能力,来控制未确认消息的缓存数量,防止客户端内存溢出。

拉模式

RabbitMQ Consumer客户端的拉模式则是主动从Borker上拉取消息,目前RabbitMQ提供的API只有get方法,而且一次只能拉取一条消息,使用方式也很简单,看下面的示例代码。

GetResponse response = channel.basicGet(QUEUE_NAME , false) ;
System.out.println(new String(response.getBody()));
channel.basicAck(response.getEnvelope().getDeliveryTag(), false);

拉模式中也有autoAck的参数,最好也设置成false,采用手动ack的方式。

拉模式的优点就是自己可以根据当前的负载控制何时可以消费消息,缺点就是消费消息的实时性低,而且有可能造成消息在Broker上的堆积。

消息确认

为保证消息正确的到达Consumer,RabbitMQ提供了Consumer端的确认机制,如何使用在上面的代码示例中已有体现,就是设置autoAck参数控制。设置为false,Broker会等收到Consumer的ack之后,才会将消息打上删除标记,然后从磁盘或者内存中删除。设置为true,Broker发出消息之后,就会把消息移除,不管Consumer是否消费到了消息。

建议autoAck设为false的另外一点,RabbitMQ会一直保存未收到ack的消息,除非消费此消息的Consumer断开连接,RabbitMQ会把消息重新投入队列。RabbitMQ不会给未确认的消息设置TTL,判断消息是否需要重新投递给Consumer的唯一依据就是消费改消息的Consumer已经断开连接。

消息拒绝

Consumer有可能收到错误的消息或者无法处理的消息,而拒绝此消息,RabbitMQ也为此场景提供了相应的API。channel.basicReject 告诉Broker,拒绝消费此消息。来看代码示例。

Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("consumer : " + consumerTag + " , receive message : " + new String(body));

        // 拒绝消息,重新入队               
        channel.basicReject(envelope.getDeliveryTag(), true);
    }
};

从使用上来看,和消息确认一样,只需将回调函数中的 channel.basicAck 换成 channel.basicReject ,第一个参数的含义和上面channel.basicAck一样,第二个参数表示是否重新入队,true:重新入队,false:直接从队列中移除,不再发送给Consumer。

注意,channel.basicReject 方法一次只能拒绝一条消息。

消息批量拒绝

如果Consumer端需要批量拒绝消息,可以使用channel.nack 方法,它的multiple参数可以控制拒绝单条消息还是批量拒绝消息。具体用法和参数含义参考下面示例代码。

Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("consumer : " + consumerTag + " , receive message : " + new String(body));

               
        // 第一个参数:当前消息的标识
        // 第二个参数:拒绝之后,是否重新进入队列
        // 第三个参数:是否批量拒绝,true:deliveryTag之前所有的消息都会被拒绝
        channel.basicNack(envelope.getDeliveryTag(), false, false);

    }
};

消息可重入队列

上面消息拒绝和消息批量拒绝的方法提供了参数,控制被拒绝的消息是否重新进入队列,RabbitMQ还提供了一个接口,用于直接将消息重新入队,channal.recover,它的参数默认就是true。使用的时候,需要注意,避免因为代码逻辑的问题造成消息循环入队。

下面的示例代码中重写了handleRecoverOk回调函数,当消息recover成功后,回调此函数。

Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("consumer : " + consumerTag + " , receive message : " + new String(body));

        // 让消息重新入队
        channel.basicRecover(true);

    }

    // 监听消息recover是否成功
    @Override
    public void handleRecoverOk(String consumerTag) {
        System.out.println(consumerTag);
    }
};

好了,以上Consumer端关于消费模式,消息确认,消息拒绝,消息重新入队的内容了。

RabbitMQ系列文章会陆续更新,欢迎各位小伙伴关注后面的技术分享。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

【RabbitMQ】Consumer之消费模式、消息确认与拒绝 - 基于AMQP 0-9-1 的相关文章

  • 列出与rabbitmq java客户端API交换的绑定

    我似乎在文档中找不到任何信息 所以我想知道是否可以通过某种方式使用 java RabbitMQ API 获取与交换相关的所有绑定 我在查询 api bindings 时正在寻找类似 http api 结果的内容 api definition
  • 无法从 docker 将 RabbitMQ 连接到我的应用程序 [重复]

    这个问题在这里已经有答案了 我目前被这个问题困扰了大约一周 确实找不到合适的解决方案 问题是 当我尝试连接到 dockerized RabbitMQ 时 它每次都会给出相同的错误 wordofthedayapp wordofthedayap
  • Django、RabbitMQ 和 Celery - 为什么在我更新开发中的 Django 代码后,Celery 会运行旧版本的任务?

    所以我有一个 Django 应用程序 它偶尔会向 Celery 发送任务以进行异步执行 我发现 当我在开发中处理代码时 Django 开发服务器知道如何自动检测代码何时发生更改 然后重新启动服务器 以便我可以看到我的更改 然而 我的应用程序
  • springrabbitmq:无法将id设置为属性?

    我有一个属性文件 其中包含队列 其值为queue name 如果我在其他请使用该属性 那么它可以工作 但如果我在 id 上使用它 那么它会失败
  • 在 Windows 10 和 PHP 7.3 中安装 AMQP

    我想在 Windows 10 中使用 PHP 7 3 安装 AMQP 以便在 symfony 4 中使用 Windows 不使用任何 apache iis nginx 并直接由 symfony 运行 一切还好 直到 我决定在项目中使用rab
  • 定义具有多种消息类型的消息传递域

    到目前为止 我见过的大多数 F 消息传递示例都使用 2 4 种消息类型 并且能够利用模式匹配将每条消息定向到其正确的处理函数 对于我的应用程序 由于处理和所需参数的不同性质 我需要数百种独特的消息类型 到目前为止 每个消息类型都是其自己的记
  • 在 Celery 工作线程中捕获 Heroku SIGTERM 以优雅地关闭工作线程

    我对此进行了大量研究 令我惊讶的是我还没有在任何地方找到一个好的答案 我正在 Heroku 上运行一个大型应用程序 并且我有某些运行很长时间处理的 celery 任务 并在任务结束时保存结果 每次我在 Heroku 上重新部署时 它都会发送
  • RabbitMQ Java 客户端自动重新连接

    当我的应用程序失去与 RabbitMQ 的连接时 我将其连接工厂设置为自动尝试并重新连接 ConnectionFactory factory new ConnectionFactory factory setUsername usernam
  • AMQP如何克服直接使用TCP的困难?

    AMQP如何克服直接使用TCP发送消息时的困难 或者更具体地说 在发布 订阅场景中 在 AMQP 中 有一个代理 该代理接收消息 然后完成将消息路由到交换器和队列的困难部分 您还可以设置持久队列 即使客户端断开连接 也可以为客户端保存消息
  • 基于多线程的 RabbitMQ 消费者

    我们有一个 Windows 服务 它监听单个 RabbitMQ 队列并处理消息 我们希望扩展相同的 Windows 服务 以便它可以监听 RabbitMQ 的多个队列并处理消息 不确定使用多线程是否可以实现这一点 因为每个线程都必须侦听 阻
  • 面向服务的架构 - AMQP 或 HTTP

    一点背景 非常大的整体 Django 应用程序 所有组件都使用相同的数据库 我们需要分离服务 以便我们可以独立升级系统的某些部分而不影响其余部分 我们使用 RabbitMQ 作为 Celery 的代理 现在我们有两个选择 使用 REST 接
  • 服务器在 pika.exceptions.StreamLostError: Stream 连接丢失后关闭

    我的队列中有一些图像 我将每个图像传递到我的 Flask 服务器 在其中完成图像处理 并在我的rabbitmq 服务器中收到响应 收到响应后 我收到此错误 pika exceptions StreamLostError 流连接丢失 104
  • 确认 RabbitMQ 消息是否有超时?

    我想设置一个超时时间 超过该超时时间后 出队的消息将自动被 NACK 拒绝 当我将消息出队时 我会等待消息通过套接字传输并且另一方确认其接收 我是否需要保留计时器列表 或者 RMQ 可以自动处理这个列表吗 private void Run
  • RabbitMQ 3.1.3 和丢失的时间戳头

    如果消息中缺少时间戳头 是否可以将代理配置为插入时间戳头 因此 如果发布客户端没有添加时间戳标头 代理是否可以插入与交易所收到消息的时刻相匹配的时间戳值 我应该在哪里寻找该配置 或者这是一个坏主意 截至2015年 原来的问题有了新的答案 这
  • 公共交通错误队列正在消耗,但仍然不为空

    我正在使用 Mastransit 3 5 0 和 RabbitMq 如果队列消费者抛出异常 则默认由 MoveExceptionToTransportFilter 处理异常并移至 error 队列 对于 error 队列 我有单独的消费者
  • 具有重新排队功能的 BasicReject 实际上去了哪里?

    这似乎是一个简单的问题 但我很难找到明确的答案 如果在 RabbitMQ 3 6 1 中我有一个如下所示的队列 5 4 3 2 1 lt head 我使用消息 1 然后执行以下操作 channel BasicReject ea Delive
  • 使用 Spring Boot 的多个 Rabbitmq 队列

    来自 Spring Boot 教程 https spring io guides gs messaging rabbitmq https spring io guides gs messaging rabbitmq 他们给出了创建 1 个队
  • 如何停止rabbitmq服务器

    我正在尝试启动一个节点应用程序 但我认为rabbitmq 妨碍了我 与此线程类似 名为 rabbit 的节点已经在运行 但也 无法连接到节点 rabbit https stackoverflow com questions 8737754
  • RabbitMq 和“致命错误:握手失败 -handshake_decode_error”

    我正在使用 Windows Server 2012 Erlang 19 2 和 RabbitMq 3 6 6 我在使用 TLS 配置端点之间的连接时遇到问题 我已经尝试了所有关于 SO 的答案 以及所有 RabbitMq 文档here ht
  • 具有延迟的简单可扩展工作/消息队列

    我需要设置一个作业 消息队列 并可以选择为任务设置延迟 以便空闲工作人员不会立即拾取它 而是在一定时间后 可能因任务而异 我研究了几个 Linux 队列解决方案 rabbitmq gearman memcacheq 但它们似乎都没有提供开箱

随机推荐

  • NMOS作为开关的两种接法

    NMOS作为开关的两种接法 1 左边电路负载是接在S极对地 如果R1很小且Q1 G极一直为High 那么流过Q1的电流可能将会非常大 MOS管容易烧 2 R1 I Us VGS Vg Vs 此时VGS不一定会大于Vgs th MOS会不完全
  • html抽奖概率,求一个可挑概率的html5抽奖 圆盘的

    该楼层疑似违规已被系统折叠 隐藏此楼查看此楼圆盘抽奖 margin 0 padding 0 elm1 height 40px background color a00 elm2 height 50px background color 0a
  • mysql库的安装

    编译文件时找不到mysql库 使用以下命令查看是否安装mysql库 dpkg l grep libmysqlclient dev 安装 sudo apt get install libmysqlclient dev 安装完成可以正常编译
  • Parallels Desktop 17 发布 针对M1大幅优化

    今天 Parallels 公司发布了 Parallels Desktop 17 它对 Windows 11 和 macOS Monterey 进行了适配优化 同时为基于Apple M1 和Intel 芯片的Mac进行图形 性能提升和生产力的
  • 【.NET8】访问私有成员新姿势UnsafeAccessor(上)

    前言 前几天在 NET性能优化群里面 有群友聊到了 NET8新增的一个特性 这个类叫 UnsafeAccessor 有很多群友都不知道这个特性是干嘛的 所以我就想写一篇文章来带大家了解一下这个特性 其实在很早之前我就有关注到这个特殊的特性
  • Windows 常用运行库下载 (DirectX、VC++、.Net Framework等)

    经常听到有朋友抱怨他的电脑运行软件或者游戏时提示缺少什么 d3dx9 xx dll 或 msvcp71 dll msvcr71 dll又或者是 Net Framework 初始化之类的错误而无法正常使用 其实很多时候 只是因为你的电脑没有安
  • kettle8 新插件开发 调试

    参考 https blog csdn net u013468915 article details 82629810 https blog csdn net zougen article details 80825751 基于eclipse
  • 【自然语言处理】大模型高效微调:PEFT 使用案例

    文章目录 一 PEFT介绍 二 PEFT 使用 2 1 PeftConfig 2 2 PeftModel 2 3 保存和加载模型 三 PEFT支持任务 3 1 Models support matrix 3 1 1 Causal Langu
  • 从新建项目到打包成APK(Cocos2d-x 2.2.1)

    好久没有更新Cocos2d x的学习文章了 最近在整理大学期间做过的东西 同时也新做了几个Cocos2d x的小程序 并且在网上搜索了不少资料 终于成功地打包成APK了 并在两个Android手机上成功运行 小for的环境是Windows8
  • DE-FAKE: Detection and Attribution ofFake Images Generated by Text-to-Image Generation Models

    一 文章信息 论文名称 DE FAKE Detection and Attribution of Fake Images Generated by Text to Image Generation Models 作者团队 二 主要创新 本文
  • 用python-opencv实现简单的人脸检测(代码+理论知识)

    目录 1 理论知识 1 安装opencv 2 opencv人脸检测器 3 加载人脸分类器 2 代码介绍 1 用摄影头调用图像 2 选择图片 3 完整代码 1 理论知识 1 安装opencv 本文实现人脸目标检测的方法是opencv图像采集
  • 华为云CodeArts DevSecOps系列插件——助力更高效的软件研发

    HDC期间入驻华为云 可参与Toolkit插件抽奖活动 活动链接在文末 一 前言 DevOps的概念想必大家都不陌生 它是一组过程 方法与系统的统称 通过它可以对交付速率 协作效率 部署频率速率 质量 安全和可靠性等进行提升改善 相比传统的
  • 基于NAR神经网络的化工产品价格预测的实现(Matlab)

    clear all clc 清除环境 xlsread styrene xls 读取苯乙烯价格序列 ans 系统会显示具体值 styrene ans 变换为行向量 lag 3 自回归阶数 iinput styrene n length iin
  • JSP实现简单的两数加法运算

    Request对象的作用 1 获取http请求行中信息 请求方式和请求路径 2 获取客户端信息 如ip 3 获取请求资源路径 4 域对象 具体需求 在浏览器上实现两个整数的加法运算 利用request携带参数实现加法运算 测试代码如下
  • Layui实现点击文字、缩略图查看图片功能

    刚完成一个客户需求 同一个页面上要有点击缩略图查看大图功能 也有点击图片名称查看原图的功能 点击缩略图查看大图的功能 点击缩略图查看大图的功能实现用的是layui开发文档内的layer photos 相册层 官方开发文档里photos支持传
  • weboffice 6版本实现在线word

    公司最近需要开发一个在线word功能 开始用pageoffice开发的功能被否决因为pageoffice的版权问题 后采用点聚weboffice的免费版开发 因为多个页面需要使用在线word功能 对于weboffice提供的activeX对
  • PHP代码审计示例(一)——淡然点图标系统SQL注入漏洞审计

    今天继续给大家介绍渗透测试相关知识 本文主要内容是PHP代码审计示例 淡然点图标系统SQL注入漏洞审计 免责声明 本文所介绍的内容仅做学习交流使用 严禁利用文中技术进行非法行为 否则造成一切严重后果自负 再次强调 严禁对未授权设备进行渗透测
  • Intellij多行同时缩进或者同时空格

    在使用JetBrains旗下的集成软件 如IDEA Pycharm PhpStorm Clion等时 通常需要整体向前或者向后缩进代码 以更加美观地编写代码 此时 可通过以下两个快捷键实现该功能 1 代码整体向后缩进 选中多行代码 按下ta
  • day9:JAVA中while的用法

    一 while循环 while循环是先判断条件是否为真 如果条件为真 则执行循环体 语句形式 while 循环条件 一条语句 多条语句 循环体 二 do while循环 do while循环是先执行循环体 再根据条件确定是否能在执行循环体
  • 【RabbitMQ】Consumer之消费模式、消息确认与拒绝 - 基于AMQP 0-9-1

    这篇文章主要和大家分享RabbitMQ Consumer端的知识点 主要包括Consumer的消费模式 消息是如何确认以及如何拒绝的 当消息拒绝之后 如何让消息重新进入队列 推模式 RabbitMQ支持推和拉两种消费模式 推模式就是由Bro