Zookeeper leader选举源码分析(超详细)

2023-05-16

选举介绍

在开始分析选举的原理之前,先了解几个重要的参数

  • 服务器 ID(myid)

    比如有三台服务器,编号分别是 1,2,3。

    编号越大在选择算法中的权重越大。

  • zxid 事务 id-(ZooKeeper transaction ID)

    值越大说明数据越新,在选举算法中的权重也越大

  • 逻辑时钟(epoch – logicalclock)

    或者叫投票的次数,同一轮投票过程中的逻辑时钟值是相同的。每投完一次票这个数据就会增加,然后与接收到的其它服务器返回的投票信息中的数值相比,根据不同的值做出不同的判断。

  • 选举状态

    LOOKING,竞选状态。

    FOLLOWING,随从状态,同步 leader 状态,参与投票。

    OBSERVING,观察状态,同步 leader 状态,不参与投票。LEADING,领导者状态。

服务器启动时的 leader 选举

每个节点启动的时候状态都是 LOOKING,处于观望状态,接下来就开始进行选主流程。

若进行 Leader 选举,则至少需要两台机器,一般都是选取 3 台机器组成的服务器集群。

(在下面的例子中,为了表述清晰,我们使用两台进行讲解)

在集群初始化阶段,当有一台服务器 Server1 启动时,其单独无法进行和完成 Leader 选举,当第二台服务器 Server2 启动时,此时两台机器可以相互通信,每台机器都试图找到 Leader,于是进入 Leader选举过程。选举过程如下:

(1) 每个 Server 发出一个投票。由于是初始情况,Server1 和 Server2 都会将自己作为 Leader 服务器来进行投票,每次投票会包含所推举的服务器的 myid ZXIDepoch,使用(myid, ZXID,epoch)来表示。

此时 Server1 的投票为(1, 0),Server2 的投票为(2, 0),然后各自将这个投票发给集群中其他机器。

(2) 接受来自各个服务器的投票。集群的每个服务器收到投票后,首先判断该投票的有效性,如检查是否是本轮投票(epoch)、是否来自LOOKING 状态的服务器。

(3) 处理投票。针对每一个投票,服务器都需要将别人的投票和自己的投票进行 PK.
PK 规则如下:

i. 优先比较 epochii. 其次检查 ZXID。ZXID 比较大的服务器优先作为 Leader
iii. 如果 ZXID 相同,那么就比较 myid。myid 较大的服务器作为Leader 服务器。

对于 Server1 而言,它的投票是(1, 0),接收 Server2 的投票为(2, 0)

首先会比较两者的 ZXID,均为 0,再比较 myid,此时 Server2 的myid 最大,于是更新自己的投票为(2, 0),然后重新投票。对于Server2 而言,其无须更新自己的投票。

(4) 统计投票。每次投票后,服务器都会统计投票信息,判断是否已经有过半机器接受到相同的投票信息,对于 Server1、Server2 而言,都统计出集群中已经有两台机器接受了(2, 0)的投票信息,此时便认为已经选出了 Leader。

(5) 改变服务器状态。一旦确定了 Leader,每个服务器就会更新自己的状态,如果是 Follower,那么就变更为 FOLLOWING,如果是 Leader,就变更为 LEADING

准备

下载源码:https://codeload.github.com/apache/zookeeper/tar.gz/refs/tags/release-3.7.1

附注:3.7.1发布日期:2022-05-11

编译源码

mvn clean install -DskpTests=true

我这边非常顺利,没报找不到依赖的问题

导入idea

  1. 正常open即可

  2. 修改zookeeper-server/pom.xml

    • 增加log4j依赖(为了显示日志)

    • 重要: 注释掉metrics-core、snappy-java的provide(不注释掉运行时会报找不到class错误)

     <dependency>
       <groupId>log4j</groupId>
       <artifactId>log4j</artifactId>
       <version>1.2.17</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <version>1.6.4</version>
    </dependency>
    <dependency>
           <groupId>io.dropwizard.metrics</groupId>
           <artifactId>metrics-core</artifactId>
    <!--       <scope>provided</scope>-->
    </dependency>
    <dependency>
          <groupId>org.xerial.snappy</groupId>
          <artifactId>snappy-java</artifactId>
    <!--      <scope>provided</scope>-->
    </dependency>
    
  3. zookeeper/srs/main/resources下增加log4j.properties

    zookeeper.root.logger=DEBUG,CONSOLE
    zookeeper.console.threshold=DEBUG
    log4j.rootLogger=${zookeeper.root.logger}
    
    log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
    #log4j.appender.CONSOLE.Threshold=${zookeeper.console.threshold}
    log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
    log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n
    

入口

  • 打开zkServer.sh,找到start选项;然后注意一下$ZOOMAIN(启动类)

  • 全局搜索ZOOMAIN。找到入口类org.apache.zookeeper.server.quorum.QuorumPeerMain

运行

准备

  1. 三份zk配置z1.cfg,z2.cfg,z3.cfg;(注意dataDir的路径)

    #z1.cfg
    tickTime=2000
    initLimit=10
    syncLimit=5
    dataDir=/Users/twx/code-space/internet/zookeeper-release-3.7.1/quorum/data1
    clientPort=2181
    server.1=127.0.0.1:2222:2223
    server.2=127.0.0.1:3333:3334
    server.3=127.0.0.1:4444:4445
    #z2.cfg
    tickTime=2000
    initLimit=10
    syncLimit=5
    dataDir=/Users/twx/code-space/internet/zookeeper-release-3.7.1/quorum/data2
    clientPort=2182
    server.1=127.0.0.1:2222:2223
    server.2=127.0.0.1:3333:3334
    server.3=127.0.0.1:4444:4445
    #z3.cfg
    tickTime=2000
    initLimit=10
    syncLimit=5
    dataDir=/Users/twx/code-space/internet/zookeeper-release-3.7.1/quorum/data3
    clientPort=2183
    server.1=127.0.0.1:2222:2223
    server.2=127.0.0.1:3333:3334
    server.3=127.0.0.1:4444:4445
    
  2. 创建dataDir目录;并分别echo 1 > data1/myid,echo 2 > data2/myid,echo 3 > data3/myid

开始

打开IDEA/Run Configure;按照下图配置3个实例;

  • Modify options -> 勾选Allow multiple innstances
  • Program args 中填入zookeeper的配置文件地址

解析配置

配置解析我讲着重讲一下parseDynamicConfig()方法,即QuorumVerifier的创建。

其他的大家可以自行跟进下代码。

public static void main(String[] args) {
        QuorumPeerMain main = new QuorumPeerMain();
        try {
            main.initializeAndRun(args);
          ...
            
protected void initializeAndRun(String[] args) throws... {
  QuorumPeerConfig config = new QuorumPeerConfig();
  if (args.length == 1) {
    config.parse(args[0]);
  }
  ...

public void parse(String path) throws ConfigException {
        ...
   /* Read entire config file as initial configuration */
   initialConfig = new String(Files.readAllBytes(configFile.toPath()));

   parseProperties(cfg); 
   .... 

public void parseProperties(Properties zkProp) throws ... {   
    ...
    //在循环里对z1.cfg里的属性进行解析
    for (Entry<Object, Object> entry : zkProp.entrySet()) {
        String key = entry.getKey().toString().trim();
        String value = entry.getValue().toString().trim();
        if (key.equals("dataDir")) {
            dataDir = vff.create(value);
        }
        ...
    }
    ...
    if (dynamicConfigFileStr == null) {
        //看这里,比较重要
        //这里主要是解析
        //server.1=127.0.0.1:2222:2223
        //server.2=127.0.0.1:3333:3334
        //server.3=127.0.0.1:4444:4445
        setupQuorumPeerConfig(zkProp, true);
        if (isDistributed() && isReconfigEnabled()) {
            // we don't backup static config for standalone mode.
            // we also don't backup if reconfig feature is disabled.
            backupOldConfig();
        }
    }
}

void setupQuorumPeerConfig(Properties prop, boolean configBackwardCompatibilityMode) ... {
    //quorumVerifier是一个比较重要的对象,必须进去看一下
    quorumVerifier = parseDynamicConfig(prop, electionAlg, true, configBackwardCompatibilityMode);
    //读取dataN/myid里的值,赋给成员变量serverId
    setupMyId();
    setupClientPort();
    setupPeerType();
    checkValidity();
}

public static QuorumVerifier parseDynamicConfig(Properties dynamicConfigProp, int eAlg, boolean warnings, boolean configBackwardCompatibilityMode)...{
    boolean isHierarchical = false;
    //这个循环主要是为了给isHierarchical赋值
    for (Entry<Object, Object> entry : dynamicConfigProp.entrySet()) {
        ...
    }
    //默认创建QuorumMaj,很重要的一个对象;参数isHierarchical=false
    QuorumVerifier qv = createQuorumVerifier(dynamicConfigProp, isHierarchical);
}

private static QuorumVerifier createQuorumVerifier(Properties dynamicConfigProp, boolean isHierarchical) throws ConfigException {
    if (isHierarchical) {
        return new QuorumHierarchical(dynamicConfigProp);
    } else {
        /*
            * The default QuorumVerifier is QuorumMaj
            */
        //LOG.info("Defaulting to majority quorums");
        return new QuorumMaj(dynamicConfigProp);//重要:解析服务器地址&&设定过半half参数
    }
}

//继续看看QuorumMaj的创建过程
//注意QuorumMaj类有三个成员变量,非常重要,后续经常使用到
private Map<Long, QuorumServer> allMembers = new HashMap<Long, QuorumServer>();
private Map<Long, QuorumServer> votingMembers = new HashMap<Long, QuorumServer>();
private Map<Long, QuorumServer> observingMembers = new HashMap<Long, QuorumServer>();
//该构造方法遍历配置文件的server.N,将连接信息构造成一个QuorumServer对象,放到allMembers、votingMembers集合中
//根据前面给出的z*.cfg文件,这里的observingMembers.size()==0
//还有点必须注意: half = votingMembers.size() / 2;
//这里的half就是过半提交一个比较关键的点
public QuorumMaj(Properties props) throws ConfigException {
    for (Entry<Object, Object> entry : props.entrySet()) {
        String key = entry.getKey().toString();
        String value = entry.getValue().toString();

        if (key.startsWith("server.")) {
            int dot = key.indexOf('.');
            long sid = Long.parseLong(key.substring(dot + 1));
            //QuorumServer格式: 1==>127.0.01:2222:2223
            QuorumServer qs = new QuorumServer(sid, value);
            allMembers.put(Long.valueOf(sid), qs);
            //qs.type 默认就是LearnerType.PARTICIPANT
            if (qs.type == LearnerType.PARTICIPANT) {
                votingMembers.put(Long.valueOf(sid), qs);
            } else {
                observingMembers.put(Long.valueOf(sid), qs);
            }
        } else if (key.equals("version")) {
            version = Long.parseLong(value, 16);
        }
    }
    //过半 3/2=1;
    half = votingMembers.size() / 2;
}

QuorumMaj类有三个成员变量allMembers、votingMembers、observingMembers,非常重要!QuorumMaj的构造方法遍历配置文件的server.N,将连接信息构造成一个QuorumServer对象,放到allMembers、votingMembers集合中。还有个成员变量half,在构造方法中被赋值votingMembers.size() / 2;half是后续跳过leader选举一个非常关键的点。

启动

刚刚结束了配置文件的解析(config.parse(args[0]);),接下来看看正式的启动流程(runFromConfig(config);)。



protected void initializeAndRun(String[] args)...{
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (args.length == 1) {
        config.parse(args[0]);
    }
    // Start and schedule the the purge task
    ...
    //集群模式
    if (args.length == 1 && config.isDistributed()) {
        runFromConfig(config);
    } else {
        //单击模式
        LOG.warn("Either no config or no quorum defined in config, running in standalone mode");
        // there is only server in the quorum -- run as standalone
        ZooKeeperServerMain.main(args);
    }
}

public void runFromConfig(QuorumPeerConfig config)...{
try {
        ...
        if (config.getClientPortAddress() != null) {
            //默认使用NIOServerCnxnFactory连接工厂,反射创建实例
            cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
        }

        if (config.getSecureClientPortAddress() != null) {
            secureCnxnFactory = ServerCnxnFactory.createFactory();
            secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), true);
        }
        //new QuorumPeer();
        quorumPeer = getQuorumPeer();
        //FileTxnSnapLog用于操作快照和事务日志的帮助类;挺重要(暂时对主线没影响)
        quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));
        ...
        //quorumPeer.setQuorumPeers(config.getAllMembers());
        //配置文件没设置的话,默认就是3
        quorumPeer.setElectionType(config.getElectionAlg());
        //刚刚解析配置文件的时候说过了
        quorumPeer.setMyid(config.getServerId());
        ...
        //启动的时候ZKDatabase会从事务日志和快照文件中读取数据构造成我们熟悉的树结构
        quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
        //解析配置的时候着重讲了QuorumVerifier的创建过程
        quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
        if (config.getLastSeenQuorumVerifier() != null) {
            quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
        }
        quorumPeer.initConfigInZKDatabase();
        //设置连接工厂:NIOServerCnxnFactory
        quorumPeer.setCnxnFactory(cnxnFactory);
        quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
        ...
        //设置peerType:若配置文件未指定,成员变量默认值是LearnerType.PARTICIPANT
        quorumPeer.setLearnerType(config.getPeerType());
        quorumPeer.setSyncEnabled(config.getSyncEnabled());
        quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
        ...
        //从这里正式开始了
        quorumPeer.start();
        ZKAuditProvider.addZKStartStopAuditLog();
        quorumPeer.join();
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Quorum Peer interrupted", e);
    } finally {
        try {
            metricsProvider.stop();
        } catch (Throwable error) {
            LOG.warn("Error while stopping metrics", error);
        }
    }
}

public synchronized void start() {
    if (!getView().containsKey(myid)) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    //调用zkDb.loadDataBase();加载数据;与主线无关,暂时不看
    loadDataBase();
    //还记得之前的cnxnFactory = ServerCnxnFactory.createFactory();
    //这里将会调用NIOServerCnxnFactory的start()方法
    startServerCnxnFactory();
    ...
    //leader选举的重点
    startLeaderElection();
    startJvmPauseMonitor();
    //QuorumPeer继承了Thread,这里启动的是线程
    //所以往后我们要去看run()方法
    super.start();
}


public synchronized void startLeaderElection() {
    try {
        //刚启动的时候state默认是LOOKING
        //因为成员变量state = ServerState.LOOKING;
        if (getPeerState() == ServerState.LOOKING) {
            //构造一个投票(myid,zxid,epoch)
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch (IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }
    //开始选举算法,这里的electionType=3
    this.electionAlg = createElectionAlgorithm(electionType);
}

上面代码片段中比较重要的两个方法createElectionAlgorithm(electionType);start()->super.start();

createElectionAlgorithm(electionType)具体内容稍候再看,这里先简单概括一下它干了些什么事:

  1. 启动QuorumCnxManager.Listener线程
  2. 启动FastLeaderElection.Messenger.WorkerReceiver线程
  3. 启动FastLeaderElection.Messenger.WorkerSender线程

所以createElectionAlgorithm(electionType)只是启动了3个线程而已,所以它会很快返回。

接着就会执行start()->super.start();,即QuorumPeer.run()方法。我们先来看看这个方法:

public void run() {
    ...
    try {
        /*
            * Main loop
            */
        while (running) {
            ...
            switch (getPeerState()) {
            case LOOKING:
                LOG.info("LOOKING");
                ServerMetrics.getMetrics().LOOKING_COUNT.add(1);

                if (Boolean.getBoolean("readonlymode.enabled")) {
                    ...
                } else {
                    try {
                        reconfigFlagClear();
                        if (shuttingDownLE) {
                            shuttingDownLE = false;
                            startLeaderElection();
                        }
                        //makeLEStrategy().lookForLeader()内部也是个while(true)
                        //直接跳到FLE.lookForLeader()
                        //直到leader选举结束
                        setCurrentVote(makeLEStrategy().lookForLeader());
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception", e);
                        setPeerState(ServerState.LOOKING);
                    }
                }
                //leader选举结束后,确定了本节点是什么角色,然后进入对应的switch case
                break;
            case OBSERVING:
                ...
                break;
            case FOLLOWING:
                ...
                break;
            case LEADING:
                ..
                break;
            }
        }//end while
    } finally {
        ...
    }
}

QuorumPeer.run()真正需要关心的是这一句setCurrentVote(makeLEStrategy().lookForLeader());再跟进一步,其实我们最关心的是FasterLeaderElection.lookForLeader()方法。该方法内部是个while(true)循环,直到leader选举结束!

此时,我们暂且不看lookForLeader方法。

我们先来看看下面这张图。

要理解上面这张图,我们把刚刚跳过的createElectionAlgorithm(electionType)方法代码贴一下:

protected Election createElectionAlgorithm(int electionAlgorithm) {
    Election le = null;

    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
    case 1:
        throw new UnsupportedOperationException("Election Algorithm 1 is not supported.");
    case 2:
        throw new UnsupportedOperationException("Election Algorithm 2 is not supported.");
    case 3:
        //QuorumCnxManager也是一个非常重要的类
        //负责发起网络请求(将投票发送出去或者接受别的节点发送过来的投票)
        QuorumCnxManager qcm = createCnxnManager();
        QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
        if (oldQcm != null) {
            LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
            oldQcm.halt();
        }
        //QuorumCnxManager.Listener这个内部类Listener也是继承Thread
        //所以listener.start();之后我们去看Listener的run()方法
        //该Listener的作用是开启一个选举端口,比如我们在z1.cfg配置的2223端口
        //具体位置在QuorumCnxManager.Listener.ListenerHandler.run()->acceptConnections()->createNewServerSocket();
        //当elect Server收到投票票据后,按以下流程处理网络数据
        //QuorumCnxManager.receiveConnection()->handleConnection()
        //在handleConnection()内部new RecvWorker(sock, din, sid, sw)线程。(注意din是我们收到的网络投票数据)
        //RecvWorker线程把收到的投票数据扔到QuorumCnxManager的recvQueue阻塞队列
        //(FastLeaderElection.Messenger.WorkerReceiver线程会循环从recvQueue队列中拉数据,表现如下
        // manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);)至此,形成了一个闭环
        QuorumCnxManager.Listener listener = qcm.listener;
        if (listener != null) {
            listener.start();
            //FastLeaderElection是真正选举的地方
            //注意在这个构造方法里new了一个内部类new Messenger(manager)对象
            //同时Messenger又有两个内部类WorkerReceiver和WorkerSender(他们都继承了Thread)
            //后面看图讲解
            FastLeaderElection fle = new FastLeaderElection(this, qcm);
            //这个start()方法调用了messenger的start()方法
            //messenger.start()内部启动了WorkerReceiver和WorkerSender线程
            fle.start();
            //start()调用之后,立马返回;随后结束switch,跳出createElectionAlgorithm()方法
            le = fle;
            //总结一下: 退出createElectionAlgorithm()方法后,将会有3个线程在运行
            //1. QuorumCnxManager.Listener
            //2. FastLeaderElection.Messenger.WorkerReceiver
            //3. FastLeaderElection.Messenger.WorkerSender
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        assert false;
    }
    return le;
}

上面那张图里的QuorumCnxManager是一个非常重要的类。它负责发起网络请求(将投票发送出去或者接受别的节点发送过来的投票)。

QuorumCnxManager有三个内部类Listener(线程)、RecvWoker(线程)、SendWorker(线程)

Listener(线程)负责创建ServerSocket,用来接收投票信息。具体创建ServerSokcet的过程看代码注释。

当收到网络投票的时候QuorumCnxManager.Listener.ListenerHandler.acceptConnections()client = serverSocket.accept();就会继续运行下去。最终如图中所示,会将网络投票数据添加到成员变量recvQueue<Message>(阻塞队列)中。

同时FastLeaderElection.start()内部启动了WorkerReceiver线程(见代码注释),在不间断地poll QuorumCnxManager的recvQueue<Message>。拉到消息后,然后把消息封装成Notification放到FastLeaderElection中的recvQueue<Notification>中.

详见如下代码:

//WorkerReceiver.run()
public void run() {
    Message response;
    while (!stop) {
        // Sleeps on receive
        try {
            response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
            if (response == null) {
                continue;
            }
            ...
            // Instantiate Notification and set its attributes
            Notification n = new Notification();

            int rstate = response.buffer.getInt();
            long rleader = response.buffer.getLong();
            long rzxid = response.buffer.getLong();
            long relectionEpoch = response.buffer.getLong();
            long rpeerepoch;

            int version = 0x0;
            QuorumVerifier rqv = null;
            ...
            if (!validVoter(response.sid)) {
                ...
            } else {
                // Receive new message
                LOG.debug("Receive new notification message. My id = {}", self.getId());
                // State of peer that sent this message
                QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
                switch (rstate) {
                case 0:
                    ackstate = QuorumPeer.ServerState.LOOKING;
                    break;
                case 1:
                    ackstate = QuorumPeer.ServerState.FOLLOWING;
                    break;
                case 2:
                    ackstate = QuorumPeer.ServerState.LEADING;
                    break;
                case 3:
                    ackstate = QuorumPeer.ServerState.OBSERVING;
                    break;
                default:
                    continue;
                }

                n.leader = rleader;
                n.zxid = rzxid;
                n.electionEpoch = relectionEpoch;
                n.state = ackstate;
                n.sid = response.sid;
                n.peerEpoch = rpeerepoch;
                n.version = version;
                n.qv = rqv;
                ...
                //如果是LOOKING状态
                if (self.getPeerState() == QuorumPeer.ServerState.LOOKING) {
                    recvqueue.offer(n);
                    ....
                    if ((ackstate == QuorumPeer.ServerState.LOOKING)
                        && (n.electionEpoch < logicalclock.get())) {
                        ...
                    }
                } else {
                    ...
                }
            }
        } catch (InterruptedException e) {
            LOG.warn("Interrupted Exception while waiting for new message", e);
        }
    }//end while
}

lookForLeader

现在我们回过头来看看FasterLeaderElection.lookForLeader()(在贴图之前已经粗略地介绍过了).

下面是lookForLeader()源码:

public Vote lookForLeader() throws InterruptedException {
    ...
    try {
        //用来存储接收到的投票,非常重要
        Map<Long, Vote> recvset = new HashMap<Long, Vote>();
        ...
        synchronized (this) {
            //逻辑时钟+1
            logicalclock.incrementAndGet();
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }
        //把初始的投票数据发送出去(即第一轮投票),假设当前是z1节点,投票数据(1,0,1)
        //Vote(1,0,1)会发往[1,2,3]
        //发往1(即本身)的投票会被直接投递到QuorumCnxManager的recvQueue<Message>中
        //发往2、3的投票会经由网络
        sendNotifications();
        SyncedLearnerTracker voteSet;

        while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
            //从recvqueue拉数据,前面介绍过recvqueue的数据是通过FastLeaderElection内部的WorkerReceiver线程offer的
            Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
            if (n == null) {
                //检查queueSendMap所有的队列是否为空
                //不为空说明还有投票没发出去
                if (manager.haveDelivered()) {
                    sendNotifications();
                } else {
                    //再次尝试去发起连接
                    manager.connectAll();
                }
                int tmpTimeOut = notTimeout * 2;
                notTimeout = Math.min(tmpTimeOut, maxNotificationInterval);
                LOG.info("Notification time out: {}", notTimeout);
            } else if (validVoter(n.sid) && validVoter(n.leader)) {
                //校验下收到的网络投票是否来自配置文件中的server列表中的服务器
                switch (n.state) {
                case LOOKING:
                    ...
                    //收到的选举朝代(也可以理解为轮回)比本身的逻辑时钟要大,说明本身的投票轮回落后了
                    if (n.electionEpoch > logicalclock.get()) {
                        logicalclock.set(n.electionEpoch);
                        recvset.clear();
                        if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                        }
                        sendNotifications();
                    } else if (n.electionEpoch < logicalclock.get()) {
                        //收到的选举朝代(也可以理解为轮回)比本身的逻辑时钟要小,说明从网络上收到的投票消息过期了,自然就不用处理
                            LOG.debug(
                                "Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x{}, logicalclock=0x{}",
                                Long.toHexString(n.electionEpoch),
                                Long.toHexString(logicalclock.get()));
                        break;
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                        //totalOrderPredicate()是一个比较重要的方法:
                        //通过与收到的网络投票相比较用于更新投票
                        //如果返回true,(即进入到了这里)说明网络投票优先本身
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        //重新把投票发送出去
                        sendNotifications();
                    }
                    ...
                    //将收到的网络投票记录下来
                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                    //重要的方法: 将当前的Vote与recvset中的所有投票进行比对。(待会重点来看一下这个方法)
                    voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));
                    //判断vote是否已过半
                    if (voteSet.hasAllQuorums()) {
                        //虽然已经完成过半的节点赞成某个sid成为leader,但是还是得尝试一下recvqueue有没有数据(可能由于网络原因z3节点的投票数据这时候才到达)
                        while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                                recvqueue.put(n);
                                break;
                            }
                        }
                        //如果recvqueue真的没有任务投票数据了,就可以确认节点状态,退出leader选举了
                        if (n == null) {
                            //更新节点状态: leading | following | observing
                            setPeerState(proposedLeader, voteSet);
                            Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                            leaveInstance(endVote);
                            //退出while循环,程序回到QuorumPeer的setCurrentVote(makeLEStrategy().lookForLeader());
                            return endVote;
                        }
                    }
                    break;
                case OBSERVING:
                    ...
                default:
                    break;
                }
            } else {
                ...
            }
        }//end while
        return null;
    } finally {
        ...
    }
}

首先,Map<Long, Vote> recvset = new HashMap<Long, Vote>();是非常重要的,它会将网络(z2、z3节点)上发送过来的投票数据保存下来(包括它自己)。

synchronized (this) {
  //逻辑时钟+1
  logicalclock.incrementAndGet();
  updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
//把初始的投票数据发送出去(即第一轮投票),假设当前是z1节点,投票数据(1,0,1)
//Vote(1,0,1)会发往[1,2,3]
//发往1(即本身)的投票会被直接投递到QuorumCnxManager的recvQueue<Message>中
//发往2、3的投票会经由网络
sendNotifications();

上面这段代码是zk刚启动的时候发送的第一轮投票;

将投票轮次logicalclock设置为1。

初始的Vote是(1,0,0) --(proposedLeader,proposedZxid,proposedEpoch).

通过sendNotifications();将初始投票发送出去。发送流程大家可以参照我上面提供的那张图,去跟一下代码。

需要注意的点是sendNotifications()也会将投票数据发送给自身。参考QuorumCnxManager.toSend()方法。


//从recvqueue拉数据,前面介绍过recvqueue的数据是通过FastLeaderElection内部的WorkerReceiver线程offer的
Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
if (n == null) {
  //检查queueSendMap所有的队列是否为空
  //不为空说明还有投票没发出去
  if (manager.haveDelivered()) {
    sendNotifications();
  } else {
    //再次尝试去发起连接
    manager.connectAll();
  }
  int tmpTimeOut = notTimeout * 2;
  notTimeout = Math.min(tmpTimeOut, maxNotificationInterval);
  LOG.info("Notification time out: {}", notTimeout);
}

recvqueue.poll()前面已经介绍过recvqueue的数据是通过FastLeaderElection内部的WorkerReceiver线程offer的。刚刚我们通过sendNotifications()将第一轮投票发送出去。这时候recvqueue应该就有3份Vote数据(z1 z2 z3 包括自身,上面解释过了)。

什么情况下n==null? 如果z2 z3没有启动,那么再第二轮poll的时候就会拉不到数据。

看上面代码注释,如果queueSendMap所有的队列为空,说明z2 z3没连接上,重新发起连接。

这段代码对主流程没影响,我们暂且不细究。


else if (validVoter(n.sid) && validVoter(n.leader)) {
  //校验下收到的网络投票是否来自配置文件中的server列表中的服务器
  switch (n.state) {
    case LOOKING:
      ...
        //收到的选举朝代(也可以理解为轮回)比本身的逻辑时钟要大,说明本身的投票轮回落后了
        if (n.electionEpoch > logicalclock.get()) {
          logicalclock.set(n.electionEpoch);
          recvset.clear();
          if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
            updateProposal(n.leader, n.zxid, n.peerEpoch);
          } else {
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
          }
          sendNotifications();
        } else if (n.electionEpoch < logicalclock.get()) {
          //收到的选举朝代(也可以理解为轮回)比本身的逻辑时钟要小,说明从网络上收到的投票消息过期了,自然就不用处理
          LOG.debug(
            "Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x{}, logicalclock=0x{}",
            Long.toHexString(n.electionEpoch),
            Long.toHexString(logicalclock.get()));
          break;
        } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
          //totalOrderPredicate()是一个比较重要的方法:
          //通过与收到的网络投票相比较用于更新投票
          //如果返回true,(即进入到了这里)说明网络投票优先本身
          updateProposal(n.leader, n.zxid, n.peerEpoch);
          //重新把投票发送出去
          sendNotifications();
        }

validVoter(n.sid) && validVoter(n.leader)用于校验收到的网络投票是否来自配置文件中的server列表中的服务器。如果不是,就不用处理了。

由于z1 z2 z3 都刚刚启动,所有n.state肯定都是LOOKING状态。

if (n.electionEpoch > logicalclock.get()):收到的选举朝代(也可以理解为轮回)比本身的逻辑时钟要大,说明本身的投票轮回落后了。更新一下投票,然后再发送出去。(更新待会再说)

else if (n.electionEpoch < logicalclock.get()):收到的选举朝代(也可以理解为轮回)比本身的逻辑时钟要小,说明从网络上收到的投票消息过期了,自然就不用处理

else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch))来到这里,说明收到的选举朝代和当前的选举朝代(等同于逻辑时钟)是一致的,两者是同一轮投票。

好,现在要插播一下totalOrderPredicate()源码,它是一个比较重要的方法,定义了投票比较规则,我们在看文章、书籍时经常会被介绍到的那段规则:

protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
    //QuorumMaj.getWeight()总是返回1
    if (self.getQuorumVerifier().getWeight(newId) == 0) {
        return false;
    }

    /*
        * We return true if one of the following three cases hold:
        * 1- New epoch is higher
        * 2- New epoch is the same as current epoch, but new zxid is higher
        * 3- New epoch is the same as current epoch, new zxid is the same
        *  as current zxid, but server id is higher.
        */

    return ((newEpoch > curEpoch)
            || ((newEpoch == curEpoch)
                && ((newZxid > curZxid)
                    || ((newZxid == curZxid)
                        && (newId > curId)))));
}

比较顺序,总结一下:

  1. 优先比较epoch(朝代)
  2. 然后比较zxid(事务id)
  3. 最后比较serverId(即myid里的那个值)

totalOrderPredicate()返回true表明网络投票战胜了自身的票据。后续将把自身的票据替换为网络票据,然后重新发送出去。(待会再讲)。如果返回false,保留自身票据。(不用再发送出去了)


//将收到的网络投票记录下来
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

recvset将收到的网络投票记录下来。后续会用到。

voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));

getVoteTracker()返回的是一个SyncedLearnerTracker对象。我们先看下SyncedLearnerTracker的结构图:

SyncedLearnerTracker有个内部类QuorumVerifierAcksetPair,内部类又有两个属性QuorumVerifier qvHashSet<Long> ackset

QuorumVerifier qv我们之前在解析配置文件的时候介绍过,有三个成员变量allMembers、votingMembers、observingMembers存储了各个节点的连接信息。另外还有一个half变量,默认是votingMembers数量的一半。

HashSet<Long> ackset 存储的是Long类型的数据,实际上最终存储的就是serverId(myid里的那个值)。

现在我们来看一下getVoteTracker()源码:

protected SyncedLearnerTracker getVoteTracker(Map<Long, Vote> votes, Vote vote) {
    SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
    voteSet.addQuorumVerifier(self.getQuorumVerifier());
    
    LOG.info(">>>getVoteTracker: {}",votes.entrySet());
    for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
        if (vote.equals(entry.getValue())) {
            LOG.info(">>>entryKey: {}, currentVote: {}", entry.getKey(), vote);
            voteSet.addAck(entry.getKey());
        }
    }
    LOG.info(">>>getVoteTracker end<<<<<<");
    return voteSet;
}
//SyncedLearnerTracker.class
public boolean addAck(Long sid) {
  boolean change = false;
  for (QuorumVerifierAcksetPair qvAckset : qvAcksetPairs) {
    if (qvAckset.getQuorumVerifier().getVotingMembers().containsKey(sid)) {
      qvAckset.getAckset().add(sid);
      LOG.info(">>>qvAckSet: {}",qvAckset.getAckset());
      change = true;
    }
  }
  return change;
}

上面代码和SyncedLearnerTracker有关的是这几行:

  1. SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
  2. voteSet.addQuorumVerifier(self.getQuorumVerifier());
  3. voteSet.addAck(entry.getKey());

第一行不用解释。

第二行实际上是将self.getQuorumVerifier()赋值给了QuorumVerifierAcksetPair的成员变量qv

第三行解释起来比较麻烦。所以我加了几行log日志。先运行z1节点,然后运行z2节点。通过观察日志给大家讲解。

下面是运行日志:

#这是z1日志
//由于z1节点先启动,第一轮投票recvset只有[1=(1, 0, 0)]
>>>getVoteTracker: [1=(1, 0, 0)]
//当前票据是自身(1,0,0)
>>>entryKey: 1, currentVote: (1, 0, 0)
>>>qvAckSet: [1]
>>>getVoteTracker end<<<<<<

//这时候z2节点启动了
//z1收到了来自z2节点的票据:2=(2, 0, 0),所以recvset是[1=(1, 0, 0), 2=(2, 0, 0)]
//z1将当前票据更新成了(2, 0, 0) 【在方法updateProposal中】,
//同时将票据1->(2,0,0)发送出去(我z1第二轮投票给(2,0,0))
>>>getVoteTracker: [1=(1, 0, 0), 2=(2, 0, 0)]
>>>entryKey: 2, currentVote: (2, 0, 0)
>>>qvAckSet: [2]
>>>getVoteTracker end<<<<<<

//第三轮投票
//收到了自己发送的票据1->(2,0,0)
//所以recvset是[1=(2, 0, 0), 2=(2, 0, 0)] (recvset是个map,key相同覆盖了)
//本来不会进入updateProposal()方法,当前的票据还是(2, 0, 0)
>>>getVoteTracker: [1=(2, 0, 0), 2=(2, 0, 0)]
//拿当前的票据(2, 0, 0)与recvset一一比对,发现节点1,2的value与(2,0,0)都相等
//所以qvAckSet等于[1, 2]
>>>entryKey: 1, currentVote: (2, 0, 0)
>>>qvAckSet: [1]
>>>entryKey: 2, currentVote: (2, 0, 0)
>>>qvAckSet: [1, 2]
>>>getVoteTracker end<<<<<<
#这是节点z2的日志
//节点2刚启动,接收到自己发出的票据
>>>getVoteTracker: [2=(2, 0, 0)]
>>>entryKey: 2, currentVote: (2, 0, 0)
>>>qvAckSet: [2]
>>>getVoteTracker end<<<<<<

//节点2启动后,收到了节点1第一轮的投票,所以此时recvset=[1=(1, 0, 0), 2=(2, 0, 0)]
//此轮不会更新投票(节点2也不会发出投票)
>>>getVoteTracker: [1=(1, 0, 0), 2=(2, 0, 0)]
>>>entryKey: 2, currentVote: (2, 0, 0)
>>>qvAckSet: [2]
>>>getVoteTracker end<<<<<<

//收到节点1第二轮的投票1=(2, 0, 0)
//所以recvset=[1=(2, 0, 0), 2=(2, 0, 0)]
>>>getVoteTracker: [1=(2, 0, 0), 2=(2, 0, 0)]
>>>entryKey: 1, currentVote: (2, 0, 0)
>>>qvAckSet: [1]
>>>entryKey: 2, currentVote: (2, 0, 0)
>>>qvAckSet: [1, 2]
>>>getVoteTracker end<<<<<<

仔细分析一下日志执行结果,应该就能理解getVoteTracker()方法了。


voteSet = getVoteTracker(...)返回后,紧接着我们来看hasAllQuorums()这个方法,非常重要。是跳出外层while循环(即结束leading选举)的关键。

//判断vote是否已过半
if (voteSet.hasAllQuorums()) {
  ...
}
break;
public boolean hasAllQuorums() {
  for (QuorumVerifierAcksetPair qvAckset : qvAcksetPairs) {
    //判断(qvAckset.getAckset().size() > half);
    if (!qvAckset.getQuorumVerifier().containsQuorum(qvAckset.getAckset())) {
      return false;
    }
  }
  return true;
}

hasAllQuorums()方法里最核心的一句是:

!qvAckset.getQuorumVerifier().containsQuorum(qvAckset.getAckset()),点进containsQuorum里看看,其实是判断qvAckset.getAckset()的大小是否超过集群数量的一半。

qvAckset.getAckset在上面的getVoteTracker()已经介绍过了。它存储的是那些对当前Vote()认同的serverId。所以只需简单的判断一下这个set集合的数量是否过半,即可以决定是否退出leading选举。


//判断vote是否已过半
if (voteSet.hasAllQuorums()) {
  //虽然已经完成过半的节点赞成某个sid成为leader,但是还是得尝试一下recvqueue有没有数据(可能由于网络原因z3节点的投票数据这时候才到达)
  while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
    if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
      recvqueue.put(n);
      break;
    }
  }
  //如果recvqueue真的没有任务投票数据了,就可以确认节点状态,退出leader选举了
  if (n == null) {
    //更新节点状态: leading | following | observing
    setPeerState(proposedLeader, voteSet);
    Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
    leaveInstance(endVote);
    //退出while循环,程序回到QuorumPeer的setCurrentVote(makeLEStrategy().lookForLeader());
    return endVote;
  }
}
break;

虽然voteSet.hasAllQuorums()已过半,但是进入后还是得尝试拉一下recvqueue有没有数据(可能由于网络原因z3节点的投票数据这时候才到达)。如果recvqueue真的没有任务投票数据了,就可以确认节点状态,退出leader选举了。

setPeerState(proposedLeader, voteSet);方法中更新当前节点的状态。然后就可以退出while循环了。

退出while循环,程序会回到QuorumPeer的setCurrentVote(makeLEStrategy().lookForLeader());继续执行。

在QuorumPeer的run()方法中进行下一轮while,根据state状态进行相应的初始化。

至此,lead选举的主流程介绍完毕!

欢迎访问个人博客: https://tangwx.site/archives/zookeeperleader%E9%80%89%E4%B8%BE%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90md

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Zookeeper leader选举源码分析(超详细) 的相关文章

  • 链路追踪Sleuth入门

    前言 在一个大型的分布式项目中存在各种各样的模块调用 每个模块负责不同的功能 组合成系统 在这种架构下的系统 一次请求往往会调用到许许多多的微服务 这样的跨度对于维护也是存在一定的问题 1 如何快速发现问题 2 如何判断故障影响范围 3 如
  • ZooKeeper(一):基础介绍

    文章目录 什么是 ZooKeeper ZooKeeper 发展历史 ZooKeeper 应用场景 ZooKeeper 服务的使用 ZooKeeper 数据模型 data tree 接口 znode 分类 总结 什么是 ZooKeeper Z
  • Zookeeper报错Will not attempt to authenticate using SASL解决办法

    Will not attempt to authenticate using SASL unknown error 经过查资料 这个问题与zookeeper里面的zoo cfg配置有关 在程序填写的zookeper的路径 一定与zoo cf
  • zookeeper入门到精通03——zookeeper集群搭建

    zookeeper集群搭建 3 1 多虚拟机环境搭建 3 2 zookeeper集群搭建 3 1 多虚拟机环境搭建 我们需要搭建zookeeper集群 而由于zookeeper的的服务器数量需要设置为单数 前文介绍了原因 一个zookeep
  • ZooKeeper 分布式协调工具

    目录 一 ZooKeeper 概述 二 ZooKeeper Windows 单机版安装 三 ZooKeeper 集群环境下选举过程 四 ZooKeeper 存储数据的过程 五 ZooKeeper 监听 六 java 操作 ZooKeeper
  • 微服务全栈:深入核心组件与开发技巧

    文章目录 1 服务注册与发现 1 1 客户端注册 ZooKeeper 1 2 第三方注册 独立的服务Registrar 1 3 客户端发现 1 4 服务端发现 1 5 Consul 1 6 Eureka 1 7 SmartStack 1 8
  • Kafka常见的导致重复消费原因和解决方案

    点击上方蓝色字体 选择 设为星标 回复 资源 获取更多资源 大数据技术与架构 点击右侧关注 大数据开发领域最强公众号 暴走大数据 点击右侧关注 暴走大数据 问题分析 导致kafka的重复消费问题原因在于 已经消费了数据 但是offset没来
  • netty 无阻塞队列 MpscArrayQueue,一个字就是快

    netty提供了高效的线程安全的队列 MpscArrayQueue 一个字快 至于快的原因可以去查看相关的文章 内存的伪共享先关的内容 import static java lang Thread sleep import io netty
  • Eureka与Zookeeper的区别

    著名的CAP 理论指出 一个分布式系统不可能同时满足 C 一致性 A 可用性 和 P 分区容错性 由于分区容错性在是分布式系统中必须要保证的 因此我们只能在 A 和 C 之间进行权衡 在此 Zookeeper 保证的是 CP 而 Eurek
  • etcd 集群搭建及常用场景分析

    概述 etcd 是一个分布式一致性k v存储系统 可用于服务注册发现与共享配置 具有以下优点 简单 相比于晦涩难懂的paxos算法 etcd基于相对简单且易实现的raft算法实现一致性 并通过gRPC提供接口调用 安全 支持TLS通信 并可
  • 大数据开发教程——ZooKeeper分布式协调组件

    ZooKeeper是什么 ZooKeeper是一个分布式的 开放源码的分布式应用程序协调服务 是Google的Chubby一个开源的实现 是Hadoop和Hbase Flink的重要组件 中文名 动物管理员 它是一个为分布式应用提供一致性服
  • 使用Xshell7控制多台服务同时安装ZK最新版集群服务

    一 环境准备 主机名称 主机IP 节点 集群内通讯端口 选举leader clinet端提供服务 端口 docker0 192 168 1 100 node 0 2888 3888 2181 docker1 192 168 1 101 no
  • zookeeper学习网址

    1 分布式服务框架 ZooKeeper 管理分布式环境中的数据 http www ibm com developerworks cn opensource os cn ZooKeeper 2 ZooKeeper系列讲座 很全面 http w
  • ZooKeeper面试题(2020最新版,狂神说docker进阶笔记

    这里 process 主要就是通过 ServerCnxn 对应的 TCP 连接发送 Watcher 事件通知 9 客户端回调 Watcher 客户端 SendThread 线程接收事件通知 交由 EventThread 线程回调 Watch
  • zookeeper3.4.6集群部署

    在安装Zookeeper之前 首先需要确保的就是主机名称 可选 hosts都已经更改 并且JDK成功安装 1 安装Zookeeper 使用命令 tar zxvf 命令将gz压缩文件解压 笔者Zookeeper的安装目录为 home Hado
  • Spring-boot+Dubbo(直连模式)

    Spring boot Dubbo 直连模式 Demo 这里应该有很多人会问 直连模式 什么鬼啊 一般情况下我们进行微服务开发时 都是通过zookeeper等注册中心来实现服务的提供和引用的 那直连模式没啥用啊 其实不然 直连模式大有用处
  • Centos7下安装Zookeeper

    一 配置java环境 1 安装JDK yum install y java 1 8 0 openjdk 2 查看版本 root zookeeper java version openjdk version 1 8 0 362 OpenJDK
  • 【HBZ分享】数仓里面的概念-宽表-维度表-事实表概念讲解

    数仓概念 1 度量值 可被统计的 比如 次数 销量 营销额 订单表中的下单金额等可以统计的值叫度量值 2 维度表 1 对事实描述的信息 每一张表都对应现实世界中的一个对象或概念 比如 用户 商品 日期 地区维度 2 比如要分析商品的销售情况
  • 如何实现一个分布锁?

    基本概念 为何需要分布式锁 传统环境中的情况 在程序开发过程中不得不考虑的就是并发问题 在java中对于同一个jvm而言 jdk已经提供了lock和同步等 但是在分布式情况下 往往存在多个进程对一些资源产生竞争关系 而这些进程往往在不同的机
  • 从zookeeper官方文档系统学习zookeeper

    从zookeeper官方文档系统学习zookeeper 1 zookeeper 2 zookeeper 文档 3 zookeeper 单机版 3 1 配置 3 2 启动 3 3 验证 4 zookeeper 集群版 4 1 配置 4 2 启

随机推荐

  • 电脑连接无线路由上不了网 连接手机热点却可以上网

    mercury wireless n adapter 外置无线网卡 1 在网络设置中删除连接过的无线网络 2 设备管理器 网卡右键 属性 高级 wifi config wifi xff08 代替performance xff09 3 ban
  • Godot Engine:GDScript 4.X中语法的变化(2020年8月4日 更新)

    文章目录 4 X版 GDScript范例支持注解属性 xff08 Properties xff09 的定义格式await关键字代替yield加入super关键字去除了多级调用问题小结 4 X版 GDScript范例 支持注解 从4 x开始
  • mysql group by 取出分组结果中每一组的某个字段值不同的数据 分组

    数据 xff1a 目的 xff1a 基于以上查询结果得到每一组指标 xff08 KPI CODE xff09 中不同的KPI INSTANCE的值的数据 预期是取出上图中的第一条数据和第二或第三条数据中的任何一条 尝试 xff1a 刚开始直
  • Codeblocks 常用快捷键整理

    Ctrl 43 Shift 43 C 注释选中文本 Ctrl 43 Shift 43 X 解除选中文本的注释 Shift 43 Tab 回退一个制表符 Ctrl 43 PageUp xff08 Ctrl 43 PageDown xff09
  • 将PNG序列帧图片合成视频

    import globimport cv2def skadi idle img array 61 print 34 开始读取图片 34 修改此处为自己的路径for filename in glob glob r 34 F interact
  • 在树莓派3b/3b+上搭建rt-thread多核开发环境

    1 本文目的 nbsp nbsp nbsp 本 主要介绍如何快速 效的搭建树莓派rt thread的开发环境 按照 章的步骤 可以很快速的将rt thread SMP系统在树莓派上运 起来 2 准备条件 nbsp nbsp 1 raspbe
  • 百度在线人脸识别API简单实现教程

    这里 xff0c 记录一下百度人脸识别在线API的调用 xff0c 语言是python2 7 xff0c 供大家一起学习参考 本教程目录如下 1 申请百度人脸识别应用 2 获取token 3 图片的base64编码 4 人脸识别 5 结果绘
  • java中获取农历日期以及星期几

    34 status 34 200 34 msg 34 34 操作成功 xff01 34 34 success 34 true 34 data 34 34 weekend 34 34 星期二 34 34 date 34 34 2021年12月
  • syntax error: bad substitution是什么问题

    终端出现syntax error bad substitution是什么问题 解决办法 xff1a sudo dpkg reconfigure dash 在选择项中选N0 从 ubuntu 6 10 开始 xff0c ubuntu 就将先前
  • 如何获取自己的公网地址

    我们在上网的时候 xff0c 可以获取自己的公网地址 xff0c 比如在百度上输入ip地址 xff0c 就能显示自己的公网ip 下面讲解下 xff0c 从内网里面上网 xff0c 获取本机公网ip的原理 xff0c 如下所示 xff1a 本
  • 拯救被WSL占满的C盘

    适用场景 1 使用Windows 10做为开发平台 xff0c 并且C盘空间不是特别富裕 2 使用Windows自带的Windows Subsystem for Linux xff08 WSL xff09 默认将Liunx子系统安装在了C盘
  • 条件变量

    1 初始化条件变量pthread cond init include int pthread cond init pthread cond t cv const pthread condattr t cattr 返回值 xff1a 函数成功
  • python练习3:输入分数,输出对应的ABCD级别

    span class token keyword while span span class token boolean True span span class token punctuation span span class toke
  • ros Unable to locate package 找不到ROS软件包的问题解决

    这个问题可能由两个原因导致 xff1a 1 输入的软件包和ros版本不匹配 例如 xff0c 如果我是20 04的系统 xff0c ros版本为noetic xff0c 那么输入这行给1804 melodic准备的安装指令就会报错 xff1
  • python项目打包发布总结

    概览 这里主要收集python项目的打包 发布和部署的常用方法 xff0c 只是入门级别 xff0c 深入的流程还是以官方文档为准 xff08 链接每节都已经给出 xff09 distutils setuptools pip virtual
  • Mac Big Sur --ERROR launching JD-GUI

    适用于最新的macOS Monterey 更新系统后 xff0c 打开jd gui报如下错误 xff1a ERROR launching 39 JD GUI 39 No suitable Java version found on your
  • Centos7--FFmpeg编译安装

    一开始使用中文搜索 xff0c 照着csdn这份教程搞了一上午 xff0c 最后失败了 xff0c 恼火呀 换成英文搜索ffmpeg build arguments 点击第一条CompilationGuide FFmpeg xff0c 官方
  • 一种快速检测Mp4是否损坏的方法

    一种快速检测Mp4是否损坏的方法 由于项目上的需求 xff0c 需要一种能快速检测MP4文件是否有效 xff08 即能正常播放 xff09 的方法 网络上搜索到的绝大多数方案是使用ffmpeg或者ffprobe 以ffprobe为例 xff
  • java String类型转换BigDecimal类型最全工具类

    public class BigDecimalUtil String类型转换BigDecimal类型 public static BigDecimal StringToBigDecimal String str return new Big
  • Zookeeper leader选举源码分析(超详细)

    选举介绍 在开始分析选举的原理之前 xff0c 先了解几个重要的参数 服务器 ID xff08 myid xff09 比如有三台服务器 xff0c 编号分别是 1 2 3 编号越大在选择算法中的权重越大 zxid 事务 id xff08 Z