0. 环境
- nacos版本:1.4.1
- Spring Cloud : 2020.0.2
- Spring Boot :2.4.4
- Spring Cloud alibaba: 2.2.5.RELEASE
测试代码:github.com/hsfxuebao/s…
nacos临时节点的存储是放到内存中的,然后节点之间的数据同步也是那种异步同步的peer to peer方式,属于cap里面的ap模型,但是它的永久节点的存储就不一样了,节点的数据同步是基于raft协议来实现数据一致性的,第一小节我们会简单介绍下nacos中raft协议,然后直接进入源码看看nacos 是怎样实现这个raft协议的。
1. nacos中raft协议简单介绍
《In search of an Understandable Consensus Algorithm》论文
- 集群中的角色分为
LEADER(leader领导者)
, FOLLOWER(follower 跟随者)
,CANDIDATE(candidate候选人)
- 正常情况下集群中只能一个
leader
, 然后一群follower
,是没有这个candidate
,当leader
挂了,这个时候follower
才会变成candidate
进行选举。
- 当
leader
挂了或者集群刚启动的时候要进行选举,选出一个节点当leader
,其他节点变成follower
。
- 选举是节点自己先投自己一票,然后向其他节点拉票,希望其他节点能够投它,票数过集群节点半数+1,就可以成为
leader
-
leader
接收客户端的数据操作请求,如果follower
接收到了客户端操作数据请求,就会将请求转发给leader
节点,由leader
节点进行数据同步,将数据同步给其他follower
节点,nacos中半数+1的节点同步成功就算是ok了。
-
leader
要向follower
发送心跳,告诉follower
自己还活着,如果follower
长时间没有收到这个leader
的心跳信息,就会认为leader
挂了,重新进行选举。
2. 选举
选举工作主要是在RaftCore
这个里面实现的,我们来看下这类的初始化动作:
@PostConstruct
public void init() throws Exception {
Loggers.RAFT.info("initializing Raft sub-system");
final long start = System.currentTimeMillis();
// 加载数据
raftStore.loadDatums(notifier, datums);
// 加载nacos_home/data/naming/meta.properties
setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L));
Loggers.RAFT.info("cache loaded, datum count: {}, current term: {}", datums.size(), peers.getTerm());
initialized = true;
Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start));
// todo 注册一个选举任务,每500ms执行一次
masterTask = GlobalExecutor.registerMasterElection(new MasterElection());
// todo 注册一个心跳任务,每500ms执行一次
heartbeatTask = GlobalExecutor.registerHeartbeat(new HeartBeat());
// 向版本切换注册一个观察者,如果切换了新版本,就会停止这个core
versionJudgement.registerObserver(isAllNewVersion -> {
stopWork = isAllNewVersion;
if (stopWork) {
try {
shutdown();
raftListener.removeOldRaftMetadata();
} catch (NacosException e) {
throw new NacosRuntimeException(NacosException.SERVER_ERROR, e);
}
}
}, 100);
// 注册一个监听器,订阅者
NotifyCenter.registerSubscriber(notifier);
Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}",
GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS);
}
复制代码
首先是将文件中的数据加载到内存中,文件就是那些服务信息,实例列表这些东西。在你nacos.home/data/naming/data/{namespace}/
下面。接着就是加载 term
,将 nacos_home/data/naming/data/meta.properties
文件中的term加载到内存中,需要注意的每个peer都有自己的term,这个term很重要,在nacos中能不能选上leader就是靠的term大小
。再就是创建了2个任务放到调度线程池中了,一个是选举任务,一个是心跳任务
。
接下来我们看下选举任务
(我们心中要有那么几台机器,然后跟个它这个代码运行 ,不然你看着堆代码还是不能理解它是怎样选出来的):
// 选举任务
public class MasterElection implements Runnable {
@Override
public void run() {
try {
if (stopWork) {
return;
}
// 判断是否准备好了
if (!peers.isReady()) {
return;
}
// 获取自己的peer
RaftPeer local = peers.local();
// 减去500
local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;
// 大于0 就什么也不干
if (local.leaderDueMs > 0) {
return;
}
// reset timeout
// 重置LeaderDue 15000 + (0-5000随机数)
local.resetLeaderDue();
// 重置HeartbeatDue 5000
local.resetHeartbeatDue();
// todo 发起选票
sendVote();
} catch (Exception e) {
Loggers.RAFT.warn("[RAFT] error while master election {}", e);
}
}
}
复制代码
首先是检查下状态,是否准备好等等,接着获取自己的peer,这个peer可以理解成自己的节点信息。拿自己的leaderDueMs
减500,这个leaderDueMs
一开始是0到15000的随机数,只有减到负数才能继续往下,否则就是return 等待下次的调度(也就是等500ms),这个随机数说实话是非常重要的,再往下走就是重置的这个leaderDue
与heartBeatDue
。leaderDue
重置到15000到20000之间,heartBeatDue
就是5000。接着就是调用sendVote
方法 发送选票了。
private void sendVote() {
// 获取本机的peer
RaftPeer local = peers.get(NetUtils.localServer());
Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}", JacksonUtils.toJson(getLeader()),
local.term);
// 将所有的voteFor 都设置为null 这个voteFor就是将票投给了谁
peers.reset();
// term++ 这个东西比较重要,如果么有这个东西,大家都一样,就选不出来
// 选择比自己term大的那个
local.term.incrementAndGet();
// 先将票投给自己
local.voteFor = local.ip;
// 转变角色为候选人
local.state = RaftPeer.State.CANDIDATE;
Map<String, String> params = new HashMap<>(1);
// 封装参数,将自己扔过去
params.put("vote", JacksonUtils.toJson(local));
// 遍历那堆出了自己的Server
for (final String server : peers.allServersWithoutMySelf()) {
final String url = buildUrl(server, API_VOTE);
try {
HttpClient.asyncHttpPost(url, null, params, new Callback<String>() {
@Override
public void onReceive(RestResult<String> result) {
if (!result.ok()) {
Loggers.RAFT.error("NACOS-RAFT vote failed: {}, url: {}", result.getCode(), url);
return;
}
// 这是ok的
// 远端传过来的那个peer
RaftPeer peer = JacksonUtils.toObj(result.getData(), RaftPeer.class);
Loggers.RAFT.info("received approve from peer: {}", JacksonUtils.toJson(peer));
// todo 决定领导者
peers.decideLeader(peer);
}
@Override
public void onError(Throwable throwable) {
Loggers.RAFT.error("error while sending vote to server: {}", server, throwable);
}
@Override
public void onCancel() {
}
});
} catch (Exception e) {
Loggers.RAFT.warn("error while sending vote to server: {}", server);
}
}
}
复制代码
先是获取自己的peer ,然后就是重置这个peer ,其实就是将voteFor
这个变量设置成null了,这个变量就是投票给谁的意思,接着自己的term自己加1,这个也很重要,接着投票给自己
,就是voteFor设置成了自己的ip,角色变成candidate 候选人
。最后就是给自己拉票了,将自己这个peer作为参数传到集群所有的节点中。
我们看下其他节点收到拉票请求是怎样处理的,这拉票处理是在RaftCore
的receivedVote
方法中处理的:
// 处理投票
public synchronized RaftPeer receivedVote(RaftPeer remote) {
if (stopWork) {
throw new IllegalStateException("old raft protocol already stop work");
}
if (!peers.contains(remote)) {
throw new IllegalStateException("can not find peer: " + remote.ip);
}
// 获取本机peer
RaftPeer local = peers.get(NetUtils.localServer());
// 如果远端peer的term 小于等于 自己的peer的term
if (remote.term.get() <= local.term.get()) {
String msg = "received illegitimate vote" + ", voter-term:" + remote.term + ", votee-term:" + local.term;
Loggers.RAFT.info(msg);
// 如果voteFor是null 就设置成自己的返回
if (StringUtils.isEmpty(local.voteFor)) {
local.voteFor = local.ip;
}
return local;
}
// 重置LeaderDue
local.resetLeaderDue();
// 改变状态
local.state = RaftPeer.State.FOLLOWER;
// 投票给这个ip
local.voteFor = remote.ip;
local.term.set(remote.term.get());
Loggers.RAFT.info("vote {} as leader, term: {}", remote.ip, remote.term);
return local;
}
复制代码
这个方法就是将自己的peer取出来,然后自己的term 与对端的term进行大小比较,如果是对方term比自己的小,就会将voteFor设置成自己,然后把自己的peer回复给拉票方,其实就是告诉对方你的term太小了,然后我要投票给我自己。如果是对方的term比自己的大,重置leaderDue
,重置leaderDue
后它的选举任务在15000到20000 毫秒之内就不再往下进行了,因为它一次减500,减不到小于0就return。接着就是自己角色设置成FOLLOWER, 投票给对端的,自己的term重置成对端的term,返回给拉票方。
接下来我们再来看下 拉票方收到对方的投票结果后是怎样处理的
:
// 选举Leader
public RaftPeer decideLeader(RaftPeer candidate) {
peers.put(candidate.ip, candidate);
// 统计出现次数的bag
SortedBag ips = new TreeBag();
// 最多票
int maxApproveCount = 0;
// 最多票的peer
String maxApprovePeer = null;
for (RaftPeer peer : peers.values()) {
if (StringUtils.isEmpty(peer.voteFor)) {
continue;
}
ips.add(peer.voteFor);
if (ips.getCount(peer.voteFor) > maxApproveCount) {
maxApproveCount = ips.getCount(peer.voteFor);
maxApprovePeer = peer.voteFor;
}
}
// 如果超过半数+1的话
if (maxApproveCount >= majorityCount()) {
// 获取最大票数的peer
RaftPeer peer = peers.get(maxApprovePeer);
// 将这个peer设置成Leader
peer.state = RaftPeer.State.LEADER;
// 如果leader不是选出来的那个peer,就把leader设置成选出来的那个peer
if (!Objects.equals(leader, peer)) {
leader = peer;
// 发布leader选举完成的事件
ApplicationUtils.publishEvent(new LeaderElectFinishedEvent(this, leader, local()));
Loggers.RAFT.info("{} has become the LEADER", leader.ip);
}
}
return leader;
}
复制代码
这个计算自己多少票,看看谁能当leader的方法了,把收到的投票信息放到peers中,遍历找出最大票数与对应的那个peer,如果是最大票数超过了半数+1
public int majorityCount() {
return peers.size() / 2 + 1;
}
复制代码
就会从peer中将取出那个当选的peer ,将这个peer设置成leader,如果自己本地的leader不是这个新当选的leader ,就会替换下,并发布leader选举完成的事件。
好了,到这leader选举就完成了,关键点就是在这个term
上,term越大越能当选,如果是一开始term相等的话,那个leaderDue
是非常重要的,15000到20000 这之间的随机数,如果是15000的就比20000的早执行选举,term也就早+1 ,早发起拉票,早当选
。
3. 心跳
接下来看下发送心跳的任务:
// 心跳任务
public class HeartBeat implements Runnable {
@Override
public void run() {
try {
if (stopWork) {
return;
}
if (!peers.isReady()) {
return;
}
RaftPeer local = peers.local();
local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;
if (local.heartbeatDueMs > 0) {
return;
}
// 重置心跳
local.resetHeartbeatDue();
// todo 发送心跳
sendBeat();
} catch (Exception e) {
Loggers.RAFT.warn("[RAFT] error while sending beat {}", e);
}
}
复制代码
跟选举差不多,也是先整上个0-5000让你减,减到小于0 就可以往下走了,接着就是重置这个心跳,重置成5000,最后调用sendBeat方法,发送心跳:
private void sendBeat() throws IOException, InterruptedException {
RaftPeer local = peers.local();
// 如果不是集群模式 或者 自己不是leader 就不往下执行
if (EnvUtil.getStandaloneMode() || local.state != RaftPeer.State.LEADER) {
return;
}
if (Loggers.RAFT.isDebugEnabled()) {
Loggers.RAFT.debug("[RAFT] send beat with {} keys.", datums.size());
}
// 重置LeaderDue
local.resetLeaderDue();
// build data
ObjectNode packet = JacksonUtils.createEmptyJsonNode();
packet.replace("peer", JacksonUtils.transferToJsonNode(local));
ArrayNode array = JacksonUtils.createEmptyArrayNode();
if (switchDomain.isSendBeatOnly()) {
Loggers.RAFT.info("[SEND-BEAT-ONLY] {}", switchDomain.isSendBeatOnly());
}
// 不仅要发送心跳,还要带着数据的话 默认是false 也就是还得带着数据
if (!switchDomain.isSendBeatOnly()) {
for (Datum datum : datums.values()) {
ObjectNode element = JacksonUtils.createEmptyJsonNode();
if (KeyBuilder.matchServiceMetaKey(datum.key)) {
element.put("key", KeyBuilder.briefServiceMetaKey(datum.key));
} else if (KeyBuilder.matchInstanceListKey(datum.key)) {
element.put("key", KeyBuilder.briefInstanceListkey(datum.key));
}
element.put("timestamp", datum.timestamp.get());
array.add(element);
}
}
...
}
复制代码
如果当前不是集群模式或者自己不是leader
, 就不用再往下执行了,也就是只有集群模式的leader才能发送心跳,接着重置一下自己的leaderDue
,这个也是告诉自己的那个选举任务,在15000到20000之间不要选举了,如果不是仅仅发送心跳的话,就是还要带着数据的key与timestamp
:
private void sendBeat() throws IOException, InterruptedException {
...
packet.replace("datums", array);
// broadcast
Map<String, String> params = new HashMap<String, String>(1);
params.put("beat", JacksonUtils.toJson(packet));
String content = JacksonUtils.toJson(params);
ByteArrayOutputStream out = new ByteArrayOutputStream();
// 压缩
GZIPOutputStream gzip = new GZIPOutputStream(out);
gzip.write(content.getBytes(StandardCharsets.UTF_8));
gzip.close();
byte[] compressedBytes = out.toByteArray();
String compressedContent = new String(compressedBytes, StandardCharsets.UTF_8);
if (Loggers.RAFT.isDebugEnabled()) {
Loggers.RAFT.debug("raw beat data size: {}, size of compressed data: {}", content.length(),
compressedContent.length());
}
for (final String server : peers.allServersWithoutMySelf()) {
try {
// /v1/ns/raft/beat
final String url = buildUrl(server, API_BEAT);
if (Loggers.RAFT.isDebugEnabled()) {
Loggers.RAFT.debug("send beat to server " + server);
}
HttpClient.asyncHttpPostLarge(url, null, compressedBytes, new Callback<String>() {
@Override
public void onReceive(RestResult<String> result) {
if (!result.ok()) {
Loggers.RAFT.error("NACOS-RAFT beat failed: {}, peer: {}", result.getCode(), server);
MetricsMonitor.getLeaderSendBeatFailedException().increment();
return;
}
// todo
peers.update(JacksonUtils.toObj(result.getData(), RaftPeer.class));
if (Loggers.RAFT.isDebugEnabled()) {
Loggers.RAFT.debug("receive beat response from: {}", url);
}
}
@Override
public void onError(Throwable throwable) {
Loggers.RAFT.error("NACOS-RAFT error while sending heart-beat to peer: {} {}", server,
throwable);
MetricsMonitor.getLeaderSendBeatFailedException().increment();
}
@Override
public void onCancel() {
}
});
} catch (Exception e) {
Loggers.RAFT.error("error while sending heart-beat to peer: {} {}", server, e);
MetricsMonitor.getLeaderSendBeatFailedException().increment();
}
}
}
复制代码
上面这一大堆就是压缩数据,下面的for循环就是向所有的节点发送心跳(抛去自己),收到回应后更新peer。
我们再来看看follower
节点是怎样处理心跳的:
public RaftPeer receivedBeat(JsonNode beat) throws Exception {
if (stopWork) {
throw new IllegalStateException("old raft protocol already stop work");
}
final RaftPeer local = peers.local();
final RaftPeer remote = new RaftPeer();
JsonNode peer = beat.get("peer");
remote.ip = peer.get("ip").asText();
remote.state = RaftPeer.State.valueOf(peer.get("state").asText());
remote.term.set(peer.get("term").asLong());
remote.heartbeatDueMs = peer.get("heartbeatDueMs").asLong();
remote.leaderDueMs = peer.get("leaderDueMs").asLong();
remote.voteFor = peer.get("voteFor").asText();
// 远端那个不是leader的话 抛异常
if (remote.state != RaftPeer.State.LEADER) {
Loggers.RAFT.info("[RAFT] invalid state from master, state: {}, remote peer: {}", remote.state,
JacksonUtils.toJson(remote));
throw new IllegalArgumentException("invalid state from master, state: " + remote.state);
}
// 本地大于远端的也不行
if (local.term.get() > remote.term.get()) {
Loggers.RAFT
.info("[RAFT] out of date beat, beat-from-term: {}, beat-to-term: {}, remote peer: {}, and leaderDueMs: {}",
remote.term.get(), local.term.get(), JacksonUtils.toJson(remote), local.leaderDueMs);
throw new IllegalArgumentException(
"out of date beat, beat-from-term: " + remote.term.get() + ", beat-to-term: " + local.term.get());
}
// 本地不是FOLLOWER 角色的话 设置成FOLLOWER
if (local.state != RaftPeer.State.FOLLOWER) {
Loggers.RAFT.info("[RAFT] make remote as leader, remote peer: {}", JacksonUtils.toJson(remote));
// mk follower
local.state = RaftPeer.State.FOLLOWER;
local.voteFor = remote.ip;
}
final JsonNode beatDatums = beat.get("datums");
local.resetLeaderDue();
local.resetHeartbeatDue();
// todo
peers.makeLeader(remote);
...
}
复制代码
这个方法很长,我们只看下前半部分,后半部分都是处理那些leader带过来的key信息。首先是取出来leader传过来的数据,判断对端是不是leader,不是不行,判断对端的term与自己的term ,如果自己的大于的对端的话也不行,如果自己不是follower的话,就设置成follower ,并把自己的票给对端,重置这个leaderDue与heartBeatDue,其实你会发现,只要leader一直不挂,往follower发送心跳,follower 就一直会重置leaderDue,follower的选举任务就一直不会往下走。
接着看下这个makeLeader
:
public RaftPeer makeLeader(RaftPeer candidate) {
// 如果leader不是 远端这个 的话就设置成这个
if (!Objects.equals(leader, candidate)) {
leader = candidate;
// 通知MakeLeaderEvent事件
ApplicationUtils.publishEvent(new MakeLeaderEvent(this, leader, local()));
Loggers.RAFT
.info("{} has become the LEADER, local: {}, leader: {}", leader.ip, JacksonUtils.toJson(local()),
JacksonUtils.toJson(leader));
}
for (final RaftPeer peer : peers.values()) {
Map<String, String> params = new HashMap<>(1);
// peer不是candidate 并且 peer是leader
if (!Objects.equals(peer, candidate) && peer.state == RaftPeer.State.LEADER) {
try {
// /v1/ns/raft/peer
String url = RaftCore.buildUrl(peer.ip, RaftCore.API_GET_PEER);
HttpClient.asyncHttpGet(url, null, params, new Callback<String>() {
@Override
public void onReceive(RestResult<String> result) {
if (!result.ok()) {
Loggers.RAFT
.error("[NACOS-RAFT] get peer failed: {}, peer: {}", result.getCode(), peer.ip);
peer.state = RaftPeer.State.FOLLOWER;
return;
}
// 更新下原来是leader的peer
update(JacksonUtils.toObj(result.getData(), RaftPeer.class));
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onCancel() {
}
});
} catch (Exception e) {
peer.state = RaftPeer.State.FOLLOWER;
Loggers.RAFT.error("[NACOS-RAFT] error while getting peer from peer: {}", peer.ip);
}
}
}
return update(candidate);
}
复制代码
这个方法主要就是更新下自己本地维护的leader,如果本地维护的leader不是远端那个的话,就重新设置下本地的leader,并且发布leader改变的事件, 接着就是遍历所有的peer,找出以前那个leader ,发送请求,获取一下它的peer信息,然后更新下维护的它的peer信息,最后更新下本地委会的leader peer 信息。
好了,到这我们心跳这块也是ok了,心跳执行leader向其他的节点发送,让其他节点知道leader 还活着,如果leader长时间不发送,follower就认为leader挂了,继续走选举的逻辑,这里重置leaderDue是非常重要的。