RabbitMQ之交换机类型

2023-11-19

一、交换机类型

在 RabbitMQ 中,交换机主要用来将生产者生产出来的消息,传送到对应的队列中,即交换机是一个消息传送的媒介,其英文被称为 exchange 。交换机在 RabbitMQ 中起着承上启下的作用。

交换机主要有四种类型:

  • direct: 直连
  • topic: 主题
  • fanout:广播
  • headers: 请求头

常用的只有前面三种:direct、topic、fanout

二、direct 交换机

所有发送到 direct 交换机的消息都被转发到RouteKey中指定的队列 queue。RouteKey必须完全匹配才会被队列接收。

RabbitMQ自带的Exchange:default Exchange就是一个 direct 类型的交换机。当生产端发送消息没有指定交换机的名称时,使用的就是该交换机。当队列名称和 routingKey 一样时,也可以不用将exchange和queue进行绑定(binding)操作。

如下代码所示:

    // 生产端:发送消息
    public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("121.43.153.00");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String queueName = "direct_queue";
        // 这里发布消息时,第一个参数为交换机名称,为空,则使用默认交换机 default exchange
        // 这里的第二个参数本应该是 routingKey,这里直接使用了队列名称,则就不需要再执行 exchange和queue通过 routingKey 的绑定操作了
        channel.basicPublish("", queueName, null, "didiok send a direct message".getBytes());
        channel.close();
        connection.close();
    }

    // 消费端:消费消息
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("121.43.153.00");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(3000);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String queueName = "direct_queue";
        channel.queueDeclare(queueName, false, false, false, null);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            System.out.println(new String(delivery.getBody()));
        }
    }

三、topic 交换机

所有发送到 topic 交换机 的消息被转发到所有 RouteKey匹配到的Queue上。这里的 routingKey 可以使用通配符进行模糊匹配:

  • 符号 # 匹配一个或多个词
  • 符号 * 匹配一个词

例如:“didiok.#” 能够匹配到 “didiok.hello”、“didiok.hello.world”、“didiok.hello.world.abc”等。而
“didiok.*”只会匹配到“didiok.hello”这类后面只带有一个词的。

代码如下:

    // 生产端:发送消息
    public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("121.43.153.00");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(3000);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "topic_exchange";
        String routingKey1 = "user.aa.dev";
        String routingKey2 = "user.bb";
        String routingKey3 = "user.cc";

        AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
                .contentEncoding("UTF-8")
                .headers(new HashMap<>())
                .deliveryMode(2)
                .build();

        String message = "didiok send a topic_message~~~";
        channel.basicPublish(exchangeName, routingKey1, props, message.getBytes());
        channel.basicPublish(exchangeName, routingKey2, props, message.getBytes());
        channel.basicPublish(exchangeName, routingKey3, props, message.getBytes());
    }

    // 消费端一:消费消息
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("121.43.153.00");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(3000);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        /**** 以下对 交换机和队列的声明和绑定 操作,最好不要再代码中执行,建议在 rabbitMQ控制台中进行操作,因为代码可能重复执行,导致出现异常 **/
        String queueName = "topic_queue1";
        String exchangeName = "topic_exchange";
        String exchangeType = "topic";
        String routingKey = "user.#";
        channel.queueDeclare(queueName, false, false, false, null);
        channel.exchangeDeclare(exchangeName, exchangeType, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);

        System.err.println("consumer1 starting...");
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            System.out.println("消息内容:" + new String(delivery.getBody()) + ",routingKey:" + delivery.getEnvelope().getRoutingKey());
        }
    }

    // 消费端二:消费消息
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("121.43.153.00");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(3000);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        /**** 以下对 交换机和队列的声明和绑定 操作,最好不要再代码中执行,建议在 rabbitMQ控制台中进行操作,因为代码可能重复执行,导致出现问题 **/
        String queueName = "topic_queue2 ";
        String exchangeName = "topic_exchange";
        String exchangeType = "topic";
        String routingKey = "user.*";
        channel.queueDeclare(queueName,false,false,false,null);
        channel.exchangeDeclare(exchangeName,exchangeType,false, false,null);
        channel.queueBind(queueName, exchangeName, routingKey);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName,true, consumer);
        System.err.println("consumer2 starting...");

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            System.out.println("消息内容:"+new String(delivery.getBody())+", routingKey:"+delivery.getEnvelope().getRoutingKey());
        }
    }
    

消费端一可以收到生产端发的三条消息,消费端二只能收到两条消息。

四、fanout 交换机

fanout 交换机 类似于广播,不走 routingKey,所以可以不设置 routingKey,设置了也没用。只需要简单的将队列绑定在交换机上。发送到交换机上的消息都会被转发到与该交换机绑定的所有队列上。Fanout交换机转发消息是最快的,其次是 direct 交换机,topic 交换机最慢,因为需要根据匹配规则寻找队列(通配符找起来速度慢)。

示例代码如下:

    // 生产端:发送消息
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("121.43.153.00");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(3000);


        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "fanout_exchange";
        String routingKey = "";
        AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
                 .headers(new HashMap<>())
                 .contentEncoding("UTF-8")
                 .deliveryMode(2)
                 .build();

        for(int i=0; i<10; i++){
            String msg = "发送消息的序号:"+i;
            channel.basicPublish(exchangeName, routingKey, props, msg.getBytes());
        }
        channel.close();
        connection.close();
    }

    // 消费端一:消费消息
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("121.43.153.00");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(3000);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String queueName = "fanout_queue";
        String exchangeName = "fanout_exchange";
        String routingKey = "";

        channel.queueDeclare(queueName, false, false, false, null);
        channel.exchangeDeclare(exchangeName, "fanout", false,false,null);
        channel.queueBind(queueName, exchangeName, routingKey);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);

        System.err.println("consumer1 starting...");
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            System.out.println("受到消息:"+new String(delivery.getBody())+",routingKey:"+delivery.getEnvelope().getRoutingKey());
        }
    }

    // 消费端二:消费消息
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("121.43.153.00");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(3000);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String queueName = "fanout_queue2";
        String exchangeName = "fanout_exchange";
        String routingKey = "";

        channel.queueDeclare(queueName, false, false, false, null);
        channel.exchangeDeclare(exchangeName, "fanout", false,false,null);
        channel.queueBind(queueName, exchangeName, routingKey);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);

        System.err.println("consumer2 starting...");
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            System.out.println("受到消息:"+new String(delivery.getBody())+",routingKey:"+delivery.getEnvelope().getRoutingKey());
        }
    }

因为交换机和队列进行了绑定,消费端一和消费端二都会收到生产端发送的消息。 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RabbitMQ之交换机类型 的相关文章

随机推荐

  • php随机生成密码函数

    随机生成密码函数 param int length 密码长度 return string function generate password length 8 密码字符集 可任意添加你需要的字符 abc abcdefghijklmnopq
  • python常用编译器和解释器的区别_详解python编译器和解释器的区别

    详解python编译器和解释器的区别 高级语言不能直接被机器所理解执行 所以都需要一个翻译的阶段 解释型语言用到的是解释器 编译型语言用到的是编译器 编译型语言通常的执行过程是 源代码 预处理器 编译器 目标代码 链接器 可执行程序 某种意
  • Linux(CentOS 或者 Ubuntu都可以)安装docker

    Linux CentOS 或者 Ubuntu都可以 安装docker 介绍下如何在Linux下面安装docker 安装方式如下 1 关闭防火墙 systemctl stop firewalld systemctl disable firew
  • 【Unity Optimize】使用对象池(Object Pooling)优化项目

    目录 1 对象池 Object Pooling 介绍 2 实现对象池脚本 3 使用对象池生成Cube 4 效果展示 5 Unity资源商店的对象池插件 1 对象池 Object Pooling 介绍 Unity中的对象池 Object Po
  • 单例模式(小小单例,一点也不小)

    小小单例 一点也不小 今天终于发现了原来单例模式还有这么多道道 单例模式解决了两个基本问题 全局访问和实例化控制 出自 大话设计模式 懒汉式单例模式 定义 要在第一次被引用时 才会将自己实例化 所以就被称为懒汉式单例模式 也就是我们常用的单
  • C 标准库 - 《assert.h》

    原文链接 https www runoob com cprogramming c standard library assert h html 简介 C 标准库的 assert h头文件提供了一个名为 assert 的宏 它可用于验证程序做
  • R: R版本更新及R包迁移(详细步骤)

    在安装R包的过程中 有时候会提醒R版本不够等情况 当需要更新R版本 又需要保证旧版本安装的R包可以完整迁移到新版本R时 可通过 installr 包实现 install packages installr library installr
  • python使用SMTP发送邮件

    SMTP是发送邮件的协议 Python内置对SMTP的支持 可以发送纯文本邮件 HTML邮件以及带附件的邮件 Python对SMTP支持有smtplib和email两个模块 email负责构造邮件 smtplib负责发送邮件 首先 我们来构
  • ARC105

    C Camels and Bridge 题意 一堆骆驼过桥 每个桥有承重和长度 问骆驼从头到尾的最近距离 假设这时候骆驼的过桥顺序已经安排好 每一个桥相当于一个限制条件 限制了 l r 的最近距离 也就是说 对于每一个骆驼 j 要保证 好了
  • OJ题目8--动态规划问题

    1 509 斐波那契数 力扣 LeetCode leetcode cn com 过去一直用递归法 但由于栈区空间的限制 当递归过深时容易发生栈溢出 int fib int n if n 0 return 0 else if n 1 retu
  • css按钮样式

    创建漂亮的 CSS 按钮的 10 个代码片段 IT程序狮子烨 1 个月前 如果你正在寻找一些高质量的 CSS 按钮的示例 那么这篇文章一定是你的 菜 在本文中 我们从 CodePen 上收集了 10 个独特的 CSS 按钮合集 并附有它们的
  • linux文件系统初始化过程(4)---加载initrd(中)

    一 目的 上文详细介绍了CPIO格式的initrd文件 本文从源代码角度分析加载并解析initrd文件的过程 initrd文件和linux内核一般存储在磁盘空间中 在系统启动阶段由bootload负责把磁盘上的内核和initrd加载到指定的
  • 苹果steam手机令牌未能连接服务器,steam手机令牌登不上怎么办(6种原因方法轻松解除)...

    引用自 平底锅揽件指南 随着 绝地求生 游戏的回温 最近芝士君收到了好多小伙伴关于 令牌 的问题 在这里为大家专门出一篇文章科普一下 好好看完这篇文章 以后妈妈再也不用担心我 绝地求生 游戏令牌出问题啦 在这里 芝士把大家遇到的问题总结为5
  • Java 中Arrays工具类的使用

    博主前些天发现了一个巨牛的人工智能学习网站 通俗易懂 风趣幽默 忍不住也分享一下给大家 点击跳转到网站 介绍 java util Arrays类即为操作数组的工具类 包含了用来操作数组 比如排序和搜索 的各种算法 下面我用代码给大家演示一下
  • 十二. Kubernetes Pod 与 探针

    目录 一 Pod Pod 中的多容器协同 Pod 的组成与paush 重要 Pod 的生命周期 Pod状态与重启策略 静态Pod 二 探针 1 livenessProbe存活探针 2 readinessProbe就绪探针 3 startup
  • 页面滚动动画库,快看看

    本文属xxKarina原创 转载请注明 个人博客地址 https xxkarina github io 前端涉及的领域真的很广 但是粗略的划分的话 其实就是简单的三要素 html css js 当然 这些基本的Web前端技术是远远不足以让你
  • SpringBoot+mybatis+thymeleaf实现登录功能

    项目文件目录一栏 2 开始工作 先按照上图建立好相应的controller mapper等文件 接着进行一个配置 首先是application properties server port 8080 启动端口 加载Mybatis配置文件 m
  • 2023 年如何将您的应用提交到 App Store

    您夜以继日地工作来创建您的梦想应用程序 最后 是时候向全世界宣布您的应用程序了 但不知道如何将您的应用提交到 App Store 为您的商店获取现成的移动应用程序 将应用程序提交到 App Store 可能是一项复杂的任务 但在本指南的帮助
  • 揭开智能卡的面纱

    一 概述 ICC是Integrated Circuit Card的缩写 意思是集成电路卡 我们通常把它称为智能卡 Smart Card 智能卡应用广泛 它可以用来保存私人密码 银行账号 个人资料等 那么如何编写应用程序 从智能卡上读出或向其
  • RabbitMQ之交换机类型

    一 交换机类型 在 RabbitMQ 中 交换机主要用来将生产者生产出来的消息 传送到对应的队列中 即交换机是一个消息传送的媒介 其英文被称为 exchange 交换机在 RabbitMQ 中起着承上启下的作用 交换机主要有四种类型 dir