AMQP 和IM的区别:
AMQP: 高级消息队列
1、可以一对多广播,也可以一对一广播
2、生产者和消费者不知道对方是谁
IM:
1、只能一对一广播
2、生产者和消费者知道对方是谁
RabbitMQ:只是消息代理
我们不生产消息,我们只是消息的搬运工
每条消息只会发送给一个订阅者(一个萝卜一个坑)
消息的传递过程:
生产者==>MQ==>消费者
消息
有效载荷:需要传输的数据内容本身
标签:用来告诉Rabbit消息应该投递给谁
MQ的相关概念
交换器:用于接收生产者发送过来的消息
队列:用来存储生产者发送过来的消息,供消费者消费,可以理解为装消息的容器
绑定:用来决策交换器将消息发送到哪个队列上
如果用乘坐火车打比方,可以这样理解RabbitMQ
乘客是消息,火车站进站安检口相当于交换器,负责接收乘客,并告诉乘客自己应该到哪个检票口等候
检票口排队的乘客相当于队列,乘坐同一班次列车的乘客在同一个检票口检票进站
乘客的手上的火车票相当于绑定(路由键),用来告诉乘客该坐哪一班次的列车,该从哪一个检票口进站。
火车车厢里的每一个座位就相当于消费者,一个座位只能给一个乘客坐
队列
什么是队列?
队列(Queues)是你的消息(messages)的终点,可以理解成装消息的容器。消息就一直在里面,
直到有客户端(也就是消费者,Consumer)连接到这个队列并且将其取走为止
队列相关命令:
basic.consume:持续订阅消息,将信道设置为接受模式。消息一到达队列,消费者就接收消息,可以持续接收多条消息
basic.get:订阅单条消息,接收消息后,取消订阅。这样做是为了让消费者只接收下一条消息,如果想获得更多消息,需要再次发送basic.get。
basic.ack:消费者接收消息后,向RabbitMQ发送确认,表示已经收到消息
auto_ack:消费者在订阅队列是如果设置为auto_ack,表示消费者一旦接收消息后,RabbitMQ自动默认为消费者确认收到了消息。
basic.reject:消费者拒绝接收MQ发送过来的消息。RabbitMQ2.0版本后新增的命令
requeue:
true:如果requeue参数设置为true,则RabbitMQ将消息重新发送给下一位消费者
false:如果设置为false,则RabbitMQ将消息从队列移除
queue.declare:创建队列,如果不指定队列名称,则Rabbit会分配一个随机名称,并在命令的响应中返回
exclusive:如果设置为true,列队变为私有的,即一个队列只有一个消费者
aotu-delete:当最后一个消费者取消订阅后,队列会自动移除。如果需要临时创建一个队列,可以结合exclusive使用。消费者断开连接时,队列自动移除
队列工作原理
多个消费者订阅同一队列时,怎么发送消息?
消费者A和B同时订阅了seed_bin队列
1、消息Message_A到达队列seed_bin
2、RabbitMQ把Message_A发送给A
3、A确认接收到了消息Mesage_A
4、RabbitMQ将消息Message_A从队列seed_bin删除
5、消息Message_B到达队列seed_bin
6、RabbitMQ把Message_B发送给B
7、B确认接收到了消息Mesage_B
8、RabbitMQ将消息Message_B从队列seed_bin删除
注意:
如果消费者收到一条消息后,从RabbiMQ队列断开了(或者说取消了订阅),一直没有发送确认。那么MQ认为这条消息没有发送,
然后会重新将该消息发送给下一位消费者。这样做的好处是如果接收消息的程序出现问题,或者有bug,导致没有发送确认,
这样RabbitMQ不会再给该应用发送消息。因为RabbitMQ会认为你的程序还没有准备好接收下一条消息,这样可以防止消息
源源不断的涌向你的应用,到导致过载。
交换器
什么是交换器?
交换器是服务器接收到生产者发送过来的消息后,根据不同的规则,将消息投递到不同的队列的工具。这些规则被称为路由键。
队列是通过路由键绑定到交换器的。当生产者将消息发给RabbitMQ代理服务器时,消息将拥有一个路由键,
注: Rabbit会将其与绑定的路由键践行匹配,如果匹配则投递到该队列,如果匹配不到任何绑定模式,则消息进入黑洞同时rabbitMQ管理端Queues队列中对应的State是空闲状态如下图。
![在这里插入图片描述](https://img-blog.csdnimg.cn/20190827233054908.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQwMjUwMTIy,size_16,color_FFFFFF,t_70)
交换器的类型
direct:如果路由键匹配,则直接投递到对应的队列
fanout:不处理路由键,向所有与之绑定的队列投递消息
topic:处理路由键,按模式匹配,向符合规则的队列投递消息
headers:允许匹配消息的header,而非路由键,除此之外,direct完全一致,但性能差很多,基本不用了。
direct:
处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。
如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,不会转发dog.puppy,
也不会转发dog.guard,只会转发dog。
fanout:
不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与
该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。
Fanout交换机转发消息是最快的。
topic:
将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。
因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”。请参考下图
![在这里插入图片描述](https://img-blog.csdnimg.cn/20190827103028882.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQwMjUwMTIy,size_16,color_FFFFFF,t_70)
代码体现 ( 采用 DirectExchange 交换机模式 一对一)
注:DirectExchange 实践中此模式启动多个项目则会( 服务轮询策略 )起到均衡负载效果
生产者配置
/*
* Copyright 2019 Wicrenet, Inc. All rights reserved.
*/
package com.xy.pay.main.config;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
/**
* rabbitMq 配置
*
* @author YJX
* Created on 2019-04-30 14:25
*/
@Configuration
@EnableRabbit
public class RabbitMQConfigurer {
@Autowired
private ConnectionFactory connectionFactory;
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate rabbitTemplate() {
//必须是prototype类型
return new RabbitTemplate(this.connectionFactory);
}
/**
* 【对账异常事件】交换机
*
* @author YJX
* @date 2019-04-30 15:05
**/
@Bean
@Qualifier("noticeReconciliation")
public DirectExchange noticeReconciliationExchange() {
return new DirectExchange("noticeReconciliation", true, false);
}
}
生产者发送消息
/*
* Copyright 2019 Wicrenet, Inc. All rights reserved.
*/
package com.xy.pay.main.amqp;
import com.alibaba.fastjson.JSON;
import com.xy.pay.main.PayMainLogs;
import com.xy.pay.main.amqp.model.ReconciliationPubData;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
/**
* 利用RabbitMQ 对账异常通知
*
* @author YJX
* Created on 2019-04-30 15:41
*/
@Component
@Primary
@ConditionalOnProperty(name = "pay.reconciliation-notice.channel", havingValue = "rabbit", matchIfMissing = true)
public class RabbitReconciliationNoticePub extends AbsReconciliationNoticePub implements RabbitTemplate.ConfirmCallback {
private static final AtomicLong COUNTER = new AtomicLong(1L);
private static final Set<Long> SEEDS = new HashSet<>();
private final RabbitTemplate rabbitTemplate;
@Autowired
@Qualifier("noticeReconciliation")
private DirectExchange noticeReconciliation;
@Autowired
public RabbitReconciliationNoticePub(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
rabbitTemplate.setConfirmCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
Long correlationId = Long.valueOf(correlationData.getId());
if (ack) {
//消息已经被队列接收了,rabbitMQ内部没有发生异常
SEEDS.remove(correlationId);
return;
}
if (SEEDS.contains(correlationId)) {
// 如果丢失可以重发 2019/4/25 15:14
PayMainLogs.MQ_LOG.error("【对账异常事件】rabbitMQ消息丢失:{}", StringUtils.abbreviate(cause, 200));
} else {
PayMainLogs.MQ_LOG.warn("【对账异常事件】:{}", StringUtils.abbreviate(cause, 200));
}
}
/*
* 【执行发布】
*
* @author YJX
* @date 2019/04/30 22:01
**/
@Override
void sendMsg(ReconciliationPubData notice) {
final CorrelationData correlationId = new CorrelationData(getCorrelationId());
//发送消息
this.rabbitTemplate.convertAndSend(this.noticeReconciliation.getName(),
"noticeReconciliation",
JSON.toJSONString(notice), correlationId);
}
private synchronized String getCorrelationId() {
long seed = COUNTER.getAndIncrement();
SEEDS.add(seed);
if (SEEDS.size() > 30000) {
PayMainLogs.MQ_LOG.error("【对账异常事件】事件已经累计超过{}未处理了", SEEDS.size());
//如果队列长于30000,要修剪了。移除最晚的10000条记录
SEEDS.removeAll(SEEDS.stream()
.sorted(Long::compareTo)
.limit(10000)
.collect(Collectors.toList()));
}
return String.valueOf(seed);
}
}
消费者配置
package com.xy.pay.risk.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
/**
* rabbitMq 配置
*
* @author Canaan
* @date 2019/4/24 10:46
*/
@Configuration
@EnableRabbit
public class RabbitMQConfigurer {
@Autowired
private ConnectionFactory connectionFactory;
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate rabbitTemplate() {
//必须是prototype类型
return new RabbitTemplate(this.connectionFactory);
}
/**
* 【交易事件】交换机
*
* @author Canaan
* @date 2019/4/24 21:04
*/
@Bean
@Qualifier("tradeEventExchange")
public DirectExchange tradeEventExchange() {
return new DirectExchange("tradeEvent", true, false);
}
@Bean
public Queue tradeEventQueue() {
return new Queue("tradeEvent");
}
@Bean
public Binding tradeEventQueueBinding() {
return BindingBuilder.bind(tradeEventQueue()).to(tradeEventExchange()).with("tradeEvent");
}
/**
* 【对账异常事件】交换机
*
* @author YJX
* @date 2019-04-30 15:05
**/
@Bean
@Qualifier("noticeReconciliationExchange")
public DirectExchange noticeReconciliationExchange() {
return new DirectExchange("noticeReconciliation", true, false);
}
@Bean
public Queue noticeReconciliationQueue() {
return new Queue("noticeReconciliation");
}
@Bean
public Binding noticeReconciliationBinding() {
return BindingBuilder.bind(noticeReconciliationQueue()).to(noticeReconciliationExchange()).with("noticeReconciliation");
}
}
消费者消费队列里的消息
/*
* Copyright 2019 Wicrenet, Inc. All rights reserved.
*/
package com.xy.pay.risk.amqp;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import com.xy.common.verify.Asserts;
import com.xy.pay.dao.risk.BriefNoticeDao;
import com.xy.pay.dao.risk.BriefNoticeOwnerDao;
import com.xy.pay.dao.risk.ReconciliationNoticeDao;
import com.xy.pay.dao.risk.dict.NoticeStatusEnum;
import com.xy.pay.dao.risk.dict.NoticeTypeEnum;
import com.xy.pay.dao.risk.entity.BriefNotice;
import com.xy.pay.dao.risk.entity.BriefNoticeOwner;
import com.xy.pay.dao.risk.entity.ReconciliationNotice;
import com.xy.pay.risk.amqp.model.ReconciliationSubData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
/**
* 对账异常消息订阅
*
* @author YJX
* Created on 2019/04/30 23:07
*/
@Component
@RabbitListener(queues = "#{noticeReconciliationQueue}")
public class RabbitReconciliationNoticeSub {
private static final Logger logger = LoggerFactory.getLogger(RabbitReconciliationNoticeSub.class);
@Autowired
private BriefNoticeDao briefNoticeDao;
@Autowired
private BriefNoticeOwnerDao noticeOwnerDao;
@Autowired
private ReconciliationNoticeDao reconciliationNoticeDao;
@RabbitHandler
public void process(String payload, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
ReconciliationSubData subData;
try {
subData = JSON.parseObject(payload, ReconciliationSubData.class);
this.persistent(subData);
} catch (Exception e) {
logger.error("对账异常消息持久化失败:{},{}", payload, e);
}
}
private void persistent(ReconciliationSubData data) {
Asserts.notNull(data);
BriefNotice briefNotice = briefNoticeDao.selectByNoticeKey(data.getNoticeKey());
//消息重复
if (briefNotice != null) {
return;
}
/*添加消息摘要*/
briefNotice = new BriefNotice();
briefNotice.setTitle(data.getTitle());
briefNotice.setNoticeStatus(NoticeStatusEnum.NEW);
briefNotice.setNoticeKey(data.getNoticeKey());
briefNotice.setPower(data.getPower());
briefNotice.setNoticeType(NoticeTypeEnum.RECONCILIATION);
if (data.getDiffType() != null) {
briefNotice.setSubType(data.getDiffType().toString());
}
int effectRow = this.briefNoticeDao.insert(briefNotice);
if (effectRow == 0) {
logger.error("对账异常消息持久化失败");
return;
}
/*消息归属添加*/
BriefNoticeOwner briefNoticeOwner = new BriefNoticeOwner();
briefNoticeOwner.setNotice(briefNotice);
briefNoticeOwner.setBrandId(data.getBrandId());
briefNoticeOwner.setPointId(data.getStoreId());
briefNoticeOwner.setRole(32);
briefNoticeOwner.setPower(data.getPower());
this.noticeOwnerDao.insert(briefNoticeOwner);
/*添加对账详情消息*/
ReconciliationNotice reconciliationNotice = new ReconciliationNotice();
reconciliationNotice.setDiffType(data.getDiffType());
reconciliationNotice.setLocalTradeInfo(data.getLocalTradeInfo());
reconciliationNotice.setReconciliationTime(data.getReconciliationTime());
reconciliationNotice.setStoreId(data.getStoreId());
reconciliationNotice.setNotice(briefNotice);
reconciliationNotice.setProvider(data.getProvider());
reconciliationNotice.setRemoteTradeInfo(data.getRemoteTradeInfo());
reconciliationNotice.setNoticeKey(data.getNoticeKey());
this.reconciliationNoticeDao.insert(reconciliationNotice);
}
}
TopicExchange 模式 一对多 ( 该模式为通知效果 )
生产者
package com.xy.pay.api.config;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
/**
* rabbitMq 配置
*
* @author Canaan
* @date 2019/4/24 10:46
*/
@Configuration
@EnableRabbit
public class RabbitMQConfigurer {
@Autowired
private ConnectionFactory connectionFactory;
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate rabbitTemplate() {
//必须是prototype类型
return new RabbitTemplate(this.connectionFactory);
}
/**
* 【交易事件】交换机
*
* @author Canaan
* @date 2019/4/24 21:04
*/
@Bean
@Qualifier("tradeEventExchange")
public DirectExchange tradeEventExchange() {
return new DirectExchange("tradeEvent", true, false);
}
/**
* 【交易通知】交换机,当用户调用接口时带有【notifyUrl】 推送地址时,进行异步推送
*
* @author Canaan
* @date 2019/4/24 21:07
*/
@Bean
@Qualifier("tradeNotifyExchange")
public TopicExchange tradeNotifyExchange() {
return new TopicExchange("tradeNotify", true, false);
}
}
生产者发消息
package com.xy.pay.api.amqp;
import com.alibaba.fastjson.JSON;
import com.xy.common.verify.Asserts;
import com.xy.pay.api.PayApiLogs;
import com.xy.pay.api.amqp.notify.TradeNotify;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Supplier;
/**
* rabbit 交易通知
*
* @author Canaan
* @date 2019/4/26 11:30
*/
@Component
public class RabbitTradeNotifyPub implements TradeNotifyPubEngine, RabbitTemplate.ConfirmCallback {
private static final Map<String, TradeNotify> POOL = new ConcurrentHashMap<>(1000);
private final RabbitTemplate rabbitTemplate;
@Autowired
@Qualifier("tradeNotifyExchange")
private TopicExchange tradeNotifyExchange;
@Autowired
@Qualifier("notifyExecutor")
private Executor notifyExecutor;
@Autowired
private Environment environment;
@Autowired
public RabbitTradeNotifyPub(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
rabbitTemplate.setConfirmCallback(this);
}
/**
* 发布通知
*
* @param notify
*/
@Override
public void pub(final TradeNotify notify) {
Asserts.notNull(notify);
if (environment.acceptsProfiles("dev", "test")) {
notify.verifyThrow();
}
Asserts.notNull(notify.getJointProvider());
String routeKey = notify.getAct().routeKey.toString() + "." + notify.getJointProvider();
POOL.putIfAbsent(notify.getNotifyId(), notify);
//异步发送
try {
this.notifyExecutor.execute(() -> {
this.clipPool(10000); //通知池里的个数最多10000个
this.rabbitTemplate.convertAndSend(
this.tradeNotifyExchange.getName(),
routeKey,
JSON.toJSONString(notify),
new CorrelationData(notify.getNotifyId()));
});
} catch (Exception e) {
PayApiLogs.MQ_LOG.error("【交易通知发布】发送失败", StringUtils.abbreviate(e.getMessage(), 100));
}
}
/**
* 根据函数计算后返回要发布的内容
*
* @param supplier
* @author Canaan
* @date 2019/5/21 20:06
*/
@Override
public void computePub(Supplier<TradeNotify> supplier) {
Asserts.notNull(supplier);
try {
this.notifyExecutor.execute(() -> {
TradeNotify notify = supplier.get();
if (notify == null) {
return;
}
if (environment.acceptsProfiles("dev", "test")) {
notify.verifyThrow();
}
Asserts.notNull(notify.getJointProvider());
String routeKey = notify.getAct().routeKey.toString() + "." + notify.getJointProvider();
POOL.putIfAbsent(notify.getNotifyId(), notify);
this.clipPool(10000); //通知池里的个数最多10000个
this.rabbitTemplate.convertAndSend(
this.tradeNotifyExchange.getName(),
routeKey,
JSON.toJSONString(notify),
new CorrelationData(notify.getNotifyId()));
});
} catch (Exception e) {
PayApiLogs.MQ_LOG.error("【交易通知发布】发送失败", StringUtils.abbreviate(e.getMessage(), 100));
}
}
/**
* 修剪池。防止过多
*
* @author Canaan
* @date 2019/4/26 16:24
*/
private void clipPool(int maxSize) {
if (POOL.size() < maxSize) {
return;
}
PayApiLogs.MQ_LOG.error("【交易通知发布】 POOL 数据超过{}个", POOL.size());
//删除第1000个开始的数据,共1000个
POOL.keySet().stream()
.skip(1000)
.limit(1000)
.forEach(POOL::remove);
}
/**
* Confirmation callback.
*
* @param correlationData correlation data for the callback.
* @param ack true for ack, false for nack
* @param cause An optional cause, for nack, when available, otherwise null.
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String noticeId = correlationData.getId();
if (ack) {
//消息已经被队列接收了,rabbitMQ内部没有发生异常
POOL.remove(noticeId);
return;
}
if (POOL.containsKey(noticeId)) {
this.pub(POOL.get(noticeId)); //重新发送
PayApiLogs.MQ_LOG.warn("【交易通知发布】消息重新发送:{}", StringUtils.abbreviate(cause, 200));
} else {
PayApiLogs.MQ_LOG.warn("【交易通知发布】可能丢失-{}", StringUtils.abbreviate(cause, 200));
}
}
}
消费者配置
/*
* Copyright 2019 Wicrenet, Inc. All rights reserved.
*/
package com.xy.pay.transfer.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import org.springframework.core.env.Environment;
import java.util.Objects;
/**
* rabbitMq 配置
*
* @author YJX
* Created on 2019-04-30 14:25
*/
@Configuration
@EnableRabbit
public class RabbitMQConfigurer {
@Autowired
private ConnectionFactory connectionFactory;
@Autowired
private Environment environment;
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate rabbitTemplate() {
//必须是prototype类型
return new RabbitTemplate(this.connectionFactory);
}
/**
* 【交易通知】交换机,当用户调用接口时带有【notifyUrl】 推送地址时,进行异步推送
*
* @author Canaan
* @date 2019/4/24 21:07
*/
@Bean
@Qualifier("tradeNotifyExchange")
public TopicExchange tradeNotifyExchange() {
return new TopicExchange("tradeNotify", true, false);
}
@Bean
public Queue tradeNotifyQueue() {
String profiles = environment.getActiveProfiles()[0];
boolean isPro = Objects.equals("pro", profiles.toLowerCase());
return new Queue("tradeNotify-" + profiles, isPro, false, !isPro);
}
@Bean
public Binding tradeEventQueueBinding() {
return BindingBuilder.bind(tradeNotifyQueue()).to(tradeNotifyExchange()).with("tradeNotify.#");
}
/**
* 交易通知广播
*
* @author Canaan
* @date 2019/5/6 16:16
*/
@Bean
@Qualifier("transferTradeExchange")
public DirectExchange transferTradeExchange() {
return new DirectExchange("transferTrade", true, false);
}
}
消费者消费队列消息
package com.xy.pay.api.amqp;
import com.alibaba.fastjson.JSON;
import com.xy.common.verify.Asserts;
import com.xy.pay.api.PayApiLogs;
import com.xy.pay.api.amqp.notify.TradeNotify;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Supplier;
/**
* rabbit 交易通知
*
* @author Canaan
* @date 2019/4/26 11:30
*/
@Component
public class RabbitTradeNotifyPub implements TradeNotifyPubEngine, RabbitTemplate.ConfirmCallback {
private static final Map<String, TradeNotify> POOL = new ConcurrentHashMap<>(1000);
private final RabbitTemplate rabbitTemplate;
@Autowired
@Qualifier("tradeNotifyExchange")
private TopicExchange tradeNotifyExchange;
@Autowired
@Qualifier("notifyExecutor")
private Executor notifyExecutor;
@Autowired
private Environment environment;
@Autowired
public RabbitTradeNotifyPub(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
rabbitTemplate.setConfirmCallback(this);
}
/**
* 发布通知
*
* @param notify
*/
@Override
public void pub(final TradeNotify notify) {
Asserts.notNull(notify);
if (environment.acceptsProfiles("dev", "test")) {
notify.verifyThrow();
}
Asserts.notNull(notify.getJointProvider());
String routeKey = notify.getAct().routeKey.toString() + "." + notify.getJointProvider();
POOL.putIfAbsent(notify.getNotifyId(), notify);
//异步发送
try {
this.notifyExecutor.execute(() -> {
this.clipPool(10000); //通知池里的个数最多10000个
this.rabbitTemplate.convertAndSend(
this.tradeNotifyExchange.getName(),
routeKey,
JSON.toJSONString(notify),
new CorrelationData(notify.getNotifyId()));
});
} catch (Exception e) {
PayApiLogs.MQ_LOG.error("【交易通知发布】发送失败", StringUtils.abbreviate(e.getMessage(), 100));
}
}
/**
* 根据函数计算后返回要发布的内容
*
* @param supplier
* @author Canaan
* @date 2019/5/21 20:06
*/
@Override
public void computePub(Supplier<TradeNotify> supplier) {
Asserts.notNull(supplier);
try {
this.notifyExecutor.execute(() -> {
TradeNotify notify = supplier.get();
if (notify == null) {
return;
}
if (environment.acceptsProfiles("dev", "test")) {
notify.verifyThrow();
}
Asserts.notNull(notify.getJointProvider());
String routeKey = notify.getAct().routeKey.toString() + "." + notify.getJointProvider();
POOL.putIfAbsent(notify.getNotifyId(), notify);
this.clipPool(10000); //通知池里的个数最多10000个
this.rabbitTemplate.convertAndSend(
this.tradeNotifyExchange.getName(),
routeKey,
JSON.toJSONString(notify),
new CorrelationData(notify.getNotifyId()));
});
} catch (Exception e) {
PayApiLogs.MQ_LOG.error("【交易通知发布】发送失败", StringUtils.abbreviate(e.getMessage(), 100));
}
}
/**
* 修剪池。防止过多
*
* @author Canaan
* @date 2019/4/26 16:24
*/
private void clipPool(int maxSize) {
if (POOL.size() < maxSize) {
return;
}
PayApiLogs.MQ_LOG.error("【交易通知发布】 POOL 数据超过{}个", POOL.size());
//删除第1000个开始的数据,共1000个
POOL.keySet().stream()
.skip(1000)
.limit(1000)
.forEach(POOL::remove);
}
/**
* Confirmation callback.
*
* @param correlationData correlation data for the callback.
* @param ack true for ack, false for nack
* @param cause An optional cause, for nack, when available, otherwise null.
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String noticeId = correlationData.getId();
if (ack) {
//消息已经被队列接收了,rabbitMQ内部没有发生异常
POOL.remove(noticeId);
return;
}
if (POOL.containsKey(noticeId)) {
this.pub(POOL.get(noticeId)); //重新发送
PayApiLogs.MQ_LOG.warn("【交易通知发布】消息重新发送:{}", StringUtils.abbreviate(cause, 200));
} else {
PayApiLogs.MQ_LOG.warn("【交易通知发布】可能丢失-{}", StringUtils.abbreviate(cause, 200));
}
}
}