RabbitMQ工作模式
简述:
RabbitMQ主要有五种工作模式,分别是:
1、简单模式(Hello World)
2、工作队列模式(Work Queue)
3、发布/订阅模式(publish/subscribe)
4、路由模式(routing)
5、主题模式(Topic)
而后面,我通过解释+代码实现的形式展现这五种模式:
先附上工具类代码,主要用于方便创建连接:
public class RabbitUtils {
private static ConnectionFactory connectionFactory = new ConnectionFactory();
static {
connectionFactory.setHost("81.71.140.7");
connectionFactory.setPort(5672);
connectionFactory.setUsername("bq123");
connectionFactory.setPassword("bq123");
connectionFactory.setVirtualHost("/zm");
}
public static Connection getConnection(){
Connection conn = null;
try {
conn = connectionFactory.newConnection();
return conn;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
一、简单模式(一对一)
![在这里插入图片描述](https://img-blog.csdnimg.cn/5b5e620a7b4d4f85a1f57de257bc7c72.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80NTI2NDk5Mg==,size_16,color_FFFFFF,t_70#pic_center)
消息的消费者(consumer) 监听消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患:消息可能没有被消费者正确处理,就因为消费者的虚拟连接通道自动返回确认信号,导致消息从队列中删除了,造成消息的丢失。所以建议这里可以设置成手动的ack,即确认信号我们手动确认。但如果设置成手动ack,处理完后要及时发送ack消息给队列,否则会造成内存溢出)。
流程:
1、建立生产者与服务器的TCP长连接。
2、在TCP连接中再建立独立的轻量级的虚拟连接Channel。
3、通过Channel连接去声明一个队列,如果队列已存在则直接使用存在的队列。
4、生产者通过Channel往队列中发送消息。
5、消费者同生产者一样建立与服务器的连接。
6、消费者声明虚拟连接Channel.
7、消费者通过Channel声明一个队列(和生产者声明的队列同名)
8、消费者从该队列中获取消息,并且返回确认。收到确认后,队列删除已读消息。
生产者:
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
Connection conn = RabbitUtils.getConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD,false, false, false, null);
String message = "hello666";
channel.basicPublish("", RabbitConstant.QUEUE_HELLOWORLD, null,message.getBytes());
channel.close();
conn.close();
System.out.println("===发送成功===");
}
}
消费者:
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
Connection conn = RabbitUtils.getConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD,false, false, false, null);
channel.basicConsume(RabbitConstant.QUEUE_HELLOWORLD, false, new Reciver(channel));
}
}
class Reciver extends DefaultConsumer {
private Channel channel;
public Reciver(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("消费者接收到的消息:"+message);
System.out.println("消息的TagId:"+envelope.getDeliveryTag());
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
、
二、工作队列模式(一对多,互相抢夺消息)
消息产生者将消息放入队列,消费者可以有多个,消费者1,消费者2同时监听同一个队列,消息被消费。C1, C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患:高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关(syncronize) 保证一条消息只能被一个消费者使用)。
![在这里插入图片描述](https://img-blog.csdnimg.cn/4dcfb7a0ba924f078e418a1b530b8e0d.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80NTI2NDk5Mg==,size_16,color_FFFFFF,t_70#pic_center)
订票下单系统(消息生产者)
public class OrderSystem {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
for(int i = 1 ; i <= 100 ; i++) {
SMS sms = new SMS("乘客" + i, "13900000" + i, "您的车票已预订成功");
String jsonSMS = new Gson().toJson(sms);
channel.basicPublish("" , RabbitConstant.QUEUE_SMS , null , jsonSMS.getBytes());
}
System.out.println("发送数据成功");
channel.close();
connection.close();
}
}
短信发送系统1号(消息消费者)
public class SMSSender1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_SMS , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String jsonSMS = new String(body);
System.out.println("SMSSender1-短信发送成功:" + jsonSMS);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
短信发送系统2号(消息消费者)
public class SMSSender2 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_SMS , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String jsonSMS = new String(body);
System.out.println("SMSSender2-短信发送成功:" + jsonSMS);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
三、发布 / 订阅模式 (所有消费者共享消息)
生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。
![在这里插入图片描述](https://img-blog.csdnimg.cn/c43d3e8610a240598de266c1c64ede24.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80NTI2NDk5Mg==,size_16,color_FFFFFF,t_70#pic_center)
流程:
1、生产者与消费者都与服务器建立好TCP连接,并建立各自的虚拟连接。
2、生产者通过Channel将消息发送至交换机中
3、消费者通过Channel声明一个队列,并且绑定对应的交换机
4、交换机会将消息发给每个与它绑定的队列中,所以队列都能获得全部消息。
5、消费者从队列中读取消息,并且返回确认。收到确认后,相关队列删除已读消息。
生产者
public class WeatherBureau {
public static void main(String[] args) throws Exception {
Connection connection = RabbitUtils.getConnection();
String input = new Scanner(System.in).next();
Channel channel = connection.createChannel();
channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER,"" , null , input.getBytes());
channel.close();
connection.close();
}
}
消费者1
public class BiaDu {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);
channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER, "");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("新浪天气收到气象信息:" + new String(body));
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
消费者2
public class Sina {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);
channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER, "");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("新浪天气收到气象信息:" + new String(body));
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
四、路由模式
![在这里插入图片描述](https://img-blog.csdnimg.cn/5eb14b48689a4140b5e8fdfc3c420abb.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80NTI2NDk5Mg==,size_16,color_FFFFFF,t_70#pic_center)
1.生产者发送的消息是以key-value的形式,当消息进入交换机,交换机根据key的不同,将其分配到不同的队列。
2.消费者通过Channel声明一个队列时,需要绑定给队列绑定一个key和一个交换机。
2.业务场景:error 通知;EXCEPTION;错误通知的功能;传统意义的错误通知;客户通知;利用key路由,可以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误;
生产者
public class WeatherBureau {
public static void main(String[] args) throws Exception {
Map area = new LinkedHashMap<String, String>();
area.put("china.hunan.changsha.20201127", "中国湖南长沙20201127天气数据");
area.put("china.hubei.wuhan.20201127", "中国湖北武汉20201127天气数据");
area.put("china.hunan.zhuzhou.20201127", "中国湖南株洲20201128天气数据");
area.put("us.cal.lsj.20201127", "美国加州洛杉矶20201127天气数据");
area.put("china.hebei.shijiazhuang.20201128", "中国河北石家庄20201128天气数据");
area.put("china.hubei.wuhan.20201128", "中国湖北武汉20201128天气数据");
area.put("china.henan.zhengzhou.20201128", "中国河南郑州20201128天气数据");
area.put("us.cal.lsj.20201128", "美国加州洛杉矶20201128天气数据");
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator();
while (itr.hasNext()) {
Map.Entry<String, String> me = itr.next();
channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_ROUTING,me.getKey() , null , me.getValue().getBytes());
}
channel.close();
connection.close();
}
}
消费者1
public class BiaDu {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);
channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hunan.changsha.20201127");
channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hebei.shijiazhuang.20201128");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("百度天气收到气象信息:" + new String(body));
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
消费者2
public class Sina {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);
channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.lsj.20201127");
channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.wuhan.20201127");
channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.lsj.20201128");
channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.henan.zhengzhou.20201012");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("新浪天气收到气象信息:" + new String(body));
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
五、主题模式(路由模式的一种)
![在这里插入图片描述](https://img-blog.csdnimg.cn/daa5ed9bc9b44319ae7ca34edfb55169.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80NTI2NDk5Mg==,size_16,color_FFFFFF,t_70#pic_center)
![在这里插入图片描述](https://img-blog.csdnimg.cn/0d8b630563ce412db59844ecb48e6ba5.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80NTI2NDk5Mg==,size_16,color_FFFFFF,t_70#pic_center)
生产者
public class WeatherBureau {
public static void main(String[] args) throws Exception {
Map area = new LinkedHashMap<String, String>();
area.put("china.hunan.changsha.20201127", "中国湖南长沙20201127天气数据");
area.put("china.hubei.wuhan.20201127", "中国湖北武汉20201127天气数据");
area.put("china.hunan.zhuzhou.20201127", "中国湖南株洲20201127天气数据");
area.put("us.cal.lsj.20201127", "美国加州洛杉矶20201127天气数据");
area.put("china.hebei.shijiazhuang.20201128", "中国河北石家庄20201128天气数据");
area.put("china.hubei.wuhan.20201128", "中国湖北武汉20201128天气数据");
area.put("china.henan.zhengzhou.20201128", "中国河南郑州20201128天气数据");
area.put("us.cal.lsj.20201128", "美国加州洛杉矶20201128天气数据");
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator();
while (itr.hasNext()) {
Map.Entry<String, String> me = itr.next();
channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC,me.getKey() , null , me.getValue().getBytes());
}
channel.close();
connection.close();
}
}
消费者1
public class BiaDu {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);
channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "*.*.*.20201127");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("百度天气收到气象信息:" + new String(body));
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
消费者2
public class Sina {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);
channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "us.#");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("新浪天气收到气象信息:" + new String(body));
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)