Project in practice: a multi-queue serial execution model for a data bus built on RabbitMQ (full source included)

2023-05-16

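Contents

  • 1. Background
  • 2. Overall approach
  • 3. Core source code walkthrough
    • 3.1 Configuration class: RabbitConfig
    • 3.2 Message sending class: MsgSender
    • 3.3 Handler class annotation: StandardRabbitHandler
    • 3.4 Handler method annotation: StandardRabbitListener
    • 3.5 Base class for message consumption and dispatch: BaseMessageListener
  • 4. Testing
    • 4.1 Test message sender: TestController
    • 4.2 Test message consumer: TestListener
  • 5. Summary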

1. Background

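Today I want to talk about how, while building a payment system, I used RabbitMQ as a data bus for updating user account balances. In a payment system the most critical operations are the ones that touch account funds: users topping up or withdrawing, the system running reconciliation, cashback, rebates and so on. These operations cannot tolerate even the slightest error. In a high-concurrency payment system, many threads may operate on the same account at the same instant. Some would say you can guard the account with a thread lock, an optimistic lock or a pessimistic lock. But in a high-concurrency internet system locking is not the best solution, because every lock costs some performance. The rest of this article shows how to use RabbitMQ as a data bus for account operations instead.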

2. Overall approach

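Our core goal is to use the serial nature of a RabbitMQ queue to turn the concurrent requests flooding in from the front end into serial requests along multiple dimensions (which is also the fundamental aim when handling high-concurrency requests), as shown in the figure:

[Figure: concurrent requests converted into multiple serial request streams]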

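Relying on the serial nature of RabbitMQ queues and taking the user as the dimension, that is, one thread performs all account operations for one user, we get a model of multiple pipelines that each run serially.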
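The idea of one serial lane per user can be tried without a broker. The following is a minimal sketch in plain Java, assuming an in-memory stand-in for the queues: the class name PerUserSerialLanes, its methods, and the user ID "user-1" are all hypothetical and not part of the project. Each user gets a single-thread executor, so the balance is updated with a plain, unlocked read-modify-write and still comes out exact.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PerUserSerialLanes {
    // One single-thread executor ("lane") per user: tasks in a lane run one at a time.
    private final Map<String, ExecutorService> lanes = new ConcurrentHashMap<>();
    // Each balance cell is only ever written by its own lane thread, so no lock is needed.
    private final Map<String, long[]> balances = new ConcurrentHashMap<>();

    public void submit(String userId, long delta) {
        ExecutorService lane = lanes.computeIfAbsent(userId, id -> Executors.newSingleThreadExecutor());
        lane.execute(() -> {
            long[] cell = balances.computeIfAbsent(userId, id -> new long[1]);
            cell[0] += delta; // plain, unsynchronized read-modify-write
        });
    }

    /** Stops accepting work, waits for every lane to drain, then returns the balance. */
    public long shutdownAndGet(String userId) {
        try {
            for (ExecutorService lane : lanes.values()) {
                lane.shutdown();
                if (!lane.awaitTermination(10, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("lane did not drain");
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        long[] cell = balances.get(userId);
        return cell == null ? 0L : cell[0];
    }

    /** Hypothetical demo: many request threads credit the same account concurrently. */
    public static long demo(int requestThreads, int creditsPerThread) {
        PerUserSerialLanes bus = new PerUserSerialLanes();
        Thread[] callers = new Thread[requestThreads];
        for (int i = 0; i < requestThreads; i++) {
            callers[i] = new Thread(() -> {
                for (int j = 0; j < creditsPerThread; j++) {
                    bus.submit("user-1", 1);
                }
            });
            callers[i].start();
        }
        try {
            for (Thread t : callers) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return bus.shutdownAndGet("user-1");
    }

    public static void main(String[] args) {
        System.out.println(demo(8, 1000));
    }
}
```

The concurrency is moved from the account to the lane's backlog: callers only enqueue, and the single lane thread is the sole writer. In the article's model the backlog is a RabbitMQ queue rather than an in-memory one.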

The project structure is as follows:
[Figure: project structure diagram]

3. Core source code walkthrough

3.1 Configuration class: RabbitConfig

package com.standard.databus.config;
import com.google.common.collect.Lists;
import com.standard.commonutil.util.AnnotationScanHelper;
import com.standard.databus.annotation.StandardRabbitHandler;
import com.standard.databus.annotation.StandardRabbitListener;
import com.standard.databus.constant.RabbitConstant;
import com.standard.databus.frame.BaseMessageListener;
import org.apache.commons.lang3.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.CollectionUtils;

import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;

/**
 * RabbitMQ configuration
 *
 * @author weiyuan
 * @version 1.0
 */
@Configuration
public class RabbitConfig {

    private final Logger logger = LoggerFactory.getLogger(RabbitConfig.class);

    @Value("${rabbitmq.host}")
    private String host;

    @Value("${rabbitmq.port}")
    private Integer port;

    @Value("${rabbitmq.username}")
    private String username;

    @Value("${rabbitmq.password}")
    private String password;

    @Value("${rabbitmq.connectionCacheSize}")
    private Integer connectionCacheSize;

    @Value("${rabbitmq.connectionLimit}")
    private Integer connectionLimit;

    @Value("${rabbitmq.channelCacheSize}")
    private Integer channelCacheSize;

    @Value("${rabbitmq.connectionTimeout}")
    private Integer connectionTimeout;

    @Value("${rabbitmq.channelCheckoutTimeout}")
    private Long channelCheckoutTimeout;

    @Value("${rabbitmq.messageAutoAck}")
    private Boolean messageAutoAck;

    @Value("${rabbitmq.virtualHost}")
    private String virtualHost;

    @Autowired
    private BaseMessageListener baseMessageListener;

    /**
     * Configure the connection factory
     *
     * @return
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
        connectionFactory.setConnectionCacheSize(connectionCacheSize);
        connectionFactory.setConnectionLimit(connectionLimit);
        connectionFactory.setChannelCacheSize(channelCacheSize);
        connectionFactory.setConnectionTimeout(connectionTimeout);
        connectionFactory.setChannelCheckoutTimeout(channelCheckoutTimeout);
        // enable publisher confirms (ConfirmCallback), correlated through CorrelationData
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        // enable publisher returns (ReturnCallback)
        connectionFactory.setPublisherReturns(true);
        return connectionFactory;
    }

    /**
     * Configure the listener container factory
     *
     * @return
     */
    @Bean
    @ConditionalOnClass
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
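        // BaseMessageListener always calls channel.basicAck itself, so messageAutoAck should be false (MANUAL)
        factory.setAcknowledgeMode(messageAutoAck ? AcknowledgeMode.AUTO : AcknowledgeMode.MANUAL);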
        return factory;
    }

    /**
     * Configure the RabbitAdmin management bean
     *
     * @return
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    /**
     * Declare the direct exchange
     *
     * @param rabbitAdmin
     * @return
     */
    @Bean
    DirectExchange directExchange(RabbitAdmin rabbitAdmin) {
        DirectExchange directExchange = new DirectExchange(RabbitConstant.DIRECT_EXCHANGE_NAME, true, false);
        rabbitAdmin.declareExchange(directExchange);
        return directExchange;
    }

    /**
     * Declare the topic exchange
     *
     * @param rabbitAdmin
     * @return
     */
    @Bean
    TopicExchange topicExchange(RabbitAdmin rabbitAdmin) {
        TopicExchange topicExchange = new TopicExchange(RabbitConstant.TOPIC_EXCHANGE_NAME, true, false);
        rabbitAdmin.declareExchange(topicExchange);
        return topicExchange;
    }

    /**
     * Declare the fanout exchange
     *
     * @param rabbitAdmin
     * @return
     */
    @Bean
    FanoutExchange fanoutExchange(RabbitAdmin rabbitAdmin) {
        FanoutExchange fanoutExchange = new FanoutExchange(RabbitConstant.FANOUT_EXCHANGE_NAME, true, false);
        rabbitAdmin.declareExchange(fanoutExchange);
        return fanoutExchange;
    }


    /**
     * Configure the RabbitTemplate
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
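        // use the Jackson JSON message converter
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        rabbitTemplate.setEncoding("UTF-8");
        // mandatory mode: unroutable messages are handed to the ReturnCallback instead of being dropped
        rabbitTemplate.setMandatory(true);
        // publish on a separate connection so that a blocked producer does not also block the consumers
        rabbitTemplate.setUsePublisherConnection(true);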
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
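                // the CorrelationData id travels in this header (the same one BaseMessageListener reads)
                Object correlationId = message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");
                logger.info("MQ message ID[{}] could not be routed to a queue, reply code[{}], reason[{}], exchange[{}], routing key[{}]", correlationId, replyCode, replyText, exchange, routingKey);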
            }
        });
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
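                // correlationData is null when a message was sent without one
                String id = correlationData != null ? correlationData.getId() : null;
                if (!ack) {
                    logger.info("MQ message ID[{}] failed to reach the exchange, cause: [{}]", id, cause);
                } else {
                    logger.info("MQ message ID[{}] reached the exchange", id);
                }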
            }
        });
        return rabbitTemplate;
    }


    /**
     * Configure the message listener container
     *
     * @return
     */
    @Bean
    public SimpleMessageListenerContainer container(SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory, RabbitAdmin rabbitAdmin, DirectExchange directExchange) {
        List<String> queueList = Lists.newArrayList();
        SimpleMessageListenerContainer container = rabbitListenerContainerFactory.createListenerContainer();
        container.setConcurrentConsumers(1); // a single consumer, so messages are dispatched serially
        container.setMaxConcurrentConsumers(1); // never scale beyond one consumer
        container.setPrefetchCount(3);
        container.setMessageListener(new MessageListenerAdapter(baseMessageListener));

        List<Class<?>> findAnnotationByType = AnnotationScanHelper.findAnnotationByType(StandardRabbitHandler.class);
        if (!CollectionUtils.isEmpty(findAnnotationByType)) {
            for (Class<?> clazz : findAnnotationByType) {
                Method[] declaredMethods = clazz.getDeclaredMethods();
                if (ArrayUtils.isNotEmpty(declaredMethods)) {
                    for (Method method : declaredMethods) {
                        StandardRabbitListener deamRabbitListenerAnnotation = method.getAnnotation(StandardRabbitListener.class);
                        if (deamRabbitListenerAnnotation != null) {
                            if (ArrayUtils.isNotEmpty(deamRabbitListenerAnnotation.routingKeys())) {
                                String[] routingKeys = deamRabbitListenerAnnotation.routingKeys();
                                queueList.addAll(Arrays.asList(routingKeys));
                                container.addQueueNames(routingKeys);
                            }
                        }
                    }
                }
            }
        }
        if (!CollectionUtils.isEmpty(queueList)) {
            for (String queueName : queueList) {
                Queue queue = new Queue(queueName, true, false, false);
                rabbitAdmin.declareQueue(queue);
                Binding binding = BindingBuilder.bind(queue).to(directExchange).with(queueName);
                rabbitAdmin.declareBinding(binding);
            }
        }
        return container;
    }

}

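RabbitConfig is the startup configuration class for the RabbitMQ connection pool. The main points to note are shown in the following figure:
[Figure: key points of RabbitConfig]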
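The annotation scan inside the container bean can be tried in isolation. The following is a rough sketch, assuming a hypothetical Listener annotation and SampleHandler class that stand in for StandardRabbitListener and a real handler; AnnotationScanHelper is not shown in the article, so the classes are passed in directly instead of being discovered on the classpath.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.SortedSet;
import java.util.TreeSet;

public class RoutingKeyScan {

    // Hypothetical stand-in for StandardRabbitListener.
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface Listener {
        String[] routingKeys() default {};
    }

    // Hypothetical handler class, shaped like the article's TestListener.
    public static class SampleHandler {
        @Listener(routingKeys = "myqueue-1")
        public void first(String body) { }

        @Listener(routingKeys = {"myqueue-2", "myqueue-3"})
        public void second(String body) { }

        public void notAListener() { }
    }

    /** Collects every routing key declared on annotated methods of the given classes. */
    public static SortedSet<String> collectQueueNames(Class<?>... handlerTypes) {
        // A sorted set removes duplicates and gives a stable order;
        // getDeclaredMethods() returns methods in no particular order.
        SortedSet<String> queueNames = new TreeSet<>();
        for (Class<?> type : handlerTypes) {
            for (Method method : type.getDeclaredMethods()) {
                Listener listener = method.getAnnotation(Listener.class);
                if (listener != null) {
                    queueNames.addAll(Arrays.asList(listener.routingKeys()));
                }
            }
        }
        return queueNames;
    }

    public static void main(String[] args) {
        System.out.println(collectQueueNames(SampleHandler.class));
    }
}
```

In RabbitConfig each collected name is then used three times: as the queue name, as the binding's routing key on the direct exchange, and as a queue for the container to listen on.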

3.2 Message sending class: MsgSender

package com.standard.databus.frame;

import com.standard.commonutil.util.UUIDGenerator;
import com.standard.databus.constant.RabbitConstant;
import org.apache.commons.lang3.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.util.UUID;

/**
 * Utility class for sending MQ messages
 *
 * @author weiyuan
 * @version 1.0
 */
@Component
public class MsgSender {

    private final Logger logger = LoggerFactory.getLogger(MsgSender.class);
    /**
     * Message delivery mode; 2 means persistent
     */
    @Value("${rabbitmq.messageDeliveryMode}")
    private Integer messageDeliveryMode;

    @Autowired
    RabbitAdmin rabbitAdmin;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private SimpleMessageListenerContainer simpleMessageListenerContainer;

    @Autowired
    private DirectExchange directExchange;

    @Autowired
    private TopicExchange topicExchange;

    /**
     * Send a message through the direct exchange
     *
     * @param msg
     * @param routingKey
     */
    public void sendDirectMsg(Object msg, String routingKey) {
        // if the listener container is not yet listening on this queue, declare, bind and register it
        if (!ArrayUtils.contains(simpleMessageListenerContainer.getQueueNames(), routingKey)) {
            Queue queue = new Queue(routingKey, true, false, false);
            rabbitAdmin.declareQueue(queue);
            Binding binding = BindingBuilder.bind(queue).to(directExchange).with(routingKey);
            rabbitAdmin.declareBinding(binding);
            simpleMessageListenerContainer.addQueueNames(routingKey);
        }
        // generate the message ID
        String msgId = UUIDGenerator.getUUID();
        rabbitTemplate.convertAndSend(RabbitConstant.DIRECT_EXCHANGE_NAME, routingKey, msg, message -> {
            message.getMessageProperties().setDeliveryMode(messageDeliveryMode == 2 ? MessageDeliveryMode.PERSISTENT : MessageDeliveryMode.NON_PERSISTENT);
            return message;
        }, new CorrelationData(msgId));
        logger.info("BEGIN-MQ message ID[{}] sent, payload: {}", msgId, msg);
    }

    /**
     * TODO not finished yet
     * Send a message through the topic exchange
     *
     * @param msg
     * @param routingKey
     */
    public void sendTopicMsg(Object msg, String routingKey) {
        if (!ArrayUtils.contains(simpleMessageListenerContainer.getQueueNames(), routingKey)) {
            Queue queue = new Queue(routingKey, true, false, false);
            rabbitAdmin.declareQueue(queue);
            Binding binding = BindingBuilder.bind(queue).to(topicExchange).with(routingKey);
            rabbitAdmin.declareBinding(binding);
            simpleMessageListenerContainer.addQueueNames(routingKey);
        }
        // generate the message ID
        String msgId = UUID.randomUUID().toString();
        rabbitTemplate.convertAndSend(RabbitConstant.TOPIC_EXCHANGE_NAME, routingKey, msg, message -> {
            message.getMessageProperties().setDeliveryMode(messageDeliveryMode == 2 ? MessageDeliveryMode.PERSISTENT : MessageDeliveryMode.NON_PERSISTENT);
            return message;
        }, new CorrelationData(msgId));
        logger.info("BEGIN-MQ message ID[{}] sent", msgId);
    }

    /**
     * TODO not implemented yet
     * Send a message through the fanout exchange
     *
     * @param msg
     */
    public void sendFanoutMsg(String msg) {

    }
}

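MsgSender is the RabbitMQ message sending utility. Like the configuration class, it declares the queue, binds it, and adds it to the listener container dynamically.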
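The "declare on first use" check in sendDirectMsg is a check-then-act sequence, so two threads sending to a brand-new routing key at the same moment can both run the declaration branch. A minimal sketch of one way to make that step run once per queue name follows. It assumes a hypothetical DeclareOnce registry that is not in the project, and a counter stands in for the declareQueue, declareBinding and addQueueNames calls.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class DeclareOnce {
    private final ConcurrentHashMap<String, Boolean> declared = new ConcurrentHashMap<>();
    private final AtomicInteger declareCalls = new AtomicInteger();

    /** Runs the declaration step at most once per queue name, even under concurrent senders. */
    public void ensureDeclared(String queueName) {
        // computeIfAbsent applies the function at most once per key; other callers
        // for the same key wait until it has finished.
        declared.computeIfAbsent(queueName, name -> {
            declareCalls.incrementAndGet(); // placeholder for declare queue / bind / register
            return Boolean.TRUE;
        });
    }

    public int declareCalls() {
        return declareCalls.get();
    }

    /** Hypothetical demo: many senders hit a new queue name at the same time. */
    public static int demo(int senders) {
        DeclareOnce registry = new DeclareOnce();
        CountDownLatch start = new CountDownLatch(1);
        Thread[] threads = new Thread[senders];
        for (int i = 0; i < senders; i++) {
            threads[i] = new Thread(() -> {
                try {
                    start.await(); // line all senders up so they race on the first send
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                registry.ensureDeclared("myqueue-1");
            });
            threads[i].start();
        }
        start.countDown();
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return registry.declareCalls();
    }

    public static void main(String[] args) {
        System.out.println(demo(16));
    }
}
```

Declaring an existing queue with the same arguments is harmless on the broker side, so the duplicate work in the original is mostly wasted effort rather than a correctness problem; the sketch only shows how to avoid it.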

3.3 Handler class annotation: StandardRabbitHandler

package com.standard.databus.annotation;

import org.springframework.stereotype.Component;

import java.lang.annotation.*;

/**
 * Marks a class as an MQ message listener
 *
 * @author weiyuan
 * @version 1.0
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Component
public @interface StandardRabbitHandler {

    String name() default "";
}

This annotation marks a class that receives messages and handles the business logic.

3.4 Handler method annotation: StandardRabbitListener

package com.standard.databus.annotation;


import com.standard.databus.en.ExchangeType;

import java.lang.annotation.*;

/**
 * Marks a method as an MQ message handler
 *
 * @author weiyuan
 * @version 1.0
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface StandardRabbitListener {

    String[] routingKeys() default {}; // empty means handle everything

    ExchangeType exchangeType() default ExchangeType.DIRECT; // exchange type
}

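This annotation marks the concrete method that receives messages and handles the business logic; its parameter is the queue name.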
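The article does not list ConsumerHandler, the class that eventually invokes the annotated method, so the following is only a guess at the general shape of such a dispatcher. It is a minimal sketch in plain Java in which the Listener annotation, SampleHandler and QueueDispatch are all hypothetical names: the annotated methods are indexed by queue name once, then looked up and invoked by reflection for each message.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

public class QueueDispatch {

    // Hypothetical stand-in for StandardRabbitListener.
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface Listener {
        String[] routingKeys() default {};
    }

    // Hypothetical handler; returns a string so the result is easy to inspect.
    public static class SampleHandler {
        @Listener(routingKeys = "myqueue-1")
        public String onFirst(String body) {
            return "first:" + body;
        }

        @Listener(routingKeys = "myqueue-2")
        public String onSecond(String body) {
            return "second:" + body;
        }
    }

    private final Object target;
    private final Map<String, Method> byQueue = new HashMap<>();

    public QueueDispatch(Object target) {
        this.target = target;
        // Index annotated methods by queue name once, at construction time.
        for (Method method : target.getClass().getDeclaredMethods()) {
            Listener listener = method.getAnnotation(Listener.class);
            if (listener == null) {
                continue;
            }
            for (String key : listener.routingKeys()) {
                byQueue.put(key, method);
            }
        }
    }

    /** Invokes the method registered for the queue the message arrived on. */
    public String dispatch(String queueName, String body) {
        Method method = byQueue.get(queueName);
        if (method == null) {
            throw new IllegalArgumentException("no listener for queue " + queueName);
        }
        try {
            return (String) method.invoke(target, body);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("listener for " + queueName + " failed", e);
        }
    }

    public static void main(String[] args) {
        QueueDispatch dispatcher = new QueueDispatch(new SampleHandler());
        System.out.println(dispatcher.dispatch("myqueue-2", "hi"));
    }
}
```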

3.5 Base class for message consumption and dispatch: BaseMessageListener

package com.standard.databus.frame;

import com.google.common.collect.Maps;
import com.rabbitmq.client.Channel;
import com.standard.databus.entity.MessageInfo;
import com.standard.databus.entity.QueueInfo;
import com.standard.databus.service.MessageInfoService;
import com.standard.databus.service.QueueInfoService;
import com.standard.databus.service.ServiceFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Base class for message consumption
 *
 * @author weiyuan
 * @version 1.0
 */
@Component
public class BaseMessageListener implements ChannelAwareMessageListener {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    ConsumerHandler consumerHandler;

    @Autowired
    private ServiceFactory serviceFactory;

    private Map<String, ThreadPoolExecutor> threadPoolMap = Maps.newHashMap();

    @Override
    public void onMessage(Message message, Channel channel) {
        try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // acknowledge receipt right away, before processing

            String msgId = message.getMessageProperties().getHeaders().get("spring_returned_message_correlation").toString();
            String queueName = message.getMessageProperties().getConsumerQueue();
            logger.info("MQ - start consuming a message from queue [" + queueName + "] [IdGenerator=" + msgId + "], current thread ID [" + Thread.currentThread().getId() + "]");

            MessageInfo messageInfo = serviceFactory.getMessageInfoService().findInfoById(msgId);
            if (messageInfo != null) {
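                if (messageInfo.getStatus() == -1) { // the message exists and its status is "failed": this is a resend after a failed attempt
                    serviceFactory.getMessageInfoService().updateInfoToHandling(messageInfo.getMsgId()); // mark it as being handled again
                } else { // any other status: the message was already consumed, so return immediately
                    logger.info("message [id=" + msgId + "] has already been consumed");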
                    return;
                }
            } else {
                serviceFactory.getMessageInfoService().createInfo(msgId, message, queueName);
            }
            QueueInfo queueInfo = serviceFactory.getQueueInfoService().findByQueueName(queueName);
            if (queueInfo == null) {
                serviceFactory.getQueueInfoService().createInfo(queueName);
            }
            ThreadPoolExecutor threadPool = threadPoolMap.get(queueName);
            if (threadPool == null) {
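                // Core and maximum pool size are both 1, so the pool has exactly one thread; this is what
                // guarantees serial execution. Up to 100 tasks may wait in the backlog.
                // Caution: once the backlog is full, DiscardOldestPolicy silently drops the oldest waiting
                // task, and that message has already been acked above.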
                threadPool = new ThreadPoolExecutor(1, 1, 3,
                        TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), new ThreadPoolExecutor.DiscardOldestPolicy());
                threadPoolMap.put(queueName, threadPool);
            }
            threadPool.execute(new ConsumerMsgTask(this.consumerHandler, message, msgId));

        } catch (Exception e) {
            logger.error("MQ - failed to dispatch message", e);
        }
    }
}
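The per-queue pool in onMessage pairs a bounded backlog with ThreadPoolExecutor.DiscardOldestPolicy. What that policy does once the backlog is full is easy to miss, so here is a small sketch using the same constructor arguments with a much smaller capacity. DiscardOldestDemo and its latch are illustrative assumptions, not project code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DiscardOldestDemo {

    /** Submits taskCount tasks to a one-thread pool and returns the ids of the tasks that actually ran. */
    public static List<Integer> run(int taskCount, int backlogCapacity) {
        List<Integer> executed = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 3, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(backlogCapacity), new ThreadPoolExecutor.DiscardOldestPolicy());
        try {
            for (int i = 0; i < taskCount; i++) {
                final int id = i;
                pool.execute(() -> {
                    try {
                        release.await(); // hold the worker until every task has been submitted
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    executed.add(id);
                });
            }
            release.countDown();
            pool.shutdown();
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not drain");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(executed);
    }

    public static void main(String[] args) {
        // Task 0 occupies the worker, tasks 1 and 2 fill the backlog,
        // then tasks 3 and 4 each evict the oldest waiting task.
        System.out.println(run(5, 2)); // prints [0, 3, 4]
    }
}
```

Tasks 1 and 2 never run and nothing is logged. With a capacity of 100 this only shows up when a handler falls far behind, but in a funds-update pipeline a dropped task is a lost operation, so a policy that blocks or fails loudly may be a safer choice.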

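BaseMessageListener is the base class for consuming messages: it acknowledges each message, dispatches it, and builds the multi-queue serial model. The key points are shown in the following figure:
[Figure: key points of BaseMessageListener]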
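The duplicate check at the top of onMessage can be read as a small state machine keyed by message ID: a new ID is processed, a failed one may be processed again, anything else is skipped. The sketch below models that in memory. Only the "failed" status (-1) is confirmed by the source; the RetryLedger class, the other status names and the attempt limit are assumptions added for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class RetryLedger {
    public enum Status { HANDLING, DONE, FAILED }

    private static final class Entry {
        Status status = Status.HANDLING;
        int attempts = 0;
    }

    private final Map<String, Entry> entries = new ConcurrentHashMap<>();
    private final int maxAttempts;

    public RetryLedger(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    /** Runs work unless the message is already done or out of attempts; returns true if work ran. */
    public boolean handle(String msgId, Runnable work) {
        Entry entry = entries.computeIfAbsent(msgId, id -> new Entry());
        synchronized (entry) {
            boolean firstDelivery = entry.attempts == 0;
            boolean retryable = entry.status == Status.FAILED && entry.attempts < maxAttempts;
            if (!firstDelivery && !retryable) {
                return false; // duplicate delivery of a finished message, or retries exhausted
            }
            entry.attempts++;
            entry.status = Status.HANDLING;
            try {
                work.run();
                entry.status = Status.DONE;
            } catch (RuntimeException e) {
                entry.status = Status.FAILED; // a later resend with the same id may try again
            }
            return true;
        }
    }

    public Status statusOf(String msgId) {
        Entry entry = entries.get(msgId);
        if (entry == null) {
            return null;
        }
        synchronized (entry) {
            return entry.status;
        }
    }

    /** Hypothetical demo: the same message is delivered three times and the handler fails once. */
    public static String demo() {
        RetryLedger ledger = new RetryLedger(3);
        AtomicInteger calls = new AtomicInteger();
        Runnable flaky = () -> {
            if (calls.incrementAndGet() == 1) {
                throw new IllegalStateException("simulated failure");
            }
        };
        StringBuilder trace = new StringBuilder();
        for (int delivery = 0; delivery < 3; delivery++) {
            boolean ran = ledger.handle("msg-1", flaky);
            trace.append(ran).append(':').append(ledger.statusOf("msg-1")).append(' ');
        }
        return trace.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints true:FAILED true:DONE false:DONE
    }
}
```

Because onMessage acks before processing, the broker will not redeliver a failed message on its own; a retry in this design would have to publish the message again under the same ID so that the status check lets it through.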

4. Testing

4.1 Test message sender: TestController

package com.standard.databus.controller;

import com.standard.databus.util.RabbitUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

/**
 * Test controller that sends messages
 *
 * @author weiyuan
 * @version 1.0
 */
@RestController
public class TestController {

    @Autowired
    RabbitUtil rabbitUtil;

    @RequestMapping(value = "/test",method = RequestMethod.GET)
    public String test() {
        for (int i = 0; i < 50; i++) {
            rabbitUtil.sendDirectMsg("myqueue-1", "queue myqueue-1, message: hello world" + i); // send the message
        }
        for (int i = 0; i < 50; i++) {
            rabbitUtil.sendDirectMsg("myqueue-2", "queue myqueue-2, message: hello world" + i); // send the message
        }
        return "";
    }
}

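The test sends 50 messages to each of the two queues, myqueue-1 and myqueue-2, for 100 messages in total. RabbitUtil is not listed in this article; judging from the call, it takes the queue name first and the message second, the reverse of MsgSender.sendDirectMsg(msg, routingKey).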

4.2 Test message consumer: TestListener

package com.standard.databus.listener;

import com.standard.databus.annotation.StandardRabbitHandler;
import com.standard.databus.annotation.StandardRabbitListener;
import org.springframework.amqp.core.Message;

/**
 * Test listener that consumes messages
 *
 * @author weiyuan
 * @version 1.0
 */
@StandardRabbitHandler
public class TestListener {

    @StandardRabbitListener(routingKeys = "myqueue-1")
    public void testListener1(Message message) throws Exception{
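        String msg = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("*********************queue [myqueue-1] received message: " + msg);
        Thread.sleep(99999999999L); // block this queue's worker thread for the rest of the test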

    }

    @StandardRabbitListener(routingKeys = "myqueue-2")
    public void testListener2(Message message) {
        String msg = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("queue [myqueue-2] received message: " + msg);
    }
}

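Two listener methods are defined, consuming messages from myqueue-1 and myqueue-2 respectively. testListener1 puts its thread to sleep after consuming one message, to test whether that affects consumption of myqueue-2.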

Now start the project by running DataBusApplication.

Then open this URL in a browser: http://127.0.0.1:8081/test

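In the console output, myqueue-1 stops processing after receiving its first message, while myqueue-2 keeps going and handles its messages in the order they were sent, which is exactly the effect we want.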
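The same behavior can be reproduced without a broker. Below is a minimal sketch in plain Java, with two single-thread executors standing in for the per-queue thread pools. LaneIsolationDemo and its latch-based "stuck handler" are illustrative assumptions that replace the Thread.sleep call so the demo can finish.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LaneIsolationDemo {

    /** Feeds both lanes; lane 1 gets stuck on its first message, lane 2 runs freely. */
    public static String run(int messagesPerQueue) {
        if (messagesPerQueue < 1) {
            throw new IllegalArgumentException("need at least one message per queue");
        }
        ExecutorService lane1 = Executors.newSingleThreadExecutor();
        ExecutorService lane2 = Executors.newSingleThreadExecutor();
        List<Integer> seen1 = Collections.synchronizedList(new ArrayList<>());
        List<Integer> seen2 = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch lane1Entered = new CountDownLatch(1);
        CountDownLatch unblock = new CountDownLatch(1);
        try {
            for (int i = 0; i < messagesPerQueue; i++) {
                final int id = i;
                lane1.execute(() -> {
                    seen1.add(id);
                    lane1Entered.countDown();
                    try {
                        unblock.await(); // stands in for the long Thread.sleep in testListener1
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                lane2.execute(() -> seen2.add(id));
            }
            lane1Entered.await(); // make sure lane 1 has taken its first message
            lane2.shutdown();
            if (!lane2.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("lane 2 did not drain");
            }
            List<Integer> expected = new ArrayList<>();
            for (int i = 0; i < messagesPerQueue; i++) {
                expected.add(i);
            }
            boolean inOrder = expected.equals(new ArrayList<>(seen2));
            // Snapshot taken while lane 1 is still stuck on its first message.
            return "lane1=" + seen1.size() + ", lane2=" + seen2.size() + ", lane2InOrder=" + inOrder;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            unblock.countDown(); // let lane 1 finish so the JVM can exit
            lane1.shutdown();
            lane2.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(50)); // prints lane1=1, lane2=50, lane2InOrder=true
    }
}
```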

5. Summary

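One thing in this model still needs work: when the business logic fails, there has to be a mechanism for resending the message. Feel free to follow my WeChat official account and leave a comment to discuss. To get the complete source code, follow the official account and reply "MQ".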
