SpringBoot RabbitMq 六大模式

2023-11-17

目录

依赖、配置

简单队列

模型

代码示例

工作队列

模型

代码示例

订阅模式

模型

代码示例

路由模式

模型

代码示例

主题模式

模型

代码示例

RPC


依赖、配置

依赖:

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置:

spring:
  rabbitmq:
    host: 192.168.31.10
    port: 5672 #通过控制台可以查看
    username: guest
    password: guest
    virtual-host: /vhost_sys_logs #可以不配置,会使用的是默认virtual-host

简单队列

模型

 简单队列,consumer和producer通过队列直连。

代码示例

配置类:

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    @Bean
    public Queue queue() {
        //name,名字;durable,是否开启持久化
        return new Queue("logs",false);
    }
}

生产者:

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest(classes=PrivilegeSystemMain.class)
public class RabbitMQTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void simpleTest() {
        rabbitTemplate.convertAndSend("logs","helo world!");
    }
}

消费者:

@Component
@Slf4j
public class ConsumeBean {
    @RabbitListener(queues={"logs"})
    public void getMsg(String message){
        log.info(message);
    }
}

工作队列

模型

 工作队列(work queue),让多个消费者去消费同一个消息队列中的消息,支持轮询分发(默认)、公平分发两种分发模式。

代码示例

配置类:

@Configuration
public class RabbitMQConfig {
    @Bean
    public Queue queue() {
        //name,名字;durable,是否开启持久化
        return new Queue("logs",false);
    }
}

生产者:

@SpringBootTest(classes=PrivilegeSystemMain.class)
public class RabbitMQTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void simpleTest() {
        rabbitTemplate.convertAndSend("logs","helo world 01!");
        rabbitTemplate.convertAndSend("logs","helo world 02!");
    }
}

消费者:

@Component
@Slf4j
public class ConsumeBean {
    @RabbitListener(queues={"logs"})
    public void consumer_01(String message){
        log.info("consumer_01 get message "+message);
    }
    @RabbitListener(queues={"logs"})
    public void consumer_02(String message){
        log.info("consumer_02 get message "+message);
    }
}

订阅模式

模型

订阅模式(fanout),也叫广播模式,见名知意,其特点是将消息广播出去。通过交换机将生产者生产的消息分发到多个队列中去,从而支持生产者生产的一个消息被多个消费者消费。

代码示例

配置类:

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    @Bean
    public Queue queue_01() {
        //name,名字;durable,是否开启持久化
        return new Queue("queue_01",false);
    }
    @Bean
    public Queue queue_02(){
        return new Queue("queue_02",false);
    }
    //订阅模式的交换机
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanout_exchange",false,false);
    }

    //将队列绑定到交换机上
    @Bean
    public Binding bindingSmsQueue_01(@Qualifier("queue_01") Queue logsAccess, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(logsAccess).to(fanoutExchange);
    }
    @Bean
    public Binding bindingSmsQueue_02(@Qualifier("queue_02") Queue logsError, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(logsError).to(fanoutExchange);
    }

}

生产者:

@SpringBootTest(classes=PrivilegeSystemMain.class)
public class RabbitMQTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void simpleTest() {
        rabbitTemplate.convertAndSend("fanout_exchange","","helo world 01!");
    }
}

消费者:

@Component
@Slf4j
public class ConsumeBean {
    @RabbitListener(queues={"queue_01"})
    public void consumer_01(String message){
        log.info("consumer_01 get message "+message);
    }
    @RabbitListener(queues={"queue_02"})
    public void consumer_02(String message){
        log.info("consumer_02 get message "+message);
    }
}

路由模式

模型

 路由模式(direct),在订阅模式支持一条消息被多个消费者消费的特性上增加了分类投递的特性,通过交换机,支持消息以类别(routing key)的方式投送到不同的消息队列中去。

代码示例

配置类:

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    @Bean
    public Queue queue_01() {
        //durable,是否开启持久化
        return new Queue("queue_01",false);
    }
    @Bean
    public Queue queue_02(){
        return new Queue("queue_02",false);
    }

    //路由模式的交换机
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("direct_exchange",false,false);
    }

    //将队列绑定到交换机上
    @Bean
    public Binding bindingSmsQueue_01(@Qualifier("queue_01") Queue logsAccess, DirectExchange directExchange) {
        return BindingBuilder.bind(logsAccess).to(directExchange).with("routing_key_01");
    }
    @Bean
    public Binding bindingSmsQueue_02(@Qualifier("queue_02") Queue logsError, DirectExchange directExchange) {
        return BindingBuilder.bind(logsError).to(directExchange).with("routing_key_02");
    }

}

生产者:

@SpringBootTest(classes=PrivilegeSystemMain.class)
public class RabbitMQTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void simpleTest() {
        rabbitTemplate.convertAndSend("direct_exchange","routing_key_01","helo world!");
    }
}

消费者:

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ConsumeBean {
    @RabbitListener(queues={"queue_01"})
    public void consumer_01(String message){
        log.info("consumer_01 get message "+message);
    }
    @RabbitListener(queues={"queue_02"})
    public void consumer_02(String message){
        log.info("consumer_02 get message "+message);
    }
}

主题模式

模型

 主题模式,也叫通配符模式,在路由模式以类别进行消息投送的基础上增加了对通配符的支持,这样就可以使用通配符将多个类别聚合成一个主题。

代码示例

配置类:

@Configuration
public class RabbitMQConfig {
    @Bean
    public Queue queue_01() {
        //durable,是否开启持久化
        return new Queue("queue_01",false);
    }
    @Bean
    public Queue queue_02(){
        return new Queue("queue_02",false);
    }

    //主题模式的交换机
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange("topic_exchange",false,false);
    }

    //将队列绑定到交换机上
    @Bean
    public Binding bindingSmsQueue_01(@Qualifier("queue_01") Queue logsAccess, TopicExchange topicExchange) {
        return BindingBuilder.bind(logsAccess).to(topicExchange).with("#.error.#");
    }
    @Bean
    public Binding bindingSmsQueue_02(@Qualifier("queue_02") Queue logsError, TopicExchange topicExchange) {
        return BindingBuilder.bind(logsError).to(topicExchange).with("#.info.#");
    }

}

生产者:

@SpringBootTest(classes=PrivilegeSystemMain.class)
public class RabbitMQTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void simpleTest() {
        rabbitTemplate.convertAndSend("topic_exchange","test.error.test","helo world!");
    }
}

消费者:

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ConsumeBean {
    @RabbitListener(queues={"queue_01"})
    public void consumer_01(String message){
        log.info("consumer_01 get message "+message);
    }
    @RabbitListener(queues={"queue_02"})
    public void consumer_02(String message){
        log.info("consumer_02 get message "+message);
    }
}

RPC

几乎不会使用,略......

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

SpringBoot RabbitMq 六大模式 的相关文章

  • 在 Red Hat 上安装 RabbitMQ - 错误的 Erlang 版本

    我正在尝试按照以下说明在 Red Hat Enterprise Linux 7 64 位工作站版本 的评估虚拟机上安装 RabbitMQhttps www rabbitmq com install rpm html https www ra
  • 如何使用自动装配的 Spring Boot 监听多个队列?

    我是 Spring Boot 的新手 正在尝试它 目前我已经构建了一些应用程序 我希望能够通过队列相互通信 我目前有一个侦听器对象 可以从特定队列接收消息 Configuration public class Listener final
  • 过期的消息不会从 RabbitMQ 中删除

    我通过生产者向 RabbitMQ 发送一条普通消息 然后发送第二条消息expiration属性分配给一个值 然后使用rabbitmqctl list queues命令我监视消息的状态 我发现如果我先发送一条普通消息 然后发送一条消息expi
  • Spring AMQP + RabbitMQ 3.3.5 ACCESS_REFUSED - 使用身份验证机制 PLAIN 拒绝登录

    我遇到以下异常 org springframework amqp AmqpAuthenticationException com rabbitmq client AuthenticationFailureException ACCESS R
  • 谁能告诉我 python 中的 pika 和 kombu 消息传递库有什么区别?

    我想在我的应用程序中使用消息传递库与rabbitmq交互 谁能解释一下 pika 和 kombu 库之间的区别吗 Kombu 和 pika 是两个不同的 Python 库 它们从根本上服务于相同的目的 向消息代理发布消息和使用消息代理发送消
  • 如何在nodejs中验证rabbitmq?

    错误 握手被服务器终止 403 ACCESS REFUSED 消息 ACCESS REFUSED 使用身份验证拒绝登录 旋转机制平原 有关详细信息 请参阅代理日志文件 我单独尝试了 authMechanism PLAIN AMQPLAIN
  • Celery 任务状态取决于 CELERY_TASK_RESULT_EXPIRES

    据我所知 任务状态完全取决于 CELERY TASK RESULT EXPIRES 设置的值 如果我在任务完成执行后检查此间隔内的任务状态 则返回的状态为 AsyncResult task id state 是正确的 如果没有 状态将不会更
  • 死信交换 RabbitMQ 丢弃消息

    我正在尝试在 RabbitMQ 中实现 dlx 队列 场景很简单 我有 2 个队列 1 活着 2 死亡 x dead letter exchange 立即 x message ttl 5000 以及 立即 交换 这必然是 1 活着 我尝试运
  • RabbitMQ 失败,错误:无法连接到节点rabbit@TPAJ05421843:nodedown

    在 Windows 7 Enterprise 计算机上 我全新安装了 Erlang 17 4 和 RabbitMQ 3 4 3 x64 安装成功且顺利 我还没有尝试创建我的第一个队列或交换器 但我已经看到了麻烦 这个问题类似于另一个SO帖子
  • 使用 RabbitMq 锁定和批量获取消息

    我正在尝试以一种更非常规的方式使用 RabbitMq 尽管此时我可以根据需要选择任何其他消息队列实现 消费者不会将 Rabbit 推送消息留给我的消费者 而是连接到一个队列并获取一批 N 条消息 在此期间它会消费一些消息 并可能拒绝一些消息
  • RabbitMQ 上的 Nack 和拒绝

    我想处理消费者从队列中获取的不成功的消息并将它们重新排队 想象一下我有这样的情况 P gt foo bar baz gt C 其中 foo bar 和 baz 是消息 如果消费者读到baz但出了问题 我可以使用basic reject or
  • 即使设置了 cookie,RabbitMQ 身份验证也会失败

    我最近在运行 lattePanda 的 Windows 10 上安装了带有 ErlanOTP 的rabbitmq 我运行rabbitmqctl status并收到以下错误 C Program Files RabbitMQ Server ra
  • 基于多线程的 RabbitMQ 消费者

    我们有一个 Windows 服务 它监听单个 RabbitMQ 队列并处理消息 我们希望扩展相同的 Windows 服务 以便它可以监听 RabbitMQ 的多个队列并处理消息 不确定使用多线程是否可以实现这一点 因为每个线程都必须侦听 阻
  • ECONNREFUSED:无法连接到集群内默认端口上的 RabbitMQ pod

    我的本地集群中有一个运行 RabbitMQ 的 pod 我已经将其配置为 apiVersion v1 kind Service metadata name service rabbitmq spec selector app service
  • Celery 广播 vs RabbitMQ 扇出

    我最近一直在使用 Celery 但我不喜欢它 它的配置很混乱 过于复杂并且文档记录很少 我想用 Celery 从单个生产者向多个消费者发送广播消息 让我困惑的是 Celery 术语和底层传输 RabbitMQ 术语之间的差异 在 Rabbi
  • RabbitMQ 3.1.3 和丢失的时间戳头

    如果消息中缺少时间戳头 是否可以将代理配置为插入时间戳头 因此 如果发布客户端没有添加时间戳标头 代理是否可以插入与交易所收到消息的时刻相匹配的时间戳值 我应该在哪里寻找该配置 或者这是一个坏主意 截至2015年 原来的问题有了新的答案 这
  • 公共交通错误队列正在消耗,但仍然不为空

    我正在使用 Mastransit 3 5 0 和 RabbitMq 如果队列消费者抛出异常 则默认由 MoveExceptionToTransportFilter 处理异常并移至 error 队列 对于 error 队列 我有单独的消费者
  • 如何停止本地主机上的 RabbitMQ 服务器

    我在 OS X 上安装了 RabbitMQ 服务器 并在命令行上启动它 现在 我应该如何阻止它运行还不清楚 我这样做之后 sudo rabbitmq server detached I get Activating RabbitMQ plu
  • 具有重新排队功能的 BasicReject 实际上去了哪里?

    这似乎是一个简单的问题 但我很难找到明确的答案 如果在 RabbitMQ 3 6 1 中我有一个如下所示的队列 5 4 3 2 1 lt head 我使用消息 1 然后执行以下操作 channel BasicReject ea Delive
  • 使用 Spring Boot 的多个 Rabbitmq 队列

    来自 Spring Boot 教程 https spring io guides gs messaging rabbitmq https spring io guides gs messaging rabbitmq 他们给出了创建 1 个队

随机推荐