rabbitmq-通配符模式

2023-05-16

【README】

本文介绍 通配符模式,及代码示例

【1】intro to rabbitmq通配符模式

0)通配符模式-交换机类型为 Topic
1)与路由模式相比,相同点是 两者都可以通过 routingkey 把消息转发到不同的队列;
不同点是通配符模式-topic类型的exchange可以让队列在绑定routing key的时候使用通配符;
2)通配符模式的routingkey 通常使用多个单词并用点号连接,如 item.insert ;
3)通配符规则:
# 匹配一个或多个词;
* 匹配不多不少一个词;  
荔枝:
item.# 能够匹配 item.insert.abc 或 item.insert  ; (可以多层)
item.* 能够匹配 item.insert ;  (只能一层)

refers2 https://www.rabbitmq.com/tutorials/tutorial-five-java.html 

4)新建队列

5)把队列绑定到交换机 

6)生产者发送消息到队列,路由key 分别是 item.insert , item.update, item.delete ; 如下:

【2】代码

生产者

/**
 * 通配符模式-交换机类型为TOPIC
 */
public class WildProducer {
	/* 交换机名称 */
	static final String TOPIC_EXCHANGE = "topic_exchange"; 
	/*队列名称1*/ 
	static final String TOPIC_QUEUE_1 = "topic_queue_1";
	/*队列名称2*/
	static final String TOPIC_QUEUE_2 = "topic_queue_2";
	
	public static void main(String[] args) throws Exception {
		/*获取连接*/
		Connection conn = RBConnectionUtil.getConn();
		// 创建频道 
		Channel channel = conn.createChannel();
		/**
		 * 声明交换机
		 * 参数1-交换机名称  
		 * 参数2-交换机类型(fanout, topic, direct, headers)
		 */
		channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);  
		/**
		 * routingkey-路由键 
		 */
		String itemInsertRoutingKey = "item.insert";  
		String itemUpdateRoutingKey = "item.update";
		String itemDeleteRoutingKey = "item.delete";
		
		/* 发送消息-insert */  
		/**
		 * 参数1 交换机名称 如果没有指定则使用默认 default exchange 
		 * 参数2 routingkey-路由key, 简单模式可以传递队列名称 
		 * 参数3 消息其他属性
		 * 参数4 消息内容 
		 */
		String insertMsg = "我是消息,通配符模式,routingkey=" + itemInsertRoutingKey + MyDateUtil.getNow();
		channel.basicPublish(TOPIC_EXCHANGE, itemInsertRoutingKey, null, insertMsg.getBytes());
		System.out.println("已发送消息=" + insertMsg); 
		
		String updMsg = "我是消息,通配符模式,routingkey=" + itemUpdateRoutingKey + MyDateUtil.getNow();
		channel.basicPublish(TOPIC_EXCHANGE, itemUpdateRoutingKey, null, updMsg.getBytes());
		System.out.println("已发送消息=" + updMsg);
		
		String deleteMsg = "我是消息,通配符模式,routingkey=" + itemDeleteRoutingKey + MyDateUtil.getNow();
		channel.basicPublish(TOPIC_EXCHANGE, itemDeleteRoutingKey, null, deleteMsg.getBytes());
		System.out.println("已发送消息=" + deleteMsg);
		
		/* 关闭连接和信道 */ 
		channel.close();
		conn.close(); 
	}
}

消费者1  topic_queue_1

/**
 * 通配符模式消费者-routingkey 
 */
public class RouteConsumerWild1 {
	/* 交换机名称 */
	static final String TOPIC_EXCHANGE = "topic_exchange"; 
	/*队列名称1*/ 
	static final String TOPIC_QUEUE_1 = "topic_queue_1";
	
	public static void main(String[] args) throws Exception {
		/*创建连接 */
		Connection conn = RBConnectionUtil.getConn();
		/*创建队列*/
		Channel channel = conn.createChannel(); 
		/*声明交换机*/
		channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);
		
		/**
		 * routingkey-路由键 
		 */
		String itemInsertRoutingKey = "item.insert";  
		String itemUpdateRoutingKey = "item.update";
		String itemDeleteRoutingKey = "item.delete";
		
		/**
		 * 声明/创建队列 
		 * 参数1 队列名称 
		 * 参数2 是否持久化
		 * 参数3 是否独占本连接 
		 * 参数4 是否在不使用的时候自动删除队列
		 * 参数5 队列其他参数 
		 */
//		channel.queueDeclare(TOPIC_QUEUE_1, true, false, false, null);  // ui界面可以创建队列 
		/**
		 * 队列绑定交换机
		 * 参数1 队列名称
		 * 参数2 交换机
		 * 参数3 routingkey-路由键 
		 */
//		channel.queueBind(TOPIC_QUEUE_1, TOPIC_EXCHANGE, "item.#"); // ui界面可以把队列绑定到交换机 
		
		/* 创建消费者,设置消息处理逻辑 */
		Consumer consumer = new DefaultConsumer(channel) {
			/**
			 * @param consumerTag 消费者标签,在 channel.basicConsume 可以指定   
			 * @param envelope 消息包内容,包括消息id,消息routingkey,交换机,消息和重转标记(收到消息失败后是否需要重新发送) 
			 * @param properties 基本属性
			 * @param body 消息字节数组  
			 */
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope,
					BasicProperties properties, byte[] body) throws IOException {
				System.out.println("=== 消费者1 start ===");
				
				System.out.println("路由key=" + envelope.getRoutingKey());
				System.out.println("交换机=" + envelope.getExchange());
				System.out.println("消息id=" + envelope.getDeliveryTag()); 
				String message = new String(body, "UTF-8");
				System.out.println(String.format("消费者收到的消息【%s】", message)); 
				System.out.println("=== 消费者1 end ===\n"); 
			} 
		};
		/**
		 * 监听消息  
		 * 参数1 队列名称 
		 * 参数2 是否自动确认, 设置为true表示消息接收到自动向 mq回复ack;mq收到ack后会删除消息; 设置为false则需要手动发送ack; 
		 * 参数3 消息接收后的回调 
		 */
		channel.basicConsume(TOPIC_QUEUE_1, true, consumer); 
	}
}

消费者2 topic_queue_2

/**
 * 通配符模式消费者-routingkey 
 */
public class RouteConsumerWild2 {
	/* 交换机名称 */
	static final String TOPIC_EXCHANGE = "topic_exchange"; 
	/*队列名称1*/ 
	static final String TOPIC_QUEUE_2 = "topic_queue_2";
	
	public static void main(String[] args) throws Exception {
		/*创建连接 */
		Connection conn = RBConnectionUtil.getConn();
		/*创建队列*/
		Channel channel = conn.createChannel(); 
		/*声明交换机*/
		channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);
		
		/**
		 * routingkey-路由键 
		 */
		String itemInsertRoutingKey = "item.insert";  
		String itemUpdateRoutingKey = "item.update";
		String itemDeleteRoutingKey = "item.delete";
		
		/**
		 * 声明/创建队列 
		 * 参数1 队列名称 
		 * 参数2 是否持久化
		 * 参数3 是否独占本连接 
		 * 参数4 是否在不使用的时候自动删除队列
		 * 参数5 队列其他参数 
		 */
//		channel.queueDeclare(TOPIC_QUEUE_2, true, false, false, null);  // ui界面可以创建队列 
		/**
		 * 队列绑定交换机
		 * 参数1 队列名称 
		 * 参数2 交换机
		 * 参数3 routingkey-路由键  
		 */
//		channel.queueBind(TOPIC_QUEUE_2 TOPIC_EXCHANGE, "*.delete"); // ui界面可以把队列绑定到交换机  
		
		/* 创建消费者,设置消息处理逻辑 */
		Consumer consumer = new DefaultConsumer(channel) {
			/** 
			 * @param consumerTag 消费者标签,在 channel.basicConsume 可以指定   
			 * @param envelope 消息包内容,包括消息id,消息routingkey,交换机,消息和重转标记(收到消息失败后是否需要重新发送) 
			 * @param properties 基本属性
			 * @param body 消息字节数组  
			 */
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope,
					BasicProperties properties, byte[] body) throws IOException {
				System.out.println("=== 消费者1 start ===");
				
				System.out.println("路由key=" + envelope.getRoutingKey());
				System.out.println("交换机=" + envelope.getExchange());
				System.out.println("消息id=" + envelope.getDeliveryTag()); 
				String message = new String(body, "UTF-8");
				System.out.println(String.format("消费者收到的消息【%s】", message)); 
				System.out.println("=== 消费者1 end ===\n"); 
			} 
		};
		/**
		 * 监听消息  
		 * 参数1 队列名称 
		 * 参数2 是否自动确认, 设置为true表示消息接收到自动向 mq回复ack;mq收到ack后会删除消息; 设置为false则需要手动发送ack; 
		 * 参数3 消息接收后的回调 
		 */
		channel.basicConsume(TOPIC_QUEUE_2, true, consumer); 
	}
}

【3】 rabbitmq 模式总结  

8.1)模式1 简单模式 helloworld

一个生产者,一个消费者,不需要设置交换机,使用默认交换机;

8.2)模式2 工作队列模式 work queue

一个生产者,多个消费者(竞争关系),不需要设置交换机(使用默认交换机);

8.3)发布订阅模式  publish/subscribe

需要设置类型为 fanout-广播的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列;

8.4)路由模式 routing

需要设置类型为 direct的交换机, 交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key 将消息发送到对应队列;

8.5)通配符模式 topic

需要设置类型为 topic的交换机, 交换机和队列进行绑定, 并且指定通配符方式的routing key, 当发送消息到交换机后,交换机会根据 routing key将消息发送到对应的队列;

 

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

rabbitmq-通配符模式 的相关文章

  • 使用AWS SQS作为Aurora数据库的写入队列来提高系统性能是否有效

    我正在 AWS 上开发一个 Web 应用程序服务器 需要支持高吞吐量的读写 我的老板给了我这样的高级设计 我被困在 写入队列 上 团队告诉我 我们需要它来提高写入性能 因为我们只能有 1 个可以写入的主副本 我对 SQS 和 RabbitM
  • 使用Camel的spring-rabbitmq组件时如何自动声明交换?

    我正在尝试从 Camel 3 x 迁移到 Camel 4 x 版本 因此我需要从rabbitmq替换组件spring rabbitmq With rabbitmq我正在使用的组件declare https camel apache org
  • Spring AMQP Java 客户端中的队列大小

    我使用 Spring amqp 1 1 版本作为我的 java 客户端 我有一个大约有 2000 条消息的队列 我想要一个服务来检查这个队列大小 如果它是空的 它会发出一条消息说 所有项目已处理 我不知道如何获取当前队列大小 请帮忙 我用谷
  • rabbitmq 通道因 PRECONDITION_FAILED 关闭 - 快速回复消费者不存在

    当我们从 Spring Boot 服务向rabbitmq 发布消息时 出现以下错误 而且这是间歇性的 我们无法重现这一点 AMQP 连接 123 11 xxx xx 5672 错误 org springframework amqp rabb
  • RabbitMQ 启动失败

    RabbitMQ Windows 服务将无法启动 C Program Files x86 RabbitMQ Server rabbitmq server 3 0 4 sbin gt rabbitmq service bat start C
  • 在rabbitmq配置spring boot中在AMQP中配置多个Vhost

    我正在实现一个项目 我必须在rabbitmq中的不同虚拟主机之间发送消息 使用 SimpleRoutingConnectionFactory 但得到 java lang IllegalStateException 无法确定查找键的目标 Co
  • RabbitMQ 失败,错误:无法连接到节点rabbit@TPAJ05421843:nodedown

    在 Windows 7 Enterprise 计算机上 我全新安装了 Erlang 17 4 和 RabbitMQ 3 4 3 x64 安装成功且顺利 我还没有尝试创建我的第一个队列或交换器 但我已经看到了麻烦 这个问题类似于另一个SO帖子
  • Celery 与rabbitmq 创建结果多个队列

    我已经用 RabbitMQ 安装了 Celery 问题是 对于返回的每个结果 Celery 都会在 Rabbit 中创建队列 并在交换 celeryresults 中使用任务 ID 我仍然想得到结果 但在一个队列上 我的芹菜配置 from
  • 即使设置了 cookie,RabbitMQ 身份验证也会失败

    我最近在运行 lattePanda 的 Windows 10 上安装了带有 ErlanOTP 的rabbitmq 我运行rabbitmqctl status并收到以下错误 C Program Files RabbitMQ Server ra
  • RabbitMQ - 无法联系统计数据库。消息速率和队列长度将不会显示

    我已经设置了一个兔子经纪人集群 并且在管理门户插件中我收到以下消息 无法联系统计数据库 消息速率和队列长度将不会显示 我已经搜索过这个错误 但谷歌并不友善 任何人都可以阐明这一点吗 我最近在旧安装的RabbitMQ 2 8 7 上遇到了同样
  • 服务器在 pika.exceptions.StreamLostError: Stream 连接丢失后关闭

    我的队列中有一些图像 我将每个图像传递到我的 Flask 服务器 在其中完成图像处理 并在我的rabbitmq 服务器中收到响应 收到响应后 我收到此错误 pika exceptions StreamLostError 流连接丢失 104
  • 如何使用 Java 在 RabbitMQ 中实现标头交换?

    我是一个新手 试图在java客户端中实现标头交换 我知道这就是 x match 绑定参数的用途 当 x match 参数设置为 any 时 只需一个匹配的标头值就足够了 或者 将 x match 设置为 all 强制所有值必须匹配 但任何人
  • 从 RabbitMQ 迁移到 Amazon SQS [关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 我们的初创公司目前正在使用RabbitMQ with Python Django 对于消息队列 现在我们计划转移到Amazon SQS其高可用性
  • Celery 广播 vs RabbitMQ 扇出

    我最近一直在使用 Celery 但我不喜欢它 它的配置很混乱 过于复杂并且文档记录很少 我想用 Celery 从单个生产者向多个消费者发送广播消息 让我困惑的是 Celery 术语和底层传输 RabbitMQ 术语之间的差异 在 Rabbi
  • Django Celery 和多个数据库(Celery、Django 和 RabbitMQ)

    是否可以设置与 Django Celery 一起使用的不同数据库 我有一个配置了多个数据库的项目 并且不希望 Django Celery 使用默认数据库 如果我仍然可以使用 django celery 管理页面并读取存储在这个不同数据库中的
  • celery任务eta已关闭,使用rabbitmq

    我使用教程中的默认设置和在 ubuntu 上运行的rabbitmq 使 Celery 任务正常进行 当我毫不延迟地安排任务时 一切都很好 但是当我给他们一个预计时间时 他们会被安排在未来 就好像我的时钟在某个地方关闭了一样 下面是一些请求任
  • 使用 Spring Boot 的多个 Rabbitmq 队列

    来自 Spring Boot 教程 https spring io guides gs messaging rabbitmq https spring io guides gs messaging rabbitmq 他们给出了创建 1 个队
  • 在 Spring 中设计复杂的通知系统

    我想设计和实现一个复杂的通知系统 其中我有一个用户将自动和手动订阅不同的事件 经过一番研究后 我决定使用 websockets 将通知推送到客户端 并订阅事件 我会选择 RabbitMQ 特别是直接交换 最初的想法是 在建立 websock
  • rabbitmq和spring-rabbitmq中的DLX——拒绝消息的一些注意事项

    我确实读过这个参考资料 https www rabbitmq com dlx html https www rabbitmq com dlx html 但这并不能解决我的疑问 即 如果接受消息没有问题 spring rabbitmq发送 a
  • 具有延迟的简单可扩展工作/消息队列

    我需要设置一个作业 消息队列 并可以选择为任务设置延迟 以便空闲工作人员不会立即拾取它 而是在一定时间后 可能因任务而异 我研究了几个 Linux 队列解决方案 rabbitmq gearman memcacheq 但它们似乎都没有提供开箱

随机推荐

  • vscode1.65.2 + anaconda 在python拓展为2022.2.1924087327版本上的问题及解决

    在vscode 提示更新的时候我更新到了最新的版本 xff0c 但是遇到了下面的问题 xff1a 即在ananonda环境内安装了pytorch xff0c 但是在vscode里面一直提示no module named torch 一开始以
  • cpolar内网穿透:自动化登录服务器

    使用cpolar内网穿透工具自动化登录服务器 1 本文简介2 实现2 1 安装与卸载cpolar2 2 使用cpolar登录服务器 3 注意 1 本文简介 家里没有公网ipv4 xff0c 公网ipv6默认不能直接访问 xff0c 懒得找电
  • Ubuntu的sources.list文件

    更新后用apt get update更新源 1 Ubuntu 20 10 sources list deb http span class token operator span span class token comment old r
  • 关于模型训练中显存占用过大的或直接报显存爆炸的解决方法

    模型训练显存爆炸解决方法 在模型训练中 xff0c 应该理解梯度 反向传播 图层 显存这些概念 xff0c 在模型训练过程中 xff0c 一般会分为训练 43 验证 43 测试 xff0c 在这些过程中 xff0c 一般在训练过程中会比较占
  • 关于archlinux的安装

    安装系统时间 xff1a 2022年9月18日 镜像版本 xff1a archlinux 2022 09 03 x86 64 iso Linux内核版本 xff1a Linux version 5 15 68 1 lts linux lts
  • SpringBoot项目在使用Maven打包war中遇到的问题

    问题描述 在使用maven打包 xff08 package xff09 springboot项目为war项目后 xff0c 在本地机器上使用Tomcat跑这个项目 xff0c 访问资源时出现下面的错误 xff1a span class to
  • 做一个完整网站的流程(独立完成个人网站)

    注 xff1a 我写这个博客的目的只是为了分享我自己做网站的流程 xff0c 不是教大家一行一行敲代码的 xff0c 如果是想学语言的基本操作 xff0c 只能自己去找视频学习了 1 购买服务器 xff08 根据自己的需求以及爱好购买 xf
  • 维修1台联想SR550服务器亮黄灯 感叹号 开不了机

    客户信息 xff1a 一个省外客户朋友公司 设备型号 xff1a Lenovo ThinkSystem SR550 故障问题 xff1a 主机前面板亮黄灯 xff0c 能开机但无法正常完成BIOS UEFI自检程序 xff0c 故障界面 x
  • Android 8.0 利用Settings.Global属性跨应用定义标志位

    需求 需要在不同应用中定义一个标志位 xff0c 这里介绍下系统级别的应用和非系统级别应用如何添加 当然这不一定是最好的办法 xff0c 因为不能够添加intent putExtra 属性 系统级别应用 在需要定义的地方使用 SystemP
  • k-近邻算法实现手写数字识别系统

    k 近邻算法实现手写数字识别系统 一 实验介绍 1 1 实验内容 本实验将会从电影题材分类的例子入手 xff0c 详细讲述k 近邻算法的原理 在这之后 xff0c 我们将会使用该算法实现手写数字识别系统 1 2 课程来源 本课程源自 图灵教
  • 调整eclipse控制台console的方法

    调整eclipse控制台console的方法 会把在用eclipse的过程中产生的问题和找到的解决方案记录一下 xff0c 以便之后再用到 今天在运行代码的时候 xff0c 突然控制台和代码并列了 然后百度了一下找到了方法 windows
  • Linux: 运行sh命令时command not found

    问题 xff1a 解决 xff1a 1 查看PATH变量 echo PATH 2 把查询出来的PATH放到sh文件中并导入
  • 修改git tag的描述信息

    今天手贱 xff0c 非要用TortoiseGit打tag xff0c 没用命令行 xff0c 结果这不是还没有学习么 xff0c 然后就出现问题了 不过好在是我自己的Toy代码 xff0c 那就看看如何解决吧 问题描述 使用Tortois
  • Linux 设置用户登录超时

    Linux 系统中使用SSH进行远程登录 xff0c 如果长时间不操作将自动注销用户的登录 原本以为在 etc ssh sshd config文件中配置 查了资料和测试只需要在shell环境变量中设置即可 span class hljs c
  • rime配置

    文件路径 AppData Rime 配置修改 default custom yaml span class hljs label customization span span class hljs label distribution c
  • matlab中(),[],与{}的区别认识

    转载自 http blog csdn net CV YOU article details 52873666 在matlab中 xff0c 常常会遇到 xff0c 和 这个3种符号怎么区分 xff0c 怎么用 xff0c 这里我来总结一下
  • WinServer2012 R2忘记密码的解决方案+远程连接另一种莫名其妙故障

    WinServer2012 R2忘记密码的解决方案 43 远程连接另一种莫名其妙故障 参考文章 xff1a xff08 1 xff09 WinServer2012 R2忘记密码的解决方案 43 远程连接另一种莫名其妙故障 xff08 2 x
  • 迅雷 应版权方要求,文件无法下载 解决方法

    迅雷 应版权方要求 xff0c 文件无法下载 解决方法 参考文章 xff1a xff08 1 xff09 迅雷 应版权方要求 xff0c 文件无法下载 解决方法 xff08 2 xff09 https www cnblogs com sui
  • redis集群搭建报错-(error) CLUSTERDOWN The cluster is down

    README 最近搭建一个redis集群 xff0c 参考博文 xff08 https www cnblogs com mafly p redis cluster html xff09 对集群配置后 xff0c master xff0c s
  • rabbitmq-通配符模式

    README 本文介绍 通配符模式 xff0c 及代码示例 1 intro to rabbitmq通配符模式 0 xff09 通配符模式 交换机类型为 Topic xff1b 1 xff09 与路由模式相比 xff0c 相同点是 两者都可以