在RocketMQ启动的时候会启动负载均衡线程,过程如下:
//DefaultMQPullConsumerImpl.start()
mQClientFactory.start();
//上面点进去 ->MQClientInstance.start(),rebalanceService继承了ServiceThread,
//ServiceThread实现了Runnable接口
this.rebalanceService.start();
//继续下一层,MQClientInstance.doRebalance()找到下面
impl.doRebalance();
//..在一层层点进去,最后找到RebalanceImpl.rebalanceByTopic方法,找到
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
AllocateMessageQueueStrategy就是实现消费者消息队列负载均衡算法的接口。
该接口在rocketMq-4.3.0版本中有六种实现方法:
- AllocateMessageQueueAveragely:平均算法
- AllocateMessageQueueAveragelyByCircle:环形平均算法
- AllocateMessageQueueByConfig:根据配置负载均衡算法
- AllocateMessageQueueByMachineRoom:根据机房负载均衡算法
- AllocateMessageQueueConsistentHash:一致性哈希负载均衡算法
- AllocateMachineRoomNearby:靠近机房策略
在客户端没有指定的情况下,RocketMQ默认使用AllocateMessageQueueAveragely平均算法。
一、 AllocateMessageQueueAveragely平均负载均衡算法
平均算法顾名思义就是取平均值,该方法四个参数,consumerGroup(消费者组名称)、
currentCID(当前消费者的id)、mqAll(当前topic下面所有的消息队列)、cidAll(当前消费者组下面所有的消费者id)。算法思想就是把算出平均值然后将连续的队列分配给每个消费者。假设队列大小是8(编号0-7),消费者数量3(编号0-2),分配结果就是:
消费者0:队列0,1,2;
消费者1:队列3,4,5;
消费者2:队列6,7。
下面具体来看代码:
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
if (currentCID == null || currentCID.length() < 1) {
throw new IllegalArgumentException("currentCID is empty");
}
if (mqAll == null || mqAll.isEmpty()) {
throw new IllegalArgumentException("mqAll is null or mqAll empty");
}
if (cidAll == null || cidAll.isEmpty()) {
throw new IllegalArgumentException("cidAll is null or cidAll empty");
}
List<MessageQueue> result = new ArrayList<MessageQueue>();
if (!cidAll.contains(currentCID)) {
log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
consumerGroup,
currentCID,
cidAll);
return result;
}
// cidAll.size() = 3; mqAll.size()=8;mod=2;
// index=0;averageSize=3;startIndex=0;range=3,result={0,1,2}
// index = 1;averageSize=3;startIndex=3;range = 3;result={3,4,5}
// index = 2;averageSize=2;startIndex=6;range = 2;result={6,7}
int index = cidAll.indexOf(currentCID);
//取模a除以b的余数
int mod = mqAll.size() % cidAll.size();
int averageSize =
mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
+ 1 : mqAll.size() / cidAll.size());
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;
}
参数校验的就不说了,index算出当前消费者的下标,mod是队列数量对消费者取模,算出来余数(在不能平均分的情况下,前面mod个消费者多一个队列)。averageSize就是算出当前消费者可以分配多少个消息队列。startIndex消息队列开始的下标。
二、AllocateMessageQueueAveragelyByCircle环形平均分配算法
环形分配就可以看成:所有消费者围成一个环,然后循环这个环分配队列。AllocateMessageQueueAveragely方法平均分配的是连续的队列,环形分配的就是间隔的队列。核心代码就一个for循环,也很好理解。假设mq8个,消费者3个,分配后的结果就是{0,3,6},{1,4,7},{2,5}
public class AllocateMessageQueueAveragelyByCircle implements AllocateMessageQueueStrategy {
private final Logger log = ClientLogger.getLog();
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
if (currentCID == null || currentCID.length() < 1) {
throw new IllegalArgumentException("currentCID is empty");
}
if (mqAll == null || mqAll.isEmpty()) {
throw new IllegalArgumentException("mqAll is null or mqAll empty");
}
if (cidAll == null || cidAll.isEmpty()) {
throw new IllegalArgumentException("cidAll is null or cidAll empty");
}
List<MessageQueue> result = new ArrayList<MessageQueue>();
if (!cidAll.contains(currentCID)) {
log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
consumerGroup,
currentCID,
cidAll);
return result;
}
// 假设:cidAll.size() = 3; mqAll.size()=8;index=0{0,3,6},index=1{1,4,7},index=2{2,5},转圈算法
int index = cidAll.indexOf(currentCID);
for (int i = index; i < mqAll.size(); i++) {
if (i % cidAll.size() == index) {
result.add(mqAll.get(i));
}
}
return result;
}
@Override
public String getName() {
return "AVG_BY_CIRCLE";
}
}
三、AllocateMessageQueueByMachineRoom机房分配算法
机房分配现根据MQ中的brokerName找出有效的机房信息(也就是消息队列)。然后在评分,这个算法的逻辑是先算出平均值和余数,它和AllocateMessageQueueAveragely平均算法的不同在于,它是先给每个消费者分配mod(平均值个数)个消息队列,然后余数在从头开始一个个分配,假设mq有8个,消费者3个,那么平均值mod = 2,余数2,分配方式就是每个消费者先分配两个mq,{0,1},{2,3},{4,5},然后余数2个在从头开始分配,最后就是{0,1,6},{2,3,7},{4,5}。
public class AllocateMessageQueueByMachineRoom implements AllocateMessageQueueStrategy {
private Set<String> consumeridcs;
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
List<MessageQueue> result = new ArrayList<MessageQueue>();
int currentIndex = cidAll.indexOf(currentCID);
if (currentIndex < 0) {
return result;
}
List<MessageQueue> premqAll = new ArrayList<MessageQueue>();
//根据brokerName解析出所有有效的机房信息(有效mq)
for (MessageQueue mq : mqAll) {
String[] temp = mq.getBrokerName().split("@");
if (temp.length == 2 && consumeridcs.contains(temp[0])) {
premqAll.add(mq);
}
}
//平均结果
int mod = premqAll.size() / cidAll.size();
// 平均之后的余数
int rem = premqAll.size() % cidAll.size();
int startIndex = mod * currentIndex;
int endIndex = startIndex + mod;
//所有消费者先平均分配,多的余数在依次分配,例如:队列8,消费者3,{0,1,6},{2,3,7},{4,5}
for (int i = startIndex; i < endIndex; i++) {
result.add(mqAll.get(i));
}
if (rem > currentIndex) {
result.add(premqAll.get(currentIndex + mod * cidAll.size()));
}
return result;
}
@Override
public String getName() {
return "MACHINE_ROOM";
}
public Set<String> getConsumeridcs() {
return consumeridcs;
}
public void setConsumeridcs(Set<String> consumeridcs) {
this.consumeridcs = consumeridcs;
}
四、AllocateMessageQueueConsistentHash一致性哈希负载均衡算法
这个算法算是这几种中最复杂的一个吧。一致性哈希负载均衡的目的是要保证:相同的请求尽可能落在同一个服务器上。为什么是说尽可能?因为服务器会发生上下线,在少数服务器变化的时候不应该影响大多数的请求。再讲本节算法前,先简单的介绍一下一致性哈希算法,大家如果想有一个更深入的理解,可以去网上搜索更多相关资料。
普通hash算法存在的问题
普通hash算法我们可以简单理解为对key值进行hash之后对服务器取模,也就是hash(key) % n.这个时候如果我们的一台服务器宕机了,或者需要新增一台服务器,那么我们的n值就会变更,这样就会导致我们所有的请求都会变更。举个简单的例子,我们有个redis集群,部署了4台服务器,如果我们将key1使用随机存储,那么我们找key1的时候可能就需要遍历4服务器,效率差。在换种方式,对key1哈希操作后取模,将它定位到一台服务器上,这样在查找key1的时候我们就可以很快的定位到一台服务器上。可是这样还有种问题就是之前所说的如果我们redis集群增加了一台服务器,或者有一台服务器宕机了要从集群中去除。这样再通过hash算出了值就发生了变化。短时间发生缓存雪崩。
一致性hash算法
核心点:
- 哈希环。刚才的hash算法是对服务器取模,一致性哈希算法使用的是对2^32取模,
就是一致性哈希将整个hash空间组织成了一个圆环,0-2^32-1.
- 物理节点:将服务器(ip+端口)进行hash,映射成环上的一个节点。当请求到来时,根据请求的key,hash映射到环上,顺时针选取最近的一个服务器进行请求。
- 虚拟节点:当环上的服务器较少的时候,会出现分配不均匀的情况,即大量的请求落在同一台服务器上。为了避免这种情况,就引入了虚拟节点,比如通过添加后缀的方式给物理节点克隆出三个虚拟节点,如果两台物理节点,都克隆三个虚拟节点,那么环上就一共有8个节点。只是被克隆的虚拟节点最后还是会定位到实际物理节点上,但是可以有效的分摊请求。
一致性哈希相对于普通hash,优点在于映射到环上的其请求,是发送到环上离他最近的一个服务器,如果我们一台服务器宕机或者新增一台服务器,那么影响的请求只有这台服务器和前一个服务器节点之间的请求,其他的并不会影响。
接下来,我们再来看看RocketMq是如何实现一致性哈希负载均衡算法
//核心代码
Collection<ClientNode> cidNodes = new ArrayList<ClientNode>();
for (String cid : cidAll) {
cidNodes.add(new ClientNode(cid));
}
//ConsistentHashRouter关键类
final ConsistentHashRouter<ClientNode> router; //for building hash ring
if (customHashFunction != null) {
router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt, customHashFunction);
} else {
router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt);
}
List<MessageQueue> results = new ArrayList<MessageQueue>();
for (MessageQueue mq : mqAll) {
ClientNode clientNode = router.routeNode(mq.toString());
if (clientNode != null && currentCID.equals(clientNode.getKey())) {
results.add(mq);
}
}
return results;
private static class ClientNode implements Node {
private final String clientID;
public ClientNode(String clientID) {
this.clientID = clientID;
}
@Override
public String getKey() {
return clientID;
}
}
上面代码在ConsistentHashRouter这个类中构建了哈希环,算法的主要实现都是在这个类中实现的。先来看这个类的构造方法。构造方法中pNodes表示物理节点;vNodeCount表示虚拟节点个数,默认十个;HashFunction表示哈希算法接口,可以自己实现,默认使用MD5实现哈希算法。addNode方法将物理节点和虚拟节点映射到哈希环上。
public ConsistentHashRouter(Collection<T> pNodes, int vNodeCount, HashFunction hashFunction) {
if (hashFunction == null) {
throw new NullPointerException("Hash Function is null");
}
this.hashFunction = hashFunction;
if (pNodes != null) {
for (T pNode : pNodes) {
addNode(pNode, vNodeCount);
}
}
}
哈希环的构建
这里构建哈希环是通过TreeMap来实现的。
private final SortedMap<Long, VirtualNode<T>> ring = new TreeMap<Long, VirtualNode<T>>();
将物理节点、虚拟节点放入treeMap里。通过treeMap的tailMap、firstKey()等方法来获取请求映射对应的节点。后面在细讲。
addNode方法的实现:
public void addNode(T pNode, int vNodeCount) {
if (vNodeCount < 0) throw new IllegalArgumentException("illegal virtual node counts :" + vNodeCount);
//获取环上对应物理节点已经有的虚拟节点个数
int existingReplicas = getExistingReplicas(pNode);
for (int i = 0; i < vNodeCount; i++) {
VirtualNode<T> vNode = new VirtualNode<T>(pNode, i + existingReplicas);
ring.put(hashFunction.hash(vNode.getKey()), vNode);
}
}
public int getExistingReplicas(T pNode) {
int replicas = 0;
//循环ring,找出物理节点pNode在环上有多少虚拟节点
for (VirtualNode<T> vNode : ring.values()) {
if (vNode.isVirtualNodeOf(pNode)) {
replicas++;
}
}
return replicas;
}
addNode先通过getExistingReplicas方法找出该pNode物理节点在环上已经存在的虚拟节点的个数。然后循环创建新的虚拟节点,下标依次+1。再将新的虚拟节点加入哈希环。哈希环构建好了,再回去继续看我们的主流程。
接着循环所有mq,通过router.routeNode方法找到mq映射的物理节点。
ClientNode clientNode = router.routeNode(mq.toString());
//ConsistentHashRouter.routeNode
public T routeNode(String objectKey) {
if (ring.isEmpty()) {
return null;
}
Long hashVal = hashFunction.hash(objectKey);
//tailMap方法找到大于等于hashVal映射的集合
SortedMap<Long,VirtualNode<T>> tailMap = ring.tailMap(hashVal);
Long nodeHashVal = !tailMap.isEmpty() ? tailMap.firstKey() : ring.firstKey();
return ring.get(nodeHashVal).getPhysicalNode();
}
将mq.toString哈希后,得出hashVal,使用TreeMap.tailMap方法得出map中key值大于等于hashVal的集合映射,就是得出了key值在hashVal之后的所有值。接着判断后面的map集合是不是空的,如果不是空的,就取它的第一个值(firstKey),也就是hashVal映射在环上的顺时针最近的节点。如果是空的,就说明该hashVal后面已经没有节点了,因为是环,所以取ring.first(),取环上的第一个节点。然后返回该节点对应的物理节点。
回到主流程,拿到物理节点后,和当前请求客户端id进行比较,是同一个,就把mq分配给他
if (clientNode != null && currentCID.equals(clientNode.getKey())) {
results.add(mq);
}
五、AllocateMessageQueueByConfig通过配置负载均衡
这个没啥好说的,自定义配置。
六、AllocateMachineRoomNearby靠近机房策略
休息一下