RabbitMq基础篇-09-channel接口常用几种参数详解

2023-11-02

1. 背景概述

为了保证消息从队列可靠地达到消费者, RabbitMQ 提供了消息确认机制( message acknowledgement), 消费者在订阅队列时,可以指定autoAck参数,

  • 当autoAck 等于false时,RabbitMQ会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移去消息(实质上是先打上删除标记,之后再删除) 。
  • 当autoAck 等于true时,RabbitMQ 会自动把发送出去的消息置为确认, 然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。

采用消息确认机制后,只要设置autoAck 参数为false ,消费者就有足够的时间处理消息(任务) ,不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为RabbitMQ 会一直等待持有消息直到消费者显式调用Basic.Ack 命令为止。

当autoAck 参数置为false ,对于RabbitMQ 服务端而言,队列中的消息分成了两个部分:一部分是等待投递给消费者的消息:一部分是己经投递给消费者,但是还没有收到消费者确认信号的消息。如果RabbitMQ 一直没有收到消费者的确认信号,并且消费此消息的消费者己经断开连接, 则RabbitMQ 会安排该消息重新进入队列,等待投递给下一个消费者,当然也有可能还是原来的那个消费者。

RabbitMQ 不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否己经断开, 这么设计的原因是RabbitMQ 允许消费者消费一条消息的时间可以很久很久。

RabbtiMQ 的Web 管理平台(15672端口)上可以看到当前队列中的" Ready" 状态和"Unacknowledged" 状态的消息数,分别对应上文中的等待投递给消费者的消息数和己经投递给消费者但是未收到确认信号的消息数

也可以通过相应的命令来查看上述信息:

rabbitmqctl list_queues name messages_ready messages_unacknowledged

在消费者接收到消息后,如果想明确拒绝当前的消息而不是确认,那么应该怎么做呢?

RabbitMQ 在2 .0.0 版本开始引入了Basic.Reject 这个命令,消费者客户端可以调用与其对应的channel.basicReject 方法来告诉RabbitMQ 拒绝这个消息。

Channel 类中的basicReject 方法定义如下:

void basicReject(long deliveryTag, boolean requeue) throws IOException;

其中deliveryTag 可以看作消息的编号,它是一个64 位的长整型值,最大值是9223372036854775807, 如果requeue 参数设置为true ,则RabbitMQ 会重新将这条消息存入队列,以便可以发送给下一个订阅的消费者, 如果requeue 参数设置为false ,则RabbitMQ立即会把消息从队列中移除,而不会把它发送给新的消费者。

Basic.Reject 命令一次只能拒绝一条消息,如果想要批量拒绝消息,则可以使用Basic.Nack 这个命令, 消费者客户端可以调用channel.basicNack 方法来实现,方法定义如下:

void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

其中deliveryTag 和requeue 的含义可以参考basicReject 方法。multiple 参数设置为false 则表示拒绝编号为deliveryTag的这一条消息,这时候basicNack 和basicReject 方法一样;multiple 参数设置为true 则表示拒绝deliveryTag 编号之前所有未被当前消费者确认的消息。

注意要点:

  • 将channel.basicReject 或者channel.basicNack 中的requeue 设置为false ,可以启用"死信队列"的功能。死信队列可以通过检测被拒绝或者未送达的消息来追踪问题。
    对于requeue , AMQP 中还有一个命令Basic.Recover 具备可重入队列的特性。其对应的客户端方法为:

1.Basic.RecoverOk basicRecover() throws IOException;
2.Basic.RecoverOk basicRecover(boolean requeue) throws IOException;

这个channel.basicRecover 方法用来请求RabbitMQ 重新发送还未被确认的消息。如果requeue 参数设置为true, 则未被确认的消息会被重新加入到队列中, 这样对于同一条消息来说,可能会被分配给与之前不同的消费者。如果requeue 参数设置为false ,那么同一条消息会被分配给与之前相同的消费者, 默认情况下,如果不设置requeue 这个参数,相当于channel.basicRecover(true) ,即requeue 默认为true

2. 通常参数解释

  • consumerTag:会话的标签,是固定的 ;
  • deliveryTag : 每次接收消息+1,可以做此消息处理通道的名字。
    因此 deliveryTag 可以用来回传告诉 rabbitmq 这个消息处理成功 清除此消息(basicAck方法)。

3. Channel一些Api解释

3.1. basicNack 不确认消息

    void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

简单理解就是: 不确认deliveryTag对应的消息

  • 参数1: 消息
  • 参数2: 是否应用于多消息
  • 参数3: 是否重新放回队列,否则丢弃或者进入死信队列

第二个参数,怎么理解basic.nack多消息,比如现在有多条消息去调用这个nack方法,他是怎么执行的?

  • 有个先后顺序,就是调用nack时,之前所有没有ack的消息都会被标记为nack,多条消息同时调用,则调用的这个语句执行前,如果还有未执行回复确认的消息就会被回复nack,后续的消息回复nack可能只作用于当条消息。

注意: nack后的消息也会被自己消费

3.2. basicReject 拒绝消息

void basicReject(long deliveryTag, boolean requeue) throws IOException;

简单理解就是:拒绝deliveryTag对应的消息

  • 参数1: 消息
  • 参数2: 是否重新放回队列,否则丢弃或者进入死信队列

区别在于:

  • basicReject一次只能拒绝接收一个消息
  • basicNack方法可以支持一次0个或多个消息的拒收

3.3. RecoverOk 是否恢复消息到队列

Basic.RecoverOk basicRecover(boolean requeue) throws IOException;

是否恢复消息到队列,参数是是否requeue,true则重新入队列,并且尽可能的将之前recover的消息投递给其他消费者消费,而不是自己再次消费。
false则消息会重新被投递给自己。

3.4. exchangeDeclare 声明交换机

有多个重载方法,这些方法都是由下面这个方法中的缺省参数构成的

Exchange.DeclareOk exchangeDeclare(String exchange,String type,boolean durable,boolean autoDelete,boolean internal,Map<String,Object> arguments) throws IOException;
  • exchange:交换机名称
  • type:交换机类型 有direct、fanout、topic三种
  • durable:设置是否持久化。durable设置true表示持久化 ,服务器重启会将Exchange(交换机)存盘。
    注意:仅设置此选项,不代表消息持久化。即不保证重启后消息还在。
  • autoDelete: 设置是否自动删除 。.当已经没有消费者时,服务器是否可以删除该Exchange。自动删除的前提是至少有一个队列或者交换机与这个交换器绑定的队列或者交换器都与之解绑;
  • internal:设置是否内置的。如果设置为true,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式
  • argument:其他一些结构化参数,比如alternate-exchange

3.5. queueDeclare 声明队列

  Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;
  • 队列的名字
  • 队列里面的消息是否支持持计化
  • 设置该队列,是否可以供对个消费者消费
  • 是否自动删除消息
  • 其他参数

3.6. queueBind 绑定队列

 Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
  • queue: 队列名
  • exchange: 交换器名称
  • routingKey :路由key或者绑定key
  • arguments: 一些参数

3.7. queueUnbind 解绑队列

Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
  • queue: 队列名
  • exchange: 交换器名称
  • routingKey :路由key或者绑定key
  • arguments: 一些参数

3.8. exchangeBind 绑定交换机

  Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
  • destination :目标交换器
  • source :源交换器
  • routingKey 路由key
  • arguments: 一些相关参数

消息从source交换器转发到destination交换器存储在destination绑定的队列queue中

3.9. exchangeUnbind 解绑交换机

 Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
  • destination :目标交换器
  • source :源交换器
  • routingKey 路由key
  • arguments: 一些相关参数

3.10. basicQos 消息流量

有多个重载方法,这些方法都是由下面这个方法中的缺省参数构成的,

void basicQos(int prefetchSize, int prefetchCount, boolean global)
  • param1:prefetchSize,消息本身的大小 如果设置为0 那么表示对消息本身的大小不限制
  • param2:prefetchCount,告诉rabbitmq不要一次性给消费者推送大于N个消息
  • param3:global,是否将上面的设置应用于整个通道
    • false:表示只应用于当前消费者
    • true:表示当前通道的所有消费者都应用这个限流策略

消费者在接收到队列里的消息但没有返回确认结果之前,队列不会将新的消息分发给该消费者。队列中没有被消费的消息不会被删除,还是存在于队列中。一般和channel.basicAck配套使用

3.11. basicAck 消息确认

 void basicAck(long deliveryTag, boolean multiple) throws IOException
  • deliveryTag:该消息的index
  • multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。

3.12. basicConsume 消息消费

该重载方法有点多,具体我就不列举了,参数解释一下:

  • queue:队列名
  • autoAck:是否自动确认消息
    - deliverCallback: 当一个消息发送过来后的回调接口
    - cancelCallback:当一个消费者取消订阅时的回调接口;取消消费者订阅队列时除了使用{@link Channel#basicCancel}之外的所有方式都会调用该回调方法
    - callback: 消费者对象的回调接口
    - shutdownSignalCallback: 当channel/connection 关闭后回调
    - arguments: 消费的一组参数
    - consumerTag: 客户端生成的用于建立上线文的使用者标识
    - nolocal:如果服务器不应将在此通道连接上发布的消息传递给此使用者,则为true;请注意RabbitMQ服务器上不支持此标记
    - exclusive: 如果是单个消费者,则为true

启动一个消费者,并返回服务端生成的消费者标识

3.13. basicPublish 发布消息

void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
  • exchange:要将消息发送到的Exchange(交换器)
  • routingKey:路由的 key 是哪个
  • 其他参数
    • mandatory:true 如果mandatory标记被设置
    • immediate: true 如果immediate标记被设置,注意:RabbitMQ服务端不支持此标记
    • props:其它的一些属性,如:{@linkMessageProperties.PERSISTENT_TEXT_PLAIN}
  • body:发送消息的消息体

3.14. basicGet 主动拉取队列中的一条消息

GetResponse basicGet(String queue, boolean autoAck)
  • 参数1: 队列名
  • 参数2: 是否自动确认

3.15. basicCancel 取消消费者对队列的订阅关系

void basicCancel(String consumerTag)
  • consumerTag:服务器端生成的消费者标识

4. 消息确认一些观点

  • 消息监听内必须使用channel对消息进行确认,不管是确认消费成功还是确认消费失败
  • 消息监听内的异常处理有两种方式:
    • 内部catch后直接处理,然后使用channel对消息进行确认
    • 配置RepublishMessageRecoverer将处理异常的消息发送到指定队列专门处理或记录
  • 监听的方法内抛出异常貌似没有太大用处。因为抛出异常就算是重试也非常有可能会继续出现异常,当重试次数完了之后消息就只有重启应用才能接收到了,很有可能导致消息消费不及时。当然可以配置RepublishMessageRecoverer来解决,但是万一RepublishMessageRecoverer发送失败了呢。。那就可能造成消息消费不及时了。所以即使需要将处理出现异常的消息统一放到另外队列去处理,个人建议两种方式:
    • catch异常后,手动发送到指定队列,然后使用channel给rabbitmq确认消息已消费
    • 给Queue绑定死信队列,使用nack(requque为false)确认消息消费失败
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RabbitMq基础篇-09-channel接口常用几种参数详解 的相关文章

  • 过期的消息不会从 RabbitMQ 中删除

    我通过生产者向 RabbitMQ 发送一条普通消息 然后发送第二条消息expiration属性分配给一个值 然后使用rabbitmqctl list queues命令我监视消息的状态 我发现如果我先发送一条普通消息 然后发送一条消息expi
  • Spring AMQP + RabbitMQ 3.3.5 ACCESS_REFUSED - 使用身份验证机制 PLAIN 拒绝登录

    我遇到以下异常 org springframework amqp AmqpAuthenticationException com rabbitmq client AuthenticationFailureException ACCESS R
  • 在 RabbitMQ 监听器中隐藏运行时异常

    在某些故意发生的情况下 我使用了一些异常来拒绝消息 但在控制台中显示了乍一看似乎不太正常的异常 如何在登录控制台 文件时隐藏该特定异常 我正在使用 spring boot 和默认记录器 public static class Undispa
  • 每次发布后我应该关闭通道/连接吗?

    我在 Node js 中使用 amqplib 但我不清楚代码中的最佳实践 基本上 我当前的代码调用amqp connect 当 Node 服务器启动时 然后为每个生产者和每个消费者使用不同的通道 而不会真正关闭它们中的任何一个 我想知道这是
  • 为什么需要消息队列来与 Web 套接字聊天?

    我在互联网上看到了很多使用 Web 套接字和 RabbitMQ 进行聊天的示例 https github com videlalvaro rabbitmq chat https github com videlalvaro rabbitmq
  • springrabbitmq:无法将id设置为属性?

    我有一个属性文件 其中包含队列 其值为queue name 如果我在其他请使用该属性 那么它可以工作 但如果我在 id 上使用它 那么它会失败
  • 在 Windows 10 和 PHP 7.3 中安装 AMQP

    我想在 Windows 10 中使用 PHP 7 3 安装 AMQP 以便在 symfony 4 中使用 Windows 不使用任何 apache iis nginx 并直接由 symfony 运行 一切还好 直到 我决定在项目中使用rab
  • RabbitMQ 上的 Nack 和拒绝

    我想处理消费者从队列中获取的不成功的消息并将它们重新排队 想象一下我有这样的情况 P gt foo bar baz gt C 其中 foo bar 和 baz 是消息 如果消费者读到baz但出了问题 我可以使用basic reject or
  • 在 Celery 工作线程中捕获 Heroku SIGTERM 以优雅地关闭工作线程

    我对此进行了大量研究 令我惊讶的是我还没有在任何地方找到一个好的答案 我正在 Heroku 上运行一个大型应用程序 并且我有某些运行很长时间处理的 celery 任务 并在任务结束时保存结果 每次我在 Heroku 上重新部署时 它都会发送
  • 更改 RabbitMQ 队列中的参数

    我有一个 RabbitMQ 队列 最初声明如下 var result channel QueueDeclare NewQueue true false false null 我正在尝试添加死信交换 因此我将代码更改为 channel Exc
  • RabbitMQ:如何创建和恢复备份

    我是 RabbitMQ 的新手 我需要一些帮助 如何备份和恢复到RabbitMQ 以及我需要保存哪些重要数据 谢谢 如果您安装了管理插件 您可以在Overview页 在底部你会看到导入 导出定义您可以使用它来下载代理的 JSON 表示形式
  • 服务器在 pika.exceptions.StreamLostError: Stream 连接丢失后关闭

    我的队列中有一些图像 我将每个图像传递到我的 Flask 服务器 在其中完成图像处理 并在我的rabbitmq 服务器中收到响应 收到响应后 我收到此错误 pika exceptions StreamLostError 流连接丢失 104
  • RabbitMQ:无法启动rabbitmq_management插件

    Version gt sudo rabbitmqctl status grep rabbit RabbitMQ rabbit RabbitMQ 3 5 6 Error gt sudo rabbitmq plugins enable rabb
  • Celery 广播 vs RabbitMQ 扇出

    我最近一直在使用 Celery 但我不喜欢它 它的配置很混乱 过于复杂并且文档记录很少 我想用 Celery 从单个生产者向多个消费者发送广播消息 让我困惑的是 Celery 术语和底层传输 RabbitMQ 术语之间的差异 在 Rabbi
  • RabbitMQ 3.1.3 和丢失的时间戳头

    如果消息中缺少时间戳头 是否可以将代理配置为插入时间戳头 因此 如果发布客户端没有添加时间戳标头 代理是否可以插入与交易所收到消息的时刻相匹配的时间戳值 我应该在哪里寻找该配置 或者这是一个坏主意 截至2015年 原来的问题有了新的答案 这
  • Django Celery 和多个数据库(Celery、Django 和 RabbitMQ)

    是否可以设置与 Django Celery 一起使用的不同数据库 我有一个配置了多个数据库的项目 并且不希望 Django Celery 使用默认数据库 如果我仍然可以使用 django celery 管理页面并读取存储在这个不同数据库中的
  • 如何停止rabbitmq服务器

    我正在尝试启动一个节点应用程序 但我认为rabbitmq 妨碍了我 与此线程类似 名为 rabbit 的节点已经在运行 但也 无法连接到节点 rabbit https stackoverflow com questions 8737754
  • RabbitMQ-在一个应用程序进程中为单个队列创建多个消费者是一种好习惯吗

    我刚刚处理一个由 RabbitMQ 支持的新项目 并且在应用程序启动时创建了多个监听同一个队列的消费者实例 然而 它们与不同的渠道共享相同的连接 来自队列的消息非常庞大 一次生成行为有数百万条消息 因此我猜第一个代码作者正在尝试做一些事情来
  • Celery 设计帮助:如何防止并发执行任务

    我对 Celery AMQP 相当陌生 正在尝试提出一个任务 队列 工作人员设计来满足以下要求 我有多种类型的 每用户 任务 例如 TaskA TaskB TaskC 这些 每用户 任务中的每一个都为系统中的一个特定用户读取 写入数据 因此
  • 如何让Spring RabbitMQ创建一个新的队列?

    根据我对rabbit mq的 有限 经验 如果您为尚不存在的队列创建新的侦听器 则会自动创建该队列 我正在尝试将 Spring AMQP 项目与rabbit mq 一起使用来设置侦听器 但出现错误 这是我的 xml 配置

随机推荐

  • google Guava之EventBus

    文章目录 EventBus基本用法 1 创建Listener 2 创建EventBus并发送消息 Listener之间的继承关系 Subscriber 不同类型参数的Subscribe event 继承关系的event DeadEvent
  • 树莓派3B安装python3 opencv环境

    树莓派3b 若干年前买的 软件具体配置不记得 python缺省版本为3 53 直接用pip命令安装不成功 显示一堆的红字 update upgrade一下 耗时挺长 再安装 成功了 如下图 sudo apt get update sodu
  • C++11 并发指南系列

    本系列文章主要介绍 C 11 并发编程 计划分为 9 章介绍 C 11 的并发和多线程编程 分别如下 C 11 并发指南一 C 11 多线程初探 本章计划 1 2 篇 已完成 1 篇 C 11 并发指南二 std thread 详解 本章计
  • 微信小程序:cavans的使用(一)

    最近用到这个canvas画布这个组件 其实还是挺有意思的 优点是可以给用户绘制宣传海报什么的 这个现在好多小程序都有这个 比如给你一个专属海报有你的头像还有你的二维码 你可以保存在你的相册里 你可以取拿它去拉人 微信小程序canvas组件官
  • 因为好兄弟一句话竞。。。

    职教云查同学考试成绩 本来在打游戏 好兄弟微信问了我一句职教云未公开的考试能不能 打包好的软件下载地址在文章后面 然后在想 考试未公开 只有那老师能看班里同学成绩 先试了抓一下包 发现在学生端啥也没有只能获得考试的ID 好家伙 突然灵光一闪
  • 继承与多态(Java实现)

    继承和多态 一 类的继承 1 继承的实现 语法格式 class 子类名 extends 父类名 类体 注 extends是关键字 Java中只支持单继承 所以子类只有一个父类 但可以多层继承 子类通过继承可以获得父类的public prot
  • PCL 计算点云的高斯曲率和平均曲率

    目录 一 算法原理 二 代码实现 三 结果展示 四 相关链接 一 算法原理 已知某点的主曲率为 k 1 k 2 k 1 k 2 k
  • 基于Opencv实现的多彩隔空画图

    1 问题概述 人工智能带火了计算机视觉的人才需求 作为计算机视觉应用开发框架OpenCV也越来越受到欢迎 市场需求大增 因此 在学习Python的基础上 进行Opencv技术的学习是十分重要且有必要的 该项技术不仅有着成熟的学习框架 更有广
  • 淘宝应对"双11"的技术架构分析

    http www uml org cn zjjs 201311272 asp http www uml org cn 火龙果软件
  • Ubuntu 14.04安装TeamViewer

    1 到官网下载teamviewer的deb包 2 拷贝到 下 打开文件 双击 deb包 3 在Ubuntu软件中心中点击安装 如果缺少依赖包 sudo apt get install f修复依赖关系
  • UAV021(五):STM32F429实现TIM6计时、TIM3输出4路PWM波、TIM5输入捕获

    目录 序 一 STM32F4定时器介绍 二 定时器配置 2 1 TIM6实现计数 2 2 TIM3输出4路PWM波 2 3 TIM5输入捕获 序 现在需要实现计时 输出PWM和输入捕获 其中计时实现0 1ms加1 用于陀螺仪积分计时 输出4
  • 离线win7上用anaconda离线创建虚拟环境

    本文所需文件的下载路径为 百度云链接 https pan baidu com s 14S xkERRhQVNfV dauVYzw 提取码 5hzn 第一步 安装anaconda win7系统不支持python3 9 因此不能在win7系统上
  • 凭借这5步,我30分钟学会了Python爬虫

    在不同公司的许多人可能出于各种原因需要从Internet收集外部数据 分析竞争 汇总新闻摘要 跟踪特定市场的趋势 或者收集每日股票价格以建立预测模型 无论你是数据科学家还是业务分析师 都可能时不时遇到这种情况 并问自己一个永恒的问题 我如何
  • win系统电脑如何打开sketch?

    2 个方法快速使用 Windows 系统打开 Sketch 文件 使用 Adobe XD 打开 Sketch 文件或者使用浏览器中就能做设计的即时设计直接打开 Sketch 文件 众所周知 Sketch 只能在 Mac 电脑上使用 因此只有
  • SQuirrel SQL Client数据库连接工具的配置与使用

    SQuirrel SQL Client介绍 SQuirrel SQL Client是一个用Java写的数据库客户端 用JDBC统一数据库访问接口以后 可以通过一个统一的用户界面来操作MySQL PostgreSQL MSSQL Oracle
  • Java Html嵌入applet 来读取客户端串口

    写在前面 之前没搞过html嵌入applet来读取本地客户端串口 就直接使用RXTXcom jar 来直接读取本机串口 这个是没问题的如下 RXTX 有三个文件 有针对操作系统64 还有32的 1 RXTXcomm jar 导入项目中 2
  • 【LaTeX】矢量图转为pdf格式(为了将高清矢量图插入LaTeX)

    在论文编写的时候 需要插入高清的矢量图 但是不同的图片生成软件 图片处理软件 论文编写软件所支持的矢量图格式都是不一致的 如 matplotlib可以保存的矢量图格式为 svg eps 等 visio可以保存的格式为 svg emf 等 但
  • 聊聊 Java 泛型

    概述 什么是泛型 为什么需要泛型 如何使用 是什么原理 如何改进 这基本上就是我们学习一项技术的正确套路 本文将按照以上顺序展开阐述 介绍我所了解的泛型 什么是泛型 泛型的本质是参数化类型 即给类型指定一个参数 然后在使用时再指定此参数具体
  • Unable to start web server; nested exception is org.springframework.boot.web.server.WebServerExcepti

    问题描述 在异常信息当中我发现到一个redis连接不上的异常 项目当中我使用的是多环境 我运行的时候是运行的dev 这里的 profile active 我们在idea的maven的配置处进行快速的切换 启动项目的时候报的是连接不上redi
  • RabbitMq基础篇-09-channel接口常用几种参数详解

    文章目录 1 背景概述 2 通常参数解释 3 Channel一些Api解释 3 1 basicNack 不确认消息 3 2 basicReject 拒绝消息 3 3 RecoverOk 是否恢复消息到队列 3 4 exchangeDecla