RabbitMQ–基础–7.4–工作模式–路由模式(Direct)
代码位置
https://gitee.com/DanShenGuiZu/learnDemo/tree/master/rabbitMq-learn/rabbitMq-03
1、介绍
- Direct 模式在 Fanout 模式之上做了一个路由键 RoutingKey,对发送给交换机 Exchange 的消息进行筛选
- 每个消费者监听自己的队列,并且设置带统配符的 routingkey
- 生产者将消息发给broker,由交换机根据 routingkey 来转发消息到指定的队列
1.1、路由模式结构图
1.1.1、P
- 生产者
- 向交换机(Exchange)发送消息,发送消息时,会指定一个路由键 routing key
1.1.2、X
- 交换机(Exchange)
- 接收生产者的消息,然后把消息递交给 与 routing key 完全匹配的队列
1.1.3、Q1
- 队列(消费者监听此队列)
- 指定了绑定键(BindingKey)为warning,且该绑定键与交换机(Exchange)进行了绑定
1.1.4、Q2
- 队列(消费者监听此队列)
- 指定了绑定键(BindingKey)为warning,info,且该绑定键与交换机(Exchange)进行了绑定
1.1.5、Q3
- 队列(消费者监听此队列)
- 指定了绑定键(BindingKey)为debug、info,且该绑定键与交换机(Exchange)进行了绑定
1.2、生产者处理流程
声明队列并声明交换机 -> 创建连接 -> 创建通道 -> 通道声明交换机 -> 通道声明队列 -> 通过通道使队列绑定到交换机并指定该队列的routingkey(通配符) -> 制定消息 -> 发送消息并指定routingkey(通配符)
1.3、生消费者处理流程
声明队列并声明交换机 -> 创建连接 -> 创建通道 -> 通道声明交换机 -> 通道声明队列 -> 通过通道使队列绑定到交换机并指定routingkey(通配符) -> 重写消息消费方法 -> 执行消息方法
1.4、举例
- 该交换机的类型为direct,生产
- 如果生产者发送一条消息,如果在发送消息时,设置路由键为 warning
- 消息会路由到 Q1、Q2
- 如果生产者发送一条消息,如果在发送消息时,设置路由键为 info
- 消息会路由到 Q2、Q3
- 如果生产者发送一条消息,如果在发送消息时,设置路由键为 debug
- 消息会路由到 Q3
2、MQ实现
2.1、创建队列
direct_queue1
direct_queue2
direct_queue3
2.2、新建一个交换机
- 新建一个交换机
- 类型为 direct
direct_exchange
2.3、绑定交换机与队列的关系
- 绑定时,明确绑定键 BindingKey
direct_queue1
direct_queue2
direct_queue3
warning
info
debug
最终效果
2.4、在交换机 direct_exchange 中发送消息
- 需要指定一个路由键
2.5、查看消息
上述的路由键为"warning",则会收到的队列有:direct_queue1、direct_queue2
3、代码实现
3.1、代码结构
3.2、生产者
3.2.1、相对于工作队列模式
- 交换机类型不同
- 工作队列模式的交换机类型为 默认的交换机
- 路由模式的交换机类型为 direct
- 给交换机发送消息时,指定了一个路由键 RountingKey
3.2.2、代码
package com.example.rabbitmq03.business.test5;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Producer {
private static final String DIRECT_EXCHANGE_NAME = "code_direct_exchange";
public static void main(String[] args) throws Exception {
// 1. 获取连接
Connection connection = RabbitMqUtil.getConnection("生产者");
// 2. 通过连接获取通道 Channel
Channel channel = connection.createChannel();
// 3. 通过通道声明交换机,以及交换机类型为 direct
/**
* @param1:交换机名称
* @param2:交换机的类型。常见的有:fanout、direct、topic
* @param3:durable:设置是否持久化。设置为 true,表示持久化。持久化可以将交换机存盘,在服务器重启的时候不会丢失相关信息
* @param4:autoDelete:设置是否自动删除。 设置为true,表示自动删除。自动删除的前提是至少有一个队列或者交换机与这个交换机绑定,之后所有与这个交换机绑定的队列或者交换机都与此解绑
* @param5:internal:设置是否内置。设置为 true,则表示是内置的交换机。客户端程序无法直接发送消息到这个交换机中,只能通过交换器路由到交换机这种方式
* @param6:其它一些结构化参数
*/
// declareOk 用来标识成功声明了一个交换机。
AMQP.Exchange.DeclareOk declareOk = channel.exchangeDeclare(DIRECT_EXCHANGE_NAME, "direct", false, false, false,
null);
// 4. 消息内容
String message = "Hello direct warning";
String routingKey = "warning";
// 5. 发送消息到交换机,并指定路由键 RoutingKey 为 warning
channel.basicPublish(DIRECT_EXCHANGE_NAME, routingKey, null, message.getBytes());
System.out.println("消息发送完成~~~发送的消息为:" + message);
// 6. 关闭信道、连接
RabbitMqUtil.close(connection, channel);
}
}
3.3、消费者
3.3.1、相对于工作队列模式
- Consumer 绑定队列和交换机,指定了1个绑定键 BindingKey
- Consumer2 绑定队列和交换机,指定了2个绑定键 BindingKey
3.3.2、代码
package com.example.rabbitmq03.business.test5;
import java.io.IOException;
import com.rabbitmq.client.*;
public class Consumer {
private static final String DIRECT_QUEUE_NAME = "direct_queue1";
private static final String DIRECT_EXCHANGE_NAME = "code_direct_exchange";
public static void main(String[] args) throws Exception {
// 获取连接
Connection connection = RabbitMqUtil.getConnection("消费者");
// 获取通道
Channel channel = connection.createChannel();
// 绑定队列到交换机
// param1:队列名称
// param2:交换机名称
// param3:绑定键
// param4:定义绑定时的一些参数
channel.queueBind(DIRECT_QUEUE_NAME, DIRECT_EXCHANGE_NAME, "warning", null);
// 定义消费者
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
// body 消息体
String msg = new String(body, "utf-8");
System.out.println("收到消息:" + msg);
}
};
// 监听队列
channel.basicConsume(DIRECT_QUEUE_NAME, true, consumer);
System.out.println("开始接收消息~~~");
System.in.read();
// 关闭信道、连接
RabbitMqUtil.close(connection, channel);
}
}
package com.example.rabbitmq03.business.test5;
import java.io.IOException;
import com.rabbitmq.client.*;
public class Consumer2 {
private static final String DIRECT_QUEUE_NAME = "direct_queue2";
private static final String DIRECT_EXCHANGE_NAME = "code_direct_exchange";
public static void main(String[] args) throws Exception {
// 获取连接
Connection connection = RabbitMqUtil.getConnection("消费者");
// 获取通道
Channel channel = connection.createChannel();
String bindingKey = "warning";
String bindingKey2 = "info";
// 绑定队列到交换机,并指定一个绑定键 BindingKey
channel.queueBind(DIRECT_QUEUE_NAME, DIRECT_EXCHANGE_NAME, bindingKey);
channel.queueBind(DIRECT_QUEUE_NAME, DIRECT_EXCHANGE_NAME, bindingKey2);
// 定义消费者
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
// body 消息体
String msg = new String(body, "utf-8");
System.out.println("收到消息:" + msg);
}
};
// 监听队列
channel.basicConsume(DIRECT_QUEUE_NAME, true, consumer);
System.out.println("开始接收消息~~~");
System.in.read();
// 关闭信道、连接
RabbitMqUtil.close(connection, channel);
}
}
3.4、测试