【RabbitMQ教程】“Hello World”工作队列模式

2023-11-05

目录

前言

“Hello World”工作队列模式介绍

消息模型

入门案例代码示例(自动ACK)

消息确认机制

自动ACK存在的问题

演示手动ACK


前言

1、将‘Hello World工作队列模式’单独抽出来细讲,目的是借助这个模式好好讲一下rabbitmq的‘自动ACK’和‘手动ACK’。

2、代码中的每一步,都演示了rabbitmq管理界面的变化;

“Hello World”工作队列模式介绍

rabbitmq六大工作模式架构图:

消息模型

在上图的模型中,有以下概念:

  • P:生产者,也就是要发送消息的程序

  • C:消费者:消息的接受者,会一直等待消息到来。

  • queue:消息队列,图中红色部分。可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

入门案例代码示例(自动ACK)

 基于maven采用java原生写法。不需要写properties或者yml配置文件

新建一个maven工程,添加amqp-client依赖:

<dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.7.1</version>
</dependency>

连接工具类:

public class ConnectionUtil {
    /**
     * 建立与RabbitMQ的连接
     * @return
     * @throws Exception
     */
    public static Connection getConnection() throws Exception {
        //定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置服务地址
        factory.setHost("192.168.1.103");
        //端口
        factory.setPort(5672);
        //设置账号信息,用户名、密码、vhost
        factory.setVirtualHost("/kavito");//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
        factory.setUsername("kavito");
        factory.setPassword("123456");
        // 通过工厂获取连接
        Connection connection = factory.newConnection();
        return connection;
    }
}

生产者:

public class Send {
 
    private final static String QUEUE_NAME = "simple_queue";
 
    public static void main(String[] argv) throws Exception {
        // 1、获取到连接
        Connection connection = ConnectionUtil.getConnection();
        // 2、从连接中创建通道,使用通道才能完成消息相关的操作
        Channel channel = connection.createChannel();
        // 3、声明(创建)队列
        //参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        /**
         * 参数明细
         * 1、queue 队列名称
         * 2、durable 是否持久化,如果持久化,mq重启后队列还在
         * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
         * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
         * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 4、消息内容
        String message = "Hello World!";
        // 向指定的队列中发送消息
        //参数:String exchange, String routingKey, BasicProperties props, byte[] body
        /**
         * 参数明细:
         * 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"")
         * 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称
         * 3、props,消息的属性
         * 4、body,消息内容
         */
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        
        //关闭通道和连接(资源关闭最好用try-catch-finally语句处理)
        channel.close();
        connection.close();
    }
}

控制台:

web管理页面:服务器地址/端口号 (本地:127.0.0.1:15672,默认用户及密码:guest guest) 

点击队列名称,进入详情页,可以查看消息:

 消费者接收消息:

public class Recv {
    private final static String QUEUE_NAME = "simple_queue";
 
    public static void main(String[] argv) throws Exception {
        // 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        //创建会话通道,生产者和mq服务所有通信都在channel通道中完成
        Channel channel = connection.createChannel();
        // 声明队列
        //参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        /**
         * 参数明细
         * 1、queue 队列名称
         * 2、durable 是否持久化,如果持久化,mq重启后队列还在
         * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
         * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
         * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //实现消费方法
        DefaultConsumer consumer = new DefaultConsumer(channel){
            // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
            /**
             * 当接收到消息后此方法将被调用
             * @param consumerTag  消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
             * @param envelope 信封,通过envelope
             * @param properties 消息属性
             * @param body 消息内容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //交换机
                String exchange = envelope.getExchange();
                //消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
                long deliveryTag = envelope.getDeliveryTag();
                // body 即消息体
                String msg = new String(body,"utf-8");
                System.out.println(" [x] received : " + msg + "!");
            }
        };
        
        // 监听队列,第二个参数:是否自动进行消息确认。
        //参数:String queue, boolean autoAck, Consumer callback
        /**
         * 参数明细:
         * 1、queue 队列名称
         * 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复
         * 3、callback,消费方法,当消费者接收到消息要执行的方法
         */
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

控制台打印:

再看看队列的消息,已经被消费了

 我们发现,消费者已经获取了消息,但是程序没有停止,一直在监听队列中是否有新的消息。一旦有新的消息进入队列,就会立即打印. 

消息确认机制

通过刚才的案例可以看出,消息一旦被消费者接收,队列中的消息就会被删除。

那么问题来了:RabbitMQ怎么知道消息被接收了呢

如果消费者领取消息后,还没执行操作就挂掉了呢?或者抛出了异常?消息消费失败,但是RabbitMQ无从得知,这样消息就丢失了!

因此,RabbitMQ有一个ACK机制。当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收。不过这种回执ACK分两种情况:

  •     自动ACK:消息一旦被接收,消费者自动发送ACK
  •     手动ACK:消息接收后,不会发送ACK,需要手动调用

大家觉得哪种更好呢?

这需要看消息的重要性:

  •     如果消息不太重要,丢失也没有影响,那么自动ACK会比较方便
  •     如果消息非常重要,不容丢失。那么最好在消费完成后手动ACK,否则接收消息后就自动ACK,RabbitMQ就会把消息从队列中删除。如果此时消费者宕机,那么消息就丢失了。
     

我们之前的测试都是自动ACK的,如果要手动ACK,需要改动我们的代码:

public class Recv2 {
    private final static String QUEUE_NAME = "simple_queue";
 
    public static void main(String[] argv) throws Exception {
        // 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        // 创建通道
        final Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 定义队列的消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // body 即消息体
                String msg = new String(body);
                System.out.println(" [x] received : " + msg + "!");
                // 手动进行ACK
                /*
                 *  void basicAck(long deliveryTag, boolean multiple) throws IOException;
                 *  deliveryTag:用来标识消息的id
                 *  multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。
                 */
                
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 监听队列,第二个参数false,手动进行ACK
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}

 最后一行代码设置第二个参数为false

channel.basicConsume(QUEUE_NAME, false, consumer);

自动ACK存在的问题

修改消费者,添加异常,如下:

生产者不做任何修改,直接运行,消息发送成功:  

 运行消费者,程序抛出异常:

 管理界面:

消费者抛出异常,但是消息依然被消费,实际上我们还没获取到消息。

演示手动ACK

重新运行生产者发送消息:

同样,在手动进行ack前抛出异常,运行Recv2

 再看看管理界面:

 消息没有被消费掉!

还有另外一种情况:修改消费者Recv2,把监听队列第二个参数自动改成手动,(去掉之前制造的异常) ,并且消费方法中没手动进行ACK

 

 生产者代码不变,再次运行:

运行消费者 :

但是,查看管理界面,发现:

停掉消费者的程序,发现:  

这是因为虽然我们设置了手动ACK,但是代码中并没有进行消息确认!所以消息并未被真正消费掉。当我们关掉这个消费者,消息的状态再次变为Ready。

正确的做法是:

我们要在监听队列时设置第二个参数为false,代码中手动进行ACK

 再次运行消费者,查看web管理页面:

消费者消费成功!  

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

【RabbitMQ教程】“Hello World”工作队列模式 的相关文章

随机推荐

  • i.MX6ULL - 问题解决:NFS挂载失败 - VFS: Unable to mount root fs on unknown-block(2,0)

    i IMX6ULL 问题解决 NFS挂载失败 VFS Unable to mount root fs on unknown block 2 0 开发环境 移植的linux5 4 7 0 ubuntu1804 x64 arm linux gn
  • 毕业设计-机器视觉深度学习的视频去水印算法

    目录 前言 课题背景和意义 实现技术思路 实现效果图样例 前言 大四是整个大学期间最忙碌的时光 一边要忙着备考或实习为毕业后面临的就业升学做准备 一边要为毕业设计耗费大量精力 近几年各个学校要求的毕设项目越来越难 有不少课题是研究生级别难度
  • MFC视频教程(孙鑫)学习笔记2-掌握C++

    这一集中 主要总结了C 经典语法与应用 1 C 的三大特性 封装 继承 多态 2 C 中提供了一套输入输出流类的对象 它们是cin cout和cerr 对应c语言中的三个文件指针stdin stdout stderr 分别指向终端输入 终端
  • Ubuntu下网页打开速度缓慢的解决方法

    Ubuntu下网页打开速度缓慢的解决方法 网速较慢可能是网络配置的原因导致 解决步骤如下 以下指令均在Ubuntu终端输入执行 一 查看Ubuntu版本信息 lsb release a 二 使用pdnsd软件为本机搭建DNS代理服务器 1
  • Redis第二十讲 Redis哨兵自动故障转移与优缺点

    sentinel哨兵是特殊的redis服务 不提供读写服务 主要用来监控redis实例节点 哨兵架构下client端第一次从哨兵找出redis的主节点 后续就直接访问redis的主节点 不会每次都通过sentinel代理访问redis的主节
  • ES 聚合函数的用法

    1 ES聚合分析是什么 聚合分析是数据库中重要的功能特性 完成对一个查询的数据集中数据的聚合计算 如 找出某字段 或计算表达式的结果 的最大值 最小值 计算和 平均值等 ES作为搜索引擎兼数据库 同样提供了强大的聚合分析能力 对一个数据集求
  • K和KB的区别

    来源 综合自己和网上的观点 问题1 K与KB之间有什么区别 我在做一道解时 就是 某计算机字长16位 它的存储容量是1MB 按字编址 这经的寻址范围是 A 512K B 1M C 512KB 答案给的是A 我很不解 为什么512K与512K
  • (error) CROSSSLOT Keys in request don‘t hash to the same slot 解决方法

    Redis 哈希槽基本概念 哈希槽 hash slot 是来自Redis Cluster的概念 但在各种集群方案都有使用 哈希槽是一个key的集合 Redis集群共有16384个哈希槽 每个key通过CRC16散列然后对16384进行取模来
  • Python opencv 机器学习 5.knn pca降维 ocr识别数字 mnist数据集

    coding utf 8 from numpy import import numpy as np import struct import matplotlib pyplot as plt import operator 定义一个全局特征
  • 轻松获取在线媒体:视频下载工具推荐

    ytdl org youtube dl Stars 121 0k License Unlicense youtube dl 一个命令行程序 可以从YouTube com和其他视频网站下载视频 基于 Python 实现 你可以在Unix Wi
  • 上班之路 华为OD真题 200

    public class Main public static char map 地图 public static int t 转弯次数 public static int c 路障个数 public static int n 地图行数 p
  • Android 项目必备(十一)--> 轮询操作

    文章目录 前言 实战 前言 什么叫轮询请求 简单理解就是 App 端每隔一定的时间重复请求的操作就叫做轮询请求 比如 App 端每隔一段时间上报一次定位信息 App 端每隔一段时间拉去一次用户状态等 这些应该都是轮询请求 为何不用长连接代替
  • ibm服务器开机后显示器闪烁,IBM E50彩色显示器,开机后电源指示灯闪烁,机内有“咔嗒”声,黑屏...

    经观察发现 咔嗒 声是消磁继电器断开 闭合的声音 经测量 该继电器13V供电电压时有时无 该故障现象特别 不易判断 但从屏幕无显示这一故障现象入手 可初步判定故障范围可能在电源电路和行扫描电路 首先 不开机直观检查相关电路 未见异常 然后测
  • 由bibtex生成引用文献字符串

    word 文档写引用文献 用 mendeley 的插件生成的效果似乎一般 用法见 1 2 而且自己改格式的那个网页令人火大 可能我网速问题 用 python 写了个脚本 通过解析 bibtex 来生成 格式自编 项目页见 4 Code 目前
  • C++的函数_默认参数详解

    案例 int func int a int b 10 return a b int main func 10 return 0 注意事项 1 实现的函数中参数有默认参数 调用时如再传入 默认参数的值被替换为传入的值 func 10 20 r
  • vue报错

    启动vue项目时报如下错误 可是我的代码里并没有matched 后来才发现是路由引入错误 在 main js 文件中 上面的 import router from router 这个语句的前面 router 中的 R 必须的小写 不然就会出
  • 《substrate 快速入门与开发实战》

    视频地址 https www bilibili com video BV1C4411U7Rv substrate的升级过程 编写的runtime代码 gt 编译后 得到runtime的wasm二进制文件 gt 通过链上的治理模块发送升级ru
  • 关于nodejs中使用fluent-ffmpeg模块、ffmpeg工具的使用心得

    类人猿Blog 欢迎来到我的博客 您好 这是本人第一次写博客 请多多指教 nodejs中使用 fluent ffmpeg 详细方法和系统配置 适应于 windows和 linux 特别是在 redhat6 x中得以验证通过 简介 我们都知道
  • 对于c++中模板函数的一点体会

    何为模板函数 从字面上就可以看出来模板函数必须具备通用性 举个简单却很实用的例子 交换两个值的函数Swap 交换两个字符型void Swap char a char b 交换两个整型void Swap int a int b 交换两个浮点型
  • 【RabbitMQ教程】“Hello World”工作队列模式

    目录 前言 Hello World 工作队列模式介绍 消息模型 入门案例代码示例 自动ACK 消息确认机制 自动ACK存在的问题 演示手动ACK 前言 1 将 Hello World工作队列模式 单独抽出来细讲 目的是借助这个模式好好讲一下