RocketMQ消费者端消息列队六种负载均衡算法分析

2023-11-12

在RocketMQ启动的时候会启动负载均衡线程,过程如下:
//DefaultMQPullConsumerImpl.start()
 mQClientFactory.start();
 //上面点进去 ->MQClientInstance.start(),rebalanceService继承了ServiceThread,
 //ServiceThread实现了Runnable接口
 this.rebalanceService.start();
 //继续下一层,MQClientInstance.doRebalance()找到下面
 impl.doRebalance();
 //..在一层层点进去,最后找到RebalanceImpl.rebalanceByTopic方法,找到
 AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

AllocateMessageQueueStrategy就是实现消费者消息队列负载均衡算法的接口。
该接口在rocketMq-4.3.0版本中有六种实现方法:

  • AllocateMessageQueueAveragely:平均算法
  • AllocateMessageQueueAveragelyByCircle:环形平均算法
  • AllocateMessageQueueByConfig:根据配置负载均衡算法
  • AllocateMessageQueueByMachineRoom:根据机房负载均衡算法
  • AllocateMessageQueueConsistentHash:一致性哈希负载均衡算法
  • AllocateMachineRoomNearby:靠近机房策略

在客户端没有指定的情况下,RocketMQ默认使用AllocateMessageQueueAveragely平均算法。

一、 AllocateMessageQueueAveragely平均负载均衡算法

平均算法顾名思义就是取平均值,该方法四个参数,consumerGroup(消费者组名称)、
currentCID(当前消费者的id)、mqAll(当前topic下面所有的消息队列)、cidAll(当前消费者组下面所有的消费者id)。算法思想就是把算出平均值然后将连续的队列分配给每个消费者。假设队列大小是8(编号0-7),消费者数量3(编号0-2),分配结果就是:
消费者0:队列0,1,2;
消费者1:队列3,4,5;
消费者2:队列6,7。
下面具体来看代码:


 @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {
        if (currentCID == null || currentCID.length() < 1) {
            throw new IllegalArgumentException("currentCID is empty");
        }
        if (mqAll == null || mqAll.isEmpty()) {
            throw new IllegalArgumentException("mqAll is null or mqAll empty");
        }
        if (cidAll == null || cidAll.isEmpty()) {
            throw new IllegalArgumentException("cidAll is null or cidAll empty");
        }

        List<MessageQueue> result = new ArrayList<MessageQueue>();
        if (!cidAll.contains(currentCID)) {
            log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
                consumerGroup,
                currentCID,
                cidAll);
            return result;
        }
//        cidAll.size() = 3; mqAll.size()=8;mod=2;
//      index=0;averageSize=3;startIndex=0;range=3,result={0,1,2}
//      index = 1;averageSize=3;startIndex=3;range = 3;result={3,4,5}
//      index = 2;averageSize=2;startIndex=6;range = 2;result={6,7}
        int index = cidAll.indexOf(currentCID);
        //取模a除以b的余数
        int mod = mqAll.size() % cidAll.size();
        int averageSize =
            mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                + 1 : mqAll.size() / cidAll.size());
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }

参数校验的就不说了,index算出当前消费者的下标,mod是队列数量对消费者取模,算出来余数(在不能平均分的情况下,前面mod个消费者多一个队列)。averageSize就是算出当前消费者可以分配多少个消息队列。startIndex消息队列开始的下标。

二、AllocateMessageQueueAveragelyByCircle环形平均分配算法

环形分配就可以看成:所有消费者围成一个环,然后循环这个环分配队列。AllocateMessageQueueAveragely方法平均分配的是连续的队列,环形分配的就是间隔的队列。核心代码就一个for循环,也很好理解。假设mq8个,消费者3个,分配后的结果就是{0,3,6},{1,4,7},{2,5}

public class AllocateMessageQueueAveragelyByCircle implements AllocateMessageQueueStrategy {
    private final Logger log = ClientLogger.getLog();

    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {
        if (currentCID == null || currentCID.length() < 1) {
            throw new IllegalArgumentException("currentCID is empty");
        }
        if (mqAll == null || mqAll.isEmpty()) {
            throw new IllegalArgumentException("mqAll is null or mqAll empty");
        }
        if (cidAll == null || cidAll.isEmpty()) {
            throw new IllegalArgumentException("cidAll is null or cidAll empty");
        }

        List<MessageQueue> result = new ArrayList<MessageQueue>();
        if (!cidAll.contains(currentCID)) {
            log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
                consumerGroup,
                currentCID,
                cidAll);
            return result;
        }
        // 假设:cidAll.size() = 3; mqAll.size()=8;index=0{0,3,6},index=1{1,4,7},index=2{2,5},转圈算法
        int index = cidAll.indexOf(currentCID);
        for (int i = index; i < mqAll.size(); i++) {
            if (i % cidAll.size() == index) {
                result.add(mqAll.get(i));
            }
        }
        return result;
    }

    @Override
    public String getName() {
        return "AVG_BY_CIRCLE";
    }
}

三、AllocateMessageQueueByMachineRoom机房分配算法

机房分配现根据MQ中的brokerName找出有效的机房信息(也就是消息队列)。然后在评分,这个算法的逻辑是先算出平均值和余数,它和AllocateMessageQueueAveragely平均算法的不同在于,它是先给每个消费者分配mod(平均值个数)个消息队列,然后余数在从头开始一个个分配,假设mq有8个,消费者3个,那么平均值mod = 2,余数2,分配方式就是每个消费者先分配两个mq,{0,1},{2,3},{4,5},然后余数2个在从头开始分配,最后就是{0,1,6},{2,3,7},{4,5}。

public class AllocateMessageQueueByMachineRoom implements AllocateMessageQueueStrategy {
    private Set<String> consumeridcs;

    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {
        List<MessageQueue> result = new ArrayList<MessageQueue>();
        int currentIndex = cidAll.indexOf(currentCID);
        if (currentIndex < 0) {
            return result;
        }
        List<MessageQueue> premqAll = new ArrayList<MessageQueue>();
        //根据brokerName解析出所有有效的机房信息(有效mq)
        for (MessageQueue mq : mqAll) {
            String[] temp = mq.getBrokerName().split("@");
            if (temp.length == 2 && consumeridcs.contains(temp[0])) {
                premqAll.add(mq);
            }
        }
         //平均结果
        int mod = premqAll.size() / cidAll.size();
//        平均之后的余数
        int rem = premqAll.size() % cidAll.size();
        int startIndex = mod * currentIndex;
        int endIndex = startIndex + mod;
        //所有消费者先平均分配,多的余数在依次分配,例如:队列8,消费者3,{0,1,6},{2,3,7},{4,5}
        for (int i = startIndex; i < endIndex; i++) {
            result.add(mqAll.get(i));
        }
        if (rem > currentIndex) {
            result.add(premqAll.get(currentIndex + mod * cidAll.size()));
        }
        return result;
    }

    @Override
    public String getName() {
        return "MACHINE_ROOM";
    }

    public Set<String> getConsumeridcs() {
        return consumeridcs;
    }

    public void setConsumeridcs(Set<String> consumeridcs) {
        this.consumeridcs = consumeridcs;
    }

四、AllocateMessageQueueConsistentHash一致性哈希负载均衡算法

这个算法算是这几种中最复杂的一个吧。一致性哈希负载均衡的目的是要保证:相同的请求尽可能落在同一个服务器上。为什么是说尽可能?因为服务器会发生上下线,在少数服务器变化的时候不应该影响大多数的请求。再讲本节算法前,先简单的介绍一下一致性哈希算法,大家如果想有一个更深入的理解,可以去网上搜索更多相关资料。

普通hash算法存在的问题

普通hash算法我们可以简单理解为对key值进行hash之后对服务器取模,也就是hash(key) % n.这个时候如果我们的一台服务器宕机了,或者需要新增一台服务器,那么我们的n值就会变更,这样就会导致我们所有的请求都会变更。举个简单的例子,我们有个redis集群,部署了4台服务器,如果我们将key1使用随机存储,那么我们找key1的时候可能就需要遍历4服务器,效率差。在换种方式,对key1哈希操作后取模,将它定位到一台服务器上,这样在查找key1的时候我们就可以很快的定位到一台服务器上。可是这样还有种问题就是之前所说的如果我们redis集群增加了一台服务器,或者有一台服务器宕机了要从集群中去除。这样再通过hash算出了值就发生了变化。短时间发生缓存雪崩。

一致性hash算法

核心点:

  • 哈希环。刚才的hash算法是对服务器取模,一致性哈希算法使用的是对2^32取模,
    就是一致性哈希将整个hash空间组织成了一个圆环,0-2^32-1.
  • 物理节点:将服务器(ip+端口)进行hash,映射成环上的一个节点。当请求到来时,根据请求的key,hash映射到环上,顺时针选取最近的一个服务器进行请求。
  • 虚拟节点:当环上的服务器较少的时候,会出现分配不均匀的情况,即大量的请求落在同一台服务器上。为了避免这种情况,就引入了虚拟节点,比如通过添加后缀的方式给物理节点克隆出三个虚拟节点,如果两台物理节点,都克隆三个虚拟节点,那么环上就一共有8个节点。只是被克隆的虚拟节点最后还是会定位到实际物理节点上,但是可以有效的分摊请求。

一致性哈希相对于普通hash,优点在于映射到环上的其请求,是发送到环上离他最近的一个服务器,如果我们一台服务器宕机或者新增一台服务器,那么影响的请求只有这台服务器和前一个服务器节点之间的请求,其他的并不会影响。
接下来,我们再来看看RocketMq是如何实现一致性哈希负载均衡算法

//核心代码
 Collection<ClientNode> cidNodes = new ArrayList<ClientNode>();
        for (String cid : cidAll) {
            cidNodes.add(new ClientNode(cid));
        }
         //ConsistentHashRouter关键类
        final ConsistentHashRouter<ClientNode> router; //for building hash ring
        if (customHashFunction != null) {
            router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt, customHashFunction);
        } else {
            router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt);
        }

        List<MessageQueue> results = new ArrayList<MessageQueue>();
        for (MessageQueue mq : mqAll) {
            ClientNode clientNode = router.routeNode(mq.toString());
            if (clientNode != null && currentCID.equals(clientNode.getKey())) {
                results.add(mq);
            }
        }

        return results;


    private static class ClientNode implements Node {
        private final String clientID;

        public ClientNode(String clientID) {
            this.clientID = clientID;
        }

        @Override
        public String getKey() {
            return clientID;
        }
    }

上面代码在ConsistentHashRouter这个类中构建了哈希环,算法的主要实现都是在这个类中实现的。先来看这个类的构造方法。构造方法中pNodes表示物理节点;vNodeCount表示虚拟节点个数,默认十个;HashFunction表示哈希算法接口,可以自己实现,默认使用MD5实现哈希算法。addNode方法将物理节点和虚拟节点映射到哈希环上。


    public ConsistentHashRouter(Collection<T> pNodes, int vNodeCount, HashFunction hashFunction) {
        if (hashFunction == null) {
            throw new NullPointerException("Hash Function is null");
        }
        this.hashFunction = hashFunction;
        if (pNodes != null) {
            for (T pNode : pNodes) {
                addNode(pNode, vNodeCount);
            }
        }
    }

哈希环的构建
这里构建哈希环是通过TreeMap来实现的。

private final SortedMap<Long, VirtualNode<T>> ring = new TreeMap<Long, VirtualNode<T>>();

将物理节点、虚拟节点放入treeMap里。通过treeMap的tailMap、firstKey()等方法来获取请求映射对应的节点。后面在细讲。

addNode方法的实现:

 public void addNode(T pNode, int vNodeCount) {
        if (vNodeCount < 0) throw new IllegalArgumentException("illegal virtual node counts :" + vNodeCount);
        //获取环上对应物理节点已经有的虚拟节点个数
        int existingReplicas = getExistingReplicas(pNode);
        for (int i = 0; i < vNodeCount; i++) {
            VirtualNode<T> vNode = new VirtualNode<T>(pNode, i + existingReplicas);
            ring.put(hashFunction.hash(vNode.getKey()), vNode);
        }
    }

 public int getExistingReplicas(T pNode) {
        int replicas = 0;
        //循环ring,找出物理节点pNode在环上有多少虚拟节点
        for (VirtualNode<T> vNode : ring.values()) {
            if (vNode.isVirtualNodeOf(pNode)) {
                replicas++;
            }
        }
        return replicas;
    }

addNode先通过getExistingReplicas方法找出该pNode物理节点在环上已经存在的虚拟节点的个数。然后循环创建新的虚拟节点,下标依次+1。再将新的虚拟节点加入哈希环。哈希环构建好了,再回去继续看我们的主流程。
接着循环所有mq,通过router.routeNode方法找到mq映射的物理节点。

 ClientNode clientNode = router.routeNode(mq.toString());

//ConsistentHashRouter.routeNode
 public T routeNode(String objectKey) {
        if (ring.isEmpty()) {
            return null;
        }
        Long hashVal = hashFunction.hash(objectKey);
        //tailMap方法找到大于等于hashVal映射的集合
        SortedMap<Long,VirtualNode<T>> tailMap = ring.tailMap(hashVal);
        Long nodeHashVal = !tailMap.isEmpty() ? tailMap.firstKey() : ring.firstKey();
        return ring.get(nodeHashVal).getPhysicalNode();
    }

将mq.toString哈希后,得出hashVal,使用TreeMap.tailMap方法得出map中key值大于等于hashVal的集合映射,就是得出了key值在hashVal之后的所有值。接着判断后面的map集合是不是空的,如果不是空的,就取它的第一个值(firstKey),也就是hashVal映射在环上的顺时针最近的节点。如果是空的,就说明该hashVal后面已经没有节点了,因为是环,所以取ring.first(),取环上的第一个节点。然后返回该节点对应的物理节点。

回到主流程,拿到物理节点后,和当前请求客户端id进行比较,是同一个,就把mq分配给他

if (clientNode != null && currentCID.equals(clientNode.getKey())) {
                results.add(mq);
            }
五、AllocateMessageQueueByConfig通过配置负载均衡

这个没啥好说的,自定义配置。

六、AllocateMachineRoomNearby靠近机房策略

休息一下

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RocketMQ消费者端消息列队六种负载均衡算法分析 的相关文章

  • 面向对象编程(概念)

    面向对象编程 概念 面向过程 面向对象 面向过程思想 1 步骤清晰简单 第一步做什么 第二步做什么 2 面对过程是和处理一些较为简单的题目 面向对象思想 1 物以类聚 分类的思维模式 思考问题首先会解决问题需要哪些分类 然后对这些分类进行单

随机推荐

  • 解决ImportError: No module named zlib问题

    今天在创建django工程时 报了这个错 其实前几天也遇见过当时是在装一个setuptools时 报了这个错误zlib not available按照网上的方法 装好之后重新编译自己的Python 结果没有反应 还是报错 zlib模块是用来
  • java elasticsearch_Java操作ElasticSearch

    java客户端连接到ElasticSearch服务器 创建maven工程 添加坐标 最好跟服务器elasticsearch一致 org elasticsearch client transport 6 2 4 net sf json lib
  • type和interface的区别

    type和interface都可以用来表示接口 但实际用的时候会有写差异 1 type和interface的相同点 都是用来定义对象或函数的形状 interface example name string age number interf
  • 修复Chrome浏览器下载速度慢的问题

    前言 本人使用Mac端进行操作 Win端操作大体基本相同 差别不大 放心食用 特色 操作简单 配套软件链接齐全 传送门 链接 https pan baidu com s 1vcCcPlHaUQmYrQldVM8UVQ pwd 0000 提取
  • 2023第十四届蓝桥杯 C/C++大学生A组省赛 满分题解

    写在前面 以下代码 目前均可通过民间OJ数据 dotcpp New Online Judge 两个OJ题目互补 能构成全集 可以到对应链接下搜题提交 感谢OJ对题目的支持 如果发现任何问题 包含但不限于算法思路出错 OJ数据弱算法实际超时
  • linux固件以及与驱动的区别

    硬件越来越复杂 硬件的许多功能使用了程序实现 与直接硬件实现相比 固件拥有处理复杂事物的灵活性和便于升级 维护等优点 固件 firmware 就是这样的一段在设备硬件自身中执行的程序 通过固件标准驱动程序才能实现特定机器的操作 如 光驱 刻
  • Android Studio 真机调试与虚拟机

    虚拟机调试 1 创建一个简单的hello world例子 支持C 2 SDK manager 根据个人需要选择安装 勾选并点击右下角apply即会自动安装 3 AVD manager 4 运行android程序 弹出选择设备框时有可能无法查
  • python字典中键不允许重复_Python 字典中的“键”不允许重复。 (1.0分)_学小易找答案...

    填空题 已知 x 1 2 3 那么表达式 not set x 100 set x 的值为 1 0分 判断题 列表可以作为字典的 键 1 0分 填空题 已知 x 为非空列表 那么表达式 x reverse list reversed x 的值
  • python自动化课程笔记(十三)单例模型、代码性能、抛出异常

    单例模型 重要 class Person object instance None is first True def new cls args kwargs 如果类属性 instance的值为None 创建一个对象 并赋值为这个对象的引用
  • 一个双非计算机学生的长远规划(考研篇)

    一个双非计算机学生的长远规划 考研篇 本文于2022 5 14 第一次更新 本文于2022 12 07 第二次更新 第二次更新内容 作者此次更新已经是大三了 经历了三段实习 马上去第三家 实习 发现学历真的是让我们实现阶级跨越的鸿沟 没有更
  • 【YAML 学习】

    YAML是 JSON 的超集 因此是一种用于指定分层配置数据的便捷格式 YAML YAML Ain t Markup Language 的递归首字母缩写词 是一种数据序列化语言 旨在实现人性化 并与现代编程语言一起处理常见的日常任务 YAM
  • vue 使用axios 出现跨域请求

    设置一个代理服务器 使用proxyTable配置地方 1 项目文件目录的conf文件夹下的index js build dev 都可设置为一致 dev env require dev env port 80 assetsSubDirecto
  • 实用小工具(数据集转换...)

    xml2yolo py import xml etree ElementTree as ET import pickle import os from os import listdir getcwd from os path import
  • 精选腾讯技术干货200+篇,云加社区全年沙龙PPT免费下载!

    2019年已经过去 小编为大家整理了这一年以来云加社区发布的 200多篇腾讯干货 点击文章标题即可跳转到原文 请速速收藏哦 看腾讯技术 腾讯成本优化黑科技 整机CPU利用率最高提升至90 腾讯科技升级1000天 团战 登月与烟囱革命 看一看
  • @RestController注解作用

    作用 Controller和 ResponseBody的集合 Controller Controller标识的类 该类代表控制器类 控制层 表现层 这里控制层里面的每个方法 都可以去调用 Service标识的类 业务逻辑层 Response
  • java锁杂谈

    关于java锁 内容蛮多的 这篇文章只谈一部分见解 所以取名为杂谈 没有大纲 等后面锁的体系建立起来后再整理一下 那么开始吧 Java 锁有哪些 各种各样 网传15种有余 这些锁的底层大多是AQS实现的 比如 ReentrantLock可重
  • 搜集Shader一些参数(为自己)

    Shader ConfigurableShaders Rendering Properties Header Stencil Stencil Stencil ID 0 255 Float 0 ReadMask ReadMask 0 255
  • PLC控制运料小车往返运动

    实验要求 1 实验控制电路应该具有控制回路的总控制 其功能是启动和停止控制电路 它可以使小车停站的位置行程开关处于压合的位置 脱离延迟控制往返时为启动状态 及零压保护电路功能 2 小车沿轨道自动往返运动时 小车在行程内的任何位置时都可以起动
  • c++实现创建一个cocos2d-x的场景类

    文件 http pan baidu com s 1ntlu14H createVSClass cpp 定义控制台应用程序的入口点 include stdafx h include
  • RocketMQ消费者端消息列队六种负载均衡算法分析

    在RocketMQ启动的时候会启动负载均衡线程 过程如下 DefaultMQPullConsumerImpl start mQClientFactory start 上面点进去 gt MQClientInstance start rebal