【RabbitMQ教程】springboot整合rabbitmq(topic模式)

2023-11-16

 

下面还是模拟注册服务当用户注册成功后,向短信和邮件服务推送消息的场景
搭建SpringBoot环境

创建两个工程 mq-rabbitmq-producer和mq-rabbitmq-consumer,分别配置1、2、3(第三步本例消费者用注解形式,可以不用配)

1、添加AMQP的启动器:
 

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring‐boot‐starter‐test</artifactId>
</dependency>

2、在application.yml中添加RabbitMQ的配置:  

server:
  port: 10086  
spring:
  application:
    name: mq-rabbitmq-producer
  rabbitmq:
    host: 192.168.1.103
    port: 5672
    username: kavito
    password: 123456
    virtualHost: /kavito
    template:
      retry:
        enabled: true
        initial-interval: 10000ms
        max-interval: 300000ms
        multiplier: 2
      exchange: topic.exchange
    publisher-confirms: true

属性说明:

 

属性说明:

  • template:有关AmqpTemplate的配置
    •  retry:失败重试
      •  enabled:开启失败重试
      •  initial-interval:第一次重试的间隔时长
      • max-interval:最长重试间隔,超过这个间隔将不再重试
      •  multiplier:下次重试间隔的倍数,此处是2即下次重试间隔是上次的2倍
    • exchange:缺省的交换机名称,此处配置后,发送消息如果不指定交换机就会使用这个
  • publisher-confirms:生产者确认机制,确保消息会正确发送,如果发送失败会有错误回执,从而触发重试

当然如果consumer只是接收消息而不发送,就不用配置template相关内容。  

 3、定义RabbitConfig配置类,配置Exchange、Queue、及绑定交换机。

@Configuration
public class RabbitmqConfig {
    public static final String QUEUE_EMAIL = "queue_email";//email队列
    public static final String QUEUE_SMS = "queue_sms";//sms队列
    public static final String EXCHANGE_NAME="topic.exchange";//topics类型交换机
    public static final String ROUTINGKEY_EMAIL="topic.#.email.#";
    public static final String ROUTINGKEY_SMS="topic.#.sms.#";
 
    //声明交换机
    @Bean(EXCHANGE_NAME)
    public Exchange exchange(){
        //durable(true) 持久化,mq重启之后交换机还在
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }
 
    //声明email队列
    /*
     *   new Queue(QUEUE_EMAIL,true,false,false)
     *   durable="true" 持久化 rabbitmq重启的时候不需要创建新的队列
     *   auto-delete 表示消息队列没有在使用时将被自动删除 默认是false
     *   exclusive  表示该消息队列是否只在当前connection生效,默认是false
     */
    @Bean(QUEUE_EMAIL)
    public Queue emailQueue(){
        return new Queue(QUEUE_EMAIL);
    }
    //声明sms队列
    @Bean(QUEUE_SMS)
    public Queue smsQueue(){
        return new Queue(QUEUE_SMS);
    }
 
    //ROUTINGKEY_EMAIL队列绑定交换机,指定routingKey
    @Bean
    public Binding bindingEmail(@Qualifier(QUEUE_EMAIL) Queue queue,
                                @Qualifier(EXCHANGE_NAME) Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
    }
    //ROUTINGKEY_SMS队列绑定交换机,指定routingKey
    @Bean
    public Binding bindingSMS(@Qualifier(QUEUE_SMS) Queue queue,
                              @Qualifier(EXCHANGE_NAME) Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
    }
 
}

生产者(mq-rabbitmq-producer)

为了方便测试,我直接把生产者代码放工程测试类:发送routing key是"topic.sms.email"的消息,那么mq-rabbitmq-consumer下那些监听的(与交换机(topic.exchange)绑定,并且订阅的routingkey中匹配了"topic.sms.email"规则的) 队列就会收到消息。
 

@SpringBootTest
@RunWith(SpringRunner.class)
public class Send {
 
    @Autowired
    RabbitTemplate rabbitTemplate;
    
    @Test
    public void sendMsgByTopics(){
 
        /**
         * 参数:
         * 1、交换机名称
         * 2、routingKey
         * 3、消息内容
         */
        for (int i=0;i<5;i++){
            String message = "恭喜您,注册成功!userid="+i;
            rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME,"topic.sms.email",message);
            System.out.println(" [x] Sent '" + message + "'");
        }
 
    }
}

运行测试类发送5条消息: 

20190612112420397.png

web管理界面: 可以看到已经创建了交换机以及queue_email、queue_sms 2个队列,并且向这两个队列分别发送了5条消息

20190612125827992.png

 watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2thdml0bw==,size_16,color_FFFFFF,t_70

消费者(mq-rabbitmq-consumer)

编写一个监听器组件,通过注解配置消费者队列,以及队列与交换机之间绑定关系。(也可以像生产者那样通过配置类配置)

在SpringAmqp中,对消息的消费者进行了封装和抽象。一个JavaBean的方法,只要添加@RabbitListener注解,就可以成为了一个消费者。

@Component
public class ReceiveHandler {
 
    //监听邮件队列
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "queue_email", durable = "true"),
            exchange = @Exchange(
                    value = "topic.exchange",
                    ignoreDeclarationExceptions = "true",
                    type = ExchangeTypes.TOPIC
            ),
            key = {"topic.#.email.#","email.*"}))
    public void rece_email(String msg){
        System.out.println(" [邮件服务] received : " + msg + "!");
    }
 
    //监听短信队列
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "queue_sms", durable = "true"),
            exchange = @Exchange(
                    value = "topic.exchange",
                    ignoreDeclarationExceptions = "true",
                    type = ExchangeTypes.TOPIC
            ),
            key = {"topic.#.sms.#"}))
    public void rece_sms(String msg){
        System.out.println(" [短信服务] received : " + msg + "!");
    }
}

属性说明:

  • @Componet:类上的注解,注册到Spring容器
  • @RabbitListener:方法上的注解,声明这个方法是一个消费者方法,需要指定下面的属性:
    • bindings:指定绑定关系,可以有多个。值是@QueueBinding的数组。@QueueBinding包含下面属性:
      • value:这个消费者关联的队列。值是@Queue,代表一个队列
      • exchange:队列所绑定的交换机,值是@Exchange类型
      •  key:队列和交换机绑定的RoutingKey,可指定多个

启动mq-rabbitmq-comsumer项目

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2thdml0bw==,size_16,color_FFFFFF,t_70

ok,邮件服务和短息服务接收到消息后,就可以各自开展自己的业务了。 

 

 

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

【RabbitMQ教程】springboot整合rabbitmq(topic模式) 的相关文章

随机推荐

  • 软件的最低测试方法

    前言 1 1 引言 对于大部分软件系统 如何测试及有效的测试 是一个很头痛的问题 在软件工程上 测试是软件工程中极其重要的一部分 但在具体的实际情况上 无论是时间 人手及资源的调配等原因 使国内大部分软件公司没有进行过理论上的完整的测试 本
  • JAVA变量与数据类型

    人生不如意之事十有八九 在最好的年纪要努力充实自己 莫等空悲切白了少年头 而是要及时当勉励 岁月不待人 一 java变量 变量概述 1 内存中存储的一个存储区域 2 该存储区域内的数据在同一类型范围内不断变化 3 变量是程序中最基本的存储单
  • 老虎证券美股策略——将动量策略日频调仓改成月频

    最近策略频繁回撤 跑不赢标普500指数 所以对策略简单修改 以待后效 新加入的代码 def get if trade day infile open countday dat r incontent infile read infile c
  • Linux系统中负载较高问题排查思路与解决方法

    Load 就是对计算机干活多少的度量 Load Average 就是一段时间 1分钟 5分钟 15分钟 内平均Load linux服务器出现高负载的情况下 一般都有一些具体的症状 比如cpu 内存等被耗尽 磁盘IO或者网络等出现问题 下面通
  • CentOS7下安装LNMP以及phpMyAdmin

    两种安装 第一种 下载 可以到官网找 版本 https www phpmyadmin net downloads cd 到你要下载的位置 wget https files phpmyadmin net phpMyAdmin 4 4 12 p
  • Maven中pom文件内scope标签中import、parent 、dependencies、dependencyManagement详解

    首先介绍parent 如果父项目中有这些依赖
  • linux-sed命令

    目录 1 linux shell sed获取某一段字符串 2 linux shell shell脚本中 sed n取出某一行赋给一个变量 3 linux shell sed查询某一行 1 linux shell sed获取某一段字符串 如果
  • 网络层(四)

    网络层 我们说过 网络层主要讲的就是ip编址和路由选择算法 更准确的说 应该是网际IP协议 网际IP协议主要说明了各个主机和服务器的ip编址规则 了解IP编址前 我们需要知道IP数据报 IP数据报在网络层中传输 我们看一下IP数据报的结构
  • STM32F103ZET6【标准库函数开发】------PB3,PB4当做普通IO口,重定义

    一 如题 我在设计原理图的时候将PB3和PB4当做了普通IO口 结果按照一般配置的方法操作后 PB3 PB4并没有输出自己想要的信号 配置如下 void MOTOR GPIO Init void 初始化 GPIO InitTypeDef G
  • 人社练兵比武怎样挣积分 python 源码在线答题源码

    可以自动答题积分 不明白如何用的可以联系我 下面2个函数是学练习的 需要用的库为selenium time re pickle 题库需要收集 def dan 单选或多选 j browser find element by xpath id
  • java绘制(可视化)树结构图

    以JPanel组件为画板 继承JPanel类并重写paint Graphics g 函数 在函数中使用画笔g绘制树结构图 实例代码 3个java源文件 Main java DrawNode java DrawTree java 1 Main
  • 周订单量趋势

    周订单量趋势 PreAuthorize hasAuthority admin statistics home chart order week ApiOperation value 周订单量趋势 RequestMapping value c
  • 《Perl语言入门》读书笔记(六)哈希

    1 哈希特点 哈希是一种数据结构 与数组相同点 能容纳任意多的值 而哈希的检索方式与数组不同 数组是以数字下标检索 而哈希中的值 value 以唯一的名字 key 检索 key value一一对应 乱序排列 类似一桶数据 由于检索方式不同
  • 程序记录(一)VGG16猫狗分类

    import torch from torchvision import datasets models transforms import os from torch utils data import DataLoader from t
  • RANSAC鲁棒参数估计

    转自 http blog csdn net zhanglei8893 archive 2010 01 23 5249470 aspx RANSAC 是 RANdom SAmple Consensus 的缩写 该算法是用于从一组观测数据中估计
  • U-Boot Passing Kernel Arguments

    本文转载至 http www denx de wiki view DULG LinuxKernelArgs In nearly all cases you will want to pass additional information t
  • ply格式文件导出

    ply格式导出代码片段 注意vertex 和tri都是 N 3 格式 三角形编号从1开始 def dump to ply vertex tri wfp index in tri should begin from 1 vertex in s
  • 【Vue】Vue基础自用笔记&Day04_①Vue组件②Vue插槽

    Vue基础 Day04 1 Vue组件 component 定义全局组件 定义私有组件 组件中数据和方法的调用 组件动画 父组件传值子组件 子组件传值父组件 2 Vue插槽 slot 如果出现具名插槽没有效果 但是也没有报错 极有可能是Vu
  • C语言——IIC协议概述+PCF8591

    IIC协议 SCL必须由主机发送 在SCL 1 高电平 时 SDA下跳则 判罚 为 起始信号 SDA上跳则 判罚 为 停止信号P 每个字节后应该由对方回送一个应答信号ACK做为对方在线的标志 非应答信号一般在所有字节的最后一个字节后 一般要
  • 【RabbitMQ教程】springboot整合rabbitmq(topic模式)

    下面还是模拟注册服务当用户注册成功后 向短信和邮件服务推送消息的场景 搭建SpringBoot环境 创建两个工程 mq rabbitmq producer和mq rabbitmq consumer 分别配置1 2 3 第三步本例消费者用注解