发送消息到mq 流程
- 用户下订单创建订单信息,且创建一条订单冗余信息 status 为 0
- 发送订单信息到mq , 使用ack 消息确认机制,确认消息发送成功修改订单状态为 1(表示消息已发送)
- 启动一个定时任务 排查 订单状态为 0 的订单 发送消息到mq ack 确认修改状态status 为1(同上)
- 以上来确保消息成功发送给mq
接收mq消息(消费)
- 接收消息业务正常执行完成手动确认消息
- 抛出异常 使用 死信队列 将消息交给 死信队列完成
- 配置死信队列 将业务接收消息的queue 与死信队列 绑定
-
死信队列 在执行一次业务代码,如果抛出异常,则发送短信,或者写入数据库,或人工干预
代码示例
service 1 发送消息
配置信息
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/distribution
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.username=root
spring.datasource.password=root
spring.rabbitmq.port=5672
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.password=guest
spring.rabbitmq.username=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
# 开启手动 ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 开启重试
spring.rabbitmq.listener.simple.retry.enabled=true
# 最大重试次数
spring.rabbitmq.listener.simple.retry.max-attempts=3
# 重试时间
spring.rabbitmq.listener.simple.retry.initial-interval=2000ms
server.port=9001
配置基础的rabbitmq
@Configuration
public class RabbitMQConfig {
public static final String TOPIC_QUEUE1 = "topic.queue1";
public static final String TOPIC_QUEUE2 = "topic.queue2";
public static final String TOPIC_EXCHANGE = "topic.exchange";
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.port}")
private int port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Autowired
private ConnectionFactory connectionFactory;
@Bean(name = "connectionFactory")
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost("/");
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
connectionFactory.setPublisherReturns(true);
return connectionFactory;
}
bean
/**
* 订单数据
*/
@Data
public class OrderInfo {
private String orderId;
private String userId;
private String orderContent;
private String createTime;
}
// 订单冗余表
@Data
public class OrderInfoMessage {
private String orderId;
private int status; // 0 表示为发送消息到mq 1 表示消息已发送到mq
private String order_content;
private String unique_id;
}
发送消息 到mq
这里 创建订单表数据 ,创建订单冗余表数据 默认 status 0 表示为未发送消息到 mq
@Service
public class MqOrderService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private OrderInfoMessageMapper orderInfoMessageMapperl;
@Autowired
private OrderMqService orderMqService;
public void mqSaveOrder(OrderInfo orderInfo){
// 订单信息-- 插入订单系统 ,订单事务数据库
saveOrderMessage(orderInfo);
// 通过http 接口发送订单到运单系统 mq 发送
orderMqService.sendMessage(orderInfo);
}
/**
* 事务数据库
* @param orderInfo
*/
private void saveOrderMessage(OrderInfo orderInfo){
// 插入数据到订单表
orderMapper.insert(orderInfo);
// 插入数据到订单事务表
OrderInfoMessage orderInfoMessage = new OrderInfoMessage();
orderInfoMessage.setOrder_content(orderInfo.getOrderContent());
orderInfoMessage.setStatus(0);
orderInfoMessage.setUnique_id(orderInfo.getUserId());
orderInfoMessage.setOrderId(orderInfo.getOrderId());
orderInfoMessageMapperl.insert(orderInfoMessage);
}
}
发送消息到mq 使用 ack 确认机制 修改status 为 1 表示 消息已发送到 mq
如果抛出异常 等待 定时任务发送消息到mq
@Service
public class OrderMqService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private OrderInfoMessageMapper orderInfoMessageMapperl;
@Autowired
private JdbcTemplate jdbcTemplate;
/**
* PostConstruct 不是spring 提供的 是java 提供
* 该注解用来修饰一个非静态的 void 方法被次注解修饰的方法会在服务器加载 servlet的时候执行
* 并且只会执行一次 PostConstruct在 构造函数之后执行在 init 之前执行
*/
@PostConstruct
public void regCaliback() {
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("cause " + cause);
// 如果 ack 为true 表示已收到消息
String orderId = correlationData.getId();
if (!ack) {
// 这里需要进行其他方式存储
System.out.println("MQ 队列响应失败 orderId 为 " + orderId);
return;
}
int update = jdbcTemplate.update("update order_info_message set status = 1 where order_id =?", orderId);
// 使用mybatisplus 修改失败报错 HikariDataSource HikariDataSource (HikariPool-1) has been closed.
// OrderInfoMessage orderInfoMessage = new OrderInfoMessage();
// orderInfoMessage.setStatus(1);
// QueryWrapper<OrderInfoMessage> queryWrapper = new QueryWrapper();
// queryWrapper.lambda().eq(OrderInfoMessage::getOrderId,orderId);
// int update = orderInfoMessageMapperl.update(orderInfoMessage, queryWrapper);
if (update == 1) {
System.out.println("本地消息状态修改成功,消息成功投递到消息队列中 ");
} else {
System.out.println("本地消息状态修改失败,出现异常 " + 88888888);
}
}
});
}
@PostConstruct
public void test() {
System.out.println("-----------PostConstruct执行-----------");
}
public void sendMessage(OrderInfo orderInfo) {
// 通过http 接口发送订单到运单系统 mq 发送
CorrelationData correlationData = new CorrelationData();
correlationData.setId(orderInfo.getOrderId());
rabbitTemplate.convertAndSend("order_fanout_exchange", "", JSONObject.toJSONString(orderInfo), correlationData);
}
}
这里我有时候会报错 HikariDataSource HikariDataSource (HikariPool-1) has been closed. 获取不到数据库连接 不太明白,有大佬可以留言帮忙解决一下.
定时任务处理 status 为0 的 发送消息到mq
@EnableScheduling
@Component
public class RabbitmqTask {
@Autowired
private OrderInfoMessageMapper orderInfoMessageMapper;
@Autowired
private OrderMqService orderMqService;
@Autowired
private OrderMapper orderMapper;
@Scheduled(cron = "0 0/1 * * * ?")
public void rabbitmqTask() {
// 获取所有的 status 为 0的 数据重新发送到mq
QueryWrapper<OrderInfoMessage> queryWrapper = new QueryWrapper<>();
queryWrapper.lambda().eq(OrderInfoMessage::getStatus,0);
List<OrderInfoMessage> orderInfoMessageList = orderInfoMessageMapper.selectList(queryWrapper);
for (OrderInfoMessage orderInfoMessage : orderInfoMessageList) {
if (Objects.nonNull(orderInfoMessage)){
String orderId = orderInfoMessage.getOrderId();
System.out.println("task 定時任務 order 为 "+ orderId);
OrderInfo orderInfo = orderMapper.selectById(orderId);
orderMqService.sendMessage(orderInfo);
}
}
}
}
以上发送消息就完成了
接收消息代码 --> 从mq 消费消息
声明死信队列 且将 order.queue 队列与之绑定
@Configuration
public class RabbitMQConfiguration {
// 死信队列
// 声明注册 direct 模式交换机
@Bean
public DirectExchange deadExchange(){
return new DirectExchange("dead_direct_exchange",true,false);
}
@Bean
public Queue deadqueue(){
HashMap<String, Object> args = new HashMap<>();
args.put("x-message-ttl",5000);
return new Queue("dead.direct.queue",true);
}
@Bean
public Binding deadbind(){
return BindingBuilder.bind(deadqueue()).to(deadExchange()).with("dead");
}
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("order_fanout_exchange",true,false);
}
@Bean
public Queue orderQueue(){
HashMap<String, Object> args = new HashMap<>();
// 绑定死信队列 交换机
args.put("x-dead-letter-exchange","dead_direct_exchange");
args.put("x-dead-letter-routing-key","dead"); // fanout 不需要配置 路由key
return new Queue("order.queue",true,false,false,args);
}
@Bean
public Binding bindorder(){
return BindingBuilder.bind(orderQueue()).to(fanoutExchange());
}
}
消费者消费消息–>
order.queue 这个队列是监听这 正常消费消息
dead.direct.queue 这个队列是死信队列 当order.queue抛出异常之后会将消息发送到这个队列中来.
/**
* 消费者
*/
@Service
public class OrderMqConsumer {
@Autowired
private DistributionCenterService distributionCenterService;
private int count = 1;
/**
* 解决消息重试方法
* 1. 控制重发的次数 ( 如果使用了 try catch 配置的重试次数会失效 互逆操作)
* 2. try catch + 手动ack
* 3. try catch + 手动ack + 死信队列 + 加上(完美)
*/
@RabbitListener(queues = {"order.queue"})
public void messageConsumer(String ordermsg, Channel channel,
CorrelationData correlationData,
@Header(AmqpHeaders.DELIVERY_TAG) Long tag) throws Exception {
try {
// 1. 获取消息队列中的消息
System.out.println("收到mq 的消息 是 "+ordermsg+" count "+count++);
// 2. 获取订单信息
int a= 0/0;
OrderInfo orderInfo = JSONObject.parseObject(ordermsg).toJavaObject(OrderInfo.class);
// 3.获取订单id
String orderId = orderInfo.getOrderId();
// 4.派单
distributionCenterService.dispatch(orderId);
// 手动确认消息 正常情况
channel.basicAck(tag,false);
}catch (Exception e){
/**
* 如果出现异常情况,根据实际情况去重发
* 参数 1 :消息的tag 参数 2 false 多条处理 参数 3 requeque 重发
* false 不会重发,会把消息打入死信队列
* true 的话会死循环重发,建议使用true 的话 不要使用 try catch 否则会出现死循环
*/
channel.basicNack(tag,false,false);
}
}
/**
* 死信队列
* @param ordermsg
* @param channel
* @param correlationData
* @param tag
* @throws Exception
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "dead.direct.queue",durable = "true"),
exchange = @Exchange(name = "dead_direct_exchange",
type = ExchangeTypes.TOPIC,
ignoreDeclarationExceptions = "true"),
key = {"dead"}
))
public void deadMessageConsumer(String ordermsg, Channel channel,
CorrelationData correlationData,
@Header(AmqpHeaders.DELIVERY_TAG) Long tag) throws Exception {
try {
// 1. 获取消息队列中的消息
System.out.println("收到mq 的消息 是 "+ordermsg+" count "+count++);
// 2. 获取订单信息
OrderInfo orderInfo = JSONObject.parseObject(ordermsg).toJavaObject(OrderInfo.class);
// 3.获取订单id
String orderId = orderInfo.getOrderId();
// 冪等问题
// 4.派单
distributionCenterService.dispatch(orderId);
// 手动确认消息 正常情况
channel.basicAck(tag,false);
}catch (Exception e){
// 异常情况 引入人工 发送短信,存储数据库
}
}
}
关于幂等性
- 我们可以考虑 使用数据库的主键唯一,或者使用分布式锁
基础的就完成了,大家觉得有改进的,或有问题的地方可以在评论区留言.
HikariDataSource HikariDataSource (HikariPool-1) has been closed. 这个有大佬帮忙解决一下么,想知道原因0.0