【README】
本文介绍 通配符模式,及代码示例
【1】intro to rabbitmq通配符模式
0)通配符模式-交换机类型为 Topic;
1)与路由模式相比,相同点是 两者都可以通过 routingkey 把消息转发到不同的队列;
不同点是通配符模式-topic类型的exchange可以让队列在绑定routing key的时候使用通配符;
2)通配符模式的routingkey 通常使用多个单词并用点号连接,如 item.insert ;
3)通配符规则:
# 匹配一个或多个词;
* 匹配不多不少一个词;
荔枝:
item.# 能够匹配 item.insert.abc 或 item.insert ; (可以多层)
item.* 能够匹配 item.insert ; (只能一层)
refers2 https://www.rabbitmq.com/tutorials/tutorial-five-java.html
4)新建队列
5)把队列绑定到交换机
6)生产者发送消息到队列,路由key 分别是 item.insert , item.update, item.delete ; 如下:
【2】代码
生产者
/**
* 通配符模式-交换机类型为TOPIC
*/
public class WildProducer {
/* 交换机名称 */
static final String TOPIC_EXCHANGE = "topic_exchange";
/*队列名称1*/
static final String TOPIC_QUEUE_1 = "topic_queue_1";
/*队列名称2*/
static final String TOPIC_QUEUE_2 = "topic_queue_2";
public static void main(String[] args) throws Exception {
/*获取连接*/
Connection conn = RBConnectionUtil.getConn();
// 创建频道
Channel channel = conn.createChannel();
/**
* 声明交换机
* 参数1-交换机名称
* 参数2-交换机类型(fanout, topic, direct, headers)
*/
channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);
/**
* routingkey-路由键
*/
String itemInsertRoutingKey = "item.insert";
String itemUpdateRoutingKey = "item.update";
String itemDeleteRoutingKey = "item.delete";
/* 发送消息-insert */
/**
* 参数1 交换机名称 如果没有指定则使用默认 default exchange
* 参数2 routingkey-路由key, 简单模式可以传递队列名称
* 参数3 消息其他属性
* 参数4 消息内容
*/
String insertMsg = "我是消息,通配符模式,routingkey=" + itemInsertRoutingKey + MyDateUtil.getNow();
channel.basicPublish(TOPIC_EXCHANGE, itemInsertRoutingKey, null, insertMsg.getBytes());
System.out.println("已发送消息=" + insertMsg);
String updMsg = "我是消息,通配符模式,routingkey=" + itemUpdateRoutingKey + MyDateUtil.getNow();
channel.basicPublish(TOPIC_EXCHANGE, itemUpdateRoutingKey, null, updMsg.getBytes());
System.out.println("已发送消息=" + updMsg);
String deleteMsg = "我是消息,通配符模式,routingkey=" + itemDeleteRoutingKey + MyDateUtil.getNow();
channel.basicPublish(TOPIC_EXCHANGE, itemDeleteRoutingKey, null, deleteMsg.getBytes());
System.out.println("已发送消息=" + deleteMsg);
/* 关闭连接和信道 */
channel.close();
conn.close();
}
}
消费者1 topic_queue_1
/**
* 通配符模式消费者-routingkey
*/
public class RouteConsumerWild1 {
/* 交换机名称 */
static final String TOPIC_EXCHANGE = "topic_exchange";
/*队列名称1*/
static final String TOPIC_QUEUE_1 = "topic_queue_1";
public static void main(String[] args) throws Exception {
/*创建连接 */
Connection conn = RBConnectionUtil.getConn();
/*创建队列*/
Channel channel = conn.createChannel();
/*声明交换机*/
channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);
/**
* routingkey-路由键
*/
String itemInsertRoutingKey = "item.insert";
String itemUpdateRoutingKey = "item.update";
String itemDeleteRoutingKey = "item.delete";
/**
* 声明/创建队列
* 参数1 队列名称
* 参数2 是否持久化
* 参数3 是否独占本连接
* 参数4 是否在不使用的时候自动删除队列
* 参数5 队列其他参数
*/
// channel.queueDeclare(TOPIC_QUEUE_1, true, false, false, null); // ui界面可以创建队列
/**
* 队列绑定交换机
* 参数1 队列名称
* 参数2 交换机
* 参数3 routingkey-路由键
*/
// channel.queueBind(TOPIC_QUEUE_1, TOPIC_EXCHANGE, "item.#"); // ui界面可以把队列绑定到交换机
/* 创建消费者,设置消息处理逻辑 */
Consumer consumer = new DefaultConsumer(channel) {
/**
* @param consumerTag 消费者标签,在 channel.basicConsume 可以指定
* @param envelope 消息包内容,包括消息id,消息routingkey,交换机,消息和重转标记(收到消息失败后是否需要重新发送)
* @param properties 基本属性
* @param body 消息字节数组
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body) throws IOException {
System.out.println("=== 消费者1 start ===");
System.out.println("路由key=" + envelope.getRoutingKey());
System.out.println("交换机=" + envelope.getExchange());
System.out.println("消息id=" + envelope.getDeliveryTag());
String message = new String(body, "UTF-8");
System.out.println(String.format("消费者收到的消息【%s】", message));
System.out.println("=== 消费者1 end ===\n");
}
};
/**
* 监听消息
* 参数1 队列名称
* 参数2 是否自动确认, 设置为true表示消息接收到自动向 mq回复ack;mq收到ack后会删除消息; 设置为false则需要手动发送ack;
* 参数3 消息接收后的回调
*/
channel.basicConsume(TOPIC_QUEUE_1, true, consumer);
}
}
消费者2 topic_queue_2
/**
* 通配符模式消费者-routingkey
*/
public class RouteConsumerWild2 {
/* 交换机名称 */
static final String TOPIC_EXCHANGE = "topic_exchange";
/*队列名称1*/
static final String TOPIC_QUEUE_2 = "topic_queue_2";
public static void main(String[] args) throws Exception {
/*创建连接 */
Connection conn = RBConnectionUtil.getConn();
/*创建队列*/
Channel channel = conn.createChannel();
/*声明交换机*/
channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);
/**
* routingkey-路由键
*/
String itemInsertRoutingKey = "item.insert";
String itemUpdateRoutingKey = "item.update";
String itemDeleteRoutingKey = "item.delete";
/**
* 声明/创建队列
* 参数1 队列名称
* 参数2 是否持久化
* 参数3 是否独占本连接
* 参数4 是否在不使用的时候自动删除队列
* 参数5 队列其他参数
*/
// channel.queueDeclare(TOPIC_QUEUE_2, true, false, false, null); // ui界面可以创建队列
/**
* 队列绑定交换机
* 参数1 队列名称
* 参数2 交换机
* 参数3 routingkey-路由键
*/
// channel.queueBind(TOPIC_QUEUE_2 TOPIC_EXCHANGE, "*.delete"); // ui界面可以把队列绑定到交换机
/* 创建消费者,设置消息处理逻辑 */
Consumer consumer = new DefaultConsumer(channel) {
/**
* @param consumerTag 消费者标签,在 channel.basicConsume 可以指定
* @param envelope 消息包内容,包括消息id,消息routingkey,交换机,消息和重转标记(收到消息失败后是否需要重新发送)
* @param properties 基本属性
* @param body 消息字节数组
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body) throws IOException {
System.out.println("=== 消费者1 start ===");
System.out.println("路由key=" + envelope.getRoutingKey());
System.out.println("交换机=" + envelope.getExchange());
System.out.println("消息id=" + envelope.getDeliveryTag());
String message = new String(body, "UTF-8");
System.out.println(String.format("消费者收到的消息【%s】", message));
System.out.println("=== 消费者1 end ===\n");
}
};
/**
* 监听消息
* 参数1 队列名称
* 参数2 是否自动确认, 设置为true表示消息接收到自动向 mq回复ack;mq收到ack后会删除消息; 设置为false则需要手动发送ack;
* 参数3 消息接收后的回调
*/
channel.basicConsume(TOPIC_QUEUE_2, true, consumer);
}
}
【3】 rabbitmq 模式总结
8.1)模式1 简单模式 helloworld
一个生产者,一个消费者,不需要设置交换机,使用默认交换机;
8.2)模式2 工作队列模式 work queue
一个生产者,多个消费者(竞争关系),不需要设置交换机(使用默认交换机);
8.3)发布订阅模式 publish/subscribe
需要设置类型为 fanout-广播的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列;
8.4)路由模式 routing
需要设置类型为 direct的交换机, 交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key 将消息发送到对应队列;
8.5)通配符模式 topic
需要设置类型为 topic的交换机, 交换机和队列进行绑定, 并且指定通配符方式的routing key, 当发送消息到交换机后,交换机会根据 routing key将消息发送到对应的队列;
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)