TiKV源码分析(一)RaftKV层

2023-11-09

本章序:关于RaftStore层

在TiKV的框架中,可以按照从客户端发起请求到实际落盘大致分为如下几层:
1.Service层
2.Storage层(Percolator)
3.RaftKV层(Raft)
4.RocksDB层
这一章主要分析的是第三层RaftKV层,这一层完成了Raft多副本一致性的操作以及数据的落盘操作。这一层由两个状态机组成,PeerFsm状态机需要落盘日志,ApplyFsm状态机需要落盘从日志中解析出实际数据。在Raft中会将Storage层传来的请求RaftCmdRequest封装为日志形式,所以Storage层在做完了对客户端的请求进行事务模型的处理后发送请求至RaftStore层。
RaftStore层对外的接口是send_command,通过ServerRaftStoreRouter与RaftStore层进行交流,当Storage层需要走Raft流程(LocalRead可以直接从Storage层的缓存读)时就通过RaftStoreRouter接口的send_command函数。Storage层会调用async_write、async_snapshot,其中async_write会调用send_command函数将请求给到RaftStore层。而在read_index如果无法使用LocalRead则一样会调用send_command向RaftStore发送读请求,通过Callback来得到响应结果。

pub trait RaftStoreRouter<EK>:
    StoreRouter<EK> + ProposalRouter<EK::Snapshot> + CasualRouter<EK> + Send + Clone

fn send_command_impl<EK, PR>(
    router: &PR,
    req: RaftCmdRequest,
    cb: Callback<EK::Snapshot>,
    extra_opts: RaftCmdExtraOpts,
) -> RaftStoreResult<()>
where
    EK: KvEngine,
    PR: ProposalRouter<EK::Snapshot>,
{
    let region_id = req.get_header().get_region_id();
    let mut cmd = RaftCommand::new(req, cb);
    cmd.extra_opts = extra_opts;
    router
        .send(cmd)
        .map_err(|e| handle_send_error(region_id, e))
}

之后就会在RaftStore中的handle_msgs的poll线程对RaftCommand类型的数据进行处理。后续会分析其他层的相关信息:

    pub fn handle_msgs(&mut self, msgs: &mut Vec<PeerMsg<EK>>) {
        for m in msgs.drain(..) {
            match m {
                PeerMsg::RaftMessage(msg) => {
                ...
                }
                PeerMsg::RaftCommand(cmd) => {
				...
                }

接下来进入到第三层内部去探究这一层的大致实现:

从RaftBatchSystem开始

batch.rs中看状态机的调度方法:所有Fsm之间的关联使用的是状态机的驱动,Poll在等待handle_raft_ready处理完之后会通过一个状态机的驱动结构reschedule_fsms(fsm_cnt,ReschedulePolicy)(Reschedule来记录一个状态及接下来的调度策略是移除、释放还是重新调度,重新调度会传入当前fsm在数组中的下标,重新将该fsm加入到调度中)。
用栈的弹入弹出操作来模拟一个驱动流程:
1.每次循环进入时会调用fetch_fsm方法,尝试去从fsm_receiver中获取当前传入的fsm,加入到batch中
2.使用该fsm的handle_normal/handle_control函数对该fsm进行处理
3.处理完后弹出上一个batch中的normal驱动机,出栈再入栈或者直接入栈加入新的下一个驱动机进行ReschedulePolicy::Schedule
4.Schedule所做的就是调用batch.reschedule(&self.router,r)对新加入的fsm进行schedule->schedule中使用send方法发送至下一个fsm的路径位置。
store.rs和apply.rs中实现了batch.rs中定义的trait方法,fsm加入后就可以调度store.rs和apply.rs中的具现化方法来执行每个fsm对应的操作。

store.rs中的BatchSystem入手,其中有一个HandlerBuilder,里面含有一个Poller,使用PollHandler实现,PollHandler由Apply.rs和Peer.rs一人提供一个,可以通过Batch中的Poller获取ApplyFsm和PeerFsm的信息从而poll相应的Fsm进行下一步操作。

BatchSystem(HandlerBuilder)->spawn->start_poller->poller.poll()->(PollHandler).handle_control/handle_normal

HandlerBuilder结构体中的handler为PollHandler,HandlerBuilder的build()就是为了得到其中的PollHandler,根据PollHandler又可以构建出Poller结构体,从而可以调用Poller中的Poll方法,PollHandler中存在apply.rs(ApplyPoller)和store.rs(RaftPoller)中,提供了Handle_Control/Handle_Normal主要函数,会在下面分析。在Poll中可以不断的去调用handle函数处理状态机中的各种事件。
PollHandler中提供了handle_normal和handle_control,方便BatchSystem调用对应的FSM。

状态机做了什么

mailbox注册与tx/rx通道设置

Components/RaftStore/Store文件夹下的代码,从整个系统的启动开始,本节主要考虑两个状态机的情况,一个负责Raft管理及落盘日志,一个负责解析日志并落盘日志中的实际KV数据。
(RaftBatchSystem)start_system->RaftBatchSystem/ApplyBatchSystem,以及通过RaftRouter/ApplyRouter分别给RaftBatchSystem/ApplyBatchSystem发送初始控制消息。如:

//在BatchSystem中会调用apply_system.schedule_all对applyfsm进行mailbox
//的注册,并且使用register_all注册至ApplyFsm状态机上,这样后续通过Router发送
//消息时就可以从Fsm的map中找到对应的addr。
impl<EK: KvEngine> ApplyBatchSystem<EK> {
    pub fn schedule_all<'a, ER: RaftEngine>(&self, peers: impl Iterator<Item = &'a Peer<EK, ER>>) {
        let mut mailboxes = Vec::with_capacity(peers.size_hint().0);
        for peer in peers {
            let (tx, fsm) = ApplyFsm::from_peer(peer);
            mailboxes.push((
                peer.region().get_id(),
                BasicMailbox::new(tx, fsm, self.router().state_cnt().clone()),
            ));
        }
        self.router().register_all(mailboxes);
    }
}
//router.rs中
    pub fn register_all(&self, mailboxes: Vec<(u64, BasicMailbox<N>)>) {
        let mut normals = self.normals.lock().unwrap();
        normals.map.reserve(mailboxes.len());
        for (addr, mailbox) in mailboxes {
            if let Some(m) = normals.map.insert(addr, mailbox) {
                m.close();
            }
        }
        normals
            .alive_cnt
            .store(normals.map.len(), Ordering::Relaxed);
    }

设置好mailbox信息后,Peer就可以与Apply之间很好的进行交流了。

消息收发与处理

主要考虑如下消息传递:

  • ApplyTask::Apply peerfsm发给applyfsm
  • PeerMsg::ApplyRes applyfsm返回apply的结果给peerfsm
  • ApplyTaskRes::Apply peerfsm接收applyfsm的返回结果
    每个状态机存在收发消息的线程,主要是Router方法中提供的,负责每个状态下需要与其他Peer或Apply节点通信的消息传输,通过mpsc::channel中提供的tx/rx发送消息至每个状态机的mailbox中。
    每个状态机还存在一个poll线程运行自己收信箱(mailbox)中的消息命令,这个poll线程中主要运行的方法就是handle_normal/handle_control,Peer和Apply再从这两个方法实际展开处理的操作。

1.PeerFsm处理日志消息的raft状态变更,Client的消息首先发送给RaftLeader,Leader会对请求编码成日志,Leader落盘日志的同时会将日志发送给其他Raft的Follower使其可以多副本落盘。Follwer落盘成功后会给Leader发送响应,当Leader收到大多数Follower的响应后Raft过程就完成了,可以通过schedule_task方法向ApplyFsm发送消息:

//Peer端发送消息至Apply,这里发送的数据会添加到ApplyFsm的mailbox里,
//之后在ApplyFsm的poll线程运行handle_normal时当信箱里数据不为空,则会在后续Apply的
//Handle_Normal->Handle_Tasks中进行实际的处理。
ctx.apply_router
   .schedule_task(self.region_id, ApplyTask::apply(apply));

Apply就可以进行实际的kv数据落盘了。ApplyFsm可以去Peer中查得日志使其进行Apply。
2.ApplyFsm根据Peer中发送给ApplyFsm的Apply消息(Apply消息中携带有在Peer中成功落盘的日志apply.entries.take_entries()):

//Apply中当邮箱中消息不为空则在Poll线程中对消息进行处理
//handle_tasks是handle_normal中运行到的方法,该方法用于处理mailbox中接受到的每一条消息。
    fn handle_tasks<W: WriteBatch<EK>>(
        &mut self,
        apply_ctx: &mut ApplyContext<EK, W>,
        msgs: &mut Vec<Msg<EK>>,
    ) {
        let mut drainer = msgs.drain(..);
        loop {
            match drainer.next() {
        Some(Msg::Apply { start, apply }) => {
...
        self.handle_apply(apply_ctx, apply);
}

    /// Handles apply tasks, and uses the apply delegate to handle the committed entries.
    fn handle_apply<W: WriteBatch<EK>>(
        &mut self,
        apply_ctx: &mut ApplyContext<EK, W>,
        mut apply: Apply<EK::Snapshot>,
    ) {
...
        let (mut entries, dangle_size) = apply.entries.take_entries();
...
        self.append_proposal(apply.cbs.drain(..));
        self.delegate
            .handle_raft_committed_entries(apply_ctx, entries.drain(..));
...
    }

从日志中解析出实际的kv数据并进行落盘。

3.Apply状态机将数据落盘之后会发送返回结果给Peer状态机,之后Peer状态机可以做些后续处理工作如通知客户端、更新Raft信息及快照信息等。同时在PeerFsm发送给ApplyFsm的Apply请求中带有Proposal的信息,Proposal中有封装callback函数,该函数可以自定义当相关数据在ApplyFsm罗盘以后可以额外再做一些处理操作,比如额外再返回结果中多打印些信息、或者自动唤醒下一步请求操作,方便我们做一些hook操作进行回调处理。
Apply在运行中会在Notify中返回ApplyResult。

//返回的结果先放在results数组中,
    /// Handles all the committed_entries, namely, applies the committed entries.
    fn handle_raft_committed_entries<W: WriteBatch<EK>>(
        &mut self,
        apply_ctx: &mut ApplyContext<EK, W>,
        mut committed_entries_drainer: Drain<Entry>,
    ) {
        if committed_entries_drainer.len() == 0 {
            return;
        }
        apply_ctx.prepare_for(self);
...
            let res = match entry.get_entry_type() {
                EntryType::EntryNormal => self.handle_raft_entry_normal(apply_ctx, &entry),
                EntryType::EntryConfChange | EntryType::EntryConfChangeV2 => {
                    self.handle_raft_entry_conf_change(apply_ctx, &entry)
                }
            };
...
            match res {
                ApplyResult::None => {}
                ApplyResult::Res(res) => results.push_back(res),
                ApplyResult::Yield | ApplyResult::WaitMergeSource(_) => {
                    // Both cancel and merge will yield current processing.
                    apply_ctx.committed_count -= committed_entries_drainer.len() + 1;
                    let mut pending_entries =
                        Vec::with_capacity(committed_entries_drainer.len() + 1);
                    // Note that current entry is skipped when yield.
                    pending_entries.push(entry);
                    pending_entries.extend(committed_entries_drainer);
                    apply_ctx.finish_for(self, results);
                    self.yield_state = Some(YieldState {
                        pending_entries,
                        pending_msgs: Vec::default(),
                        heap_size: None,
                    });
        }
        apply_ctx.finish_for(self, results);
    }
//之后在finish_for中将结果放入apply_res数组中
    pub fn finish_for(
        &mut self,
        delegate: &mut ApplyDelegate<EK>,
        results: VecDeque<ExecResult<EK::Snapshot>>,
    ) {
...
        self.commit_opt(delegate, false);
        self.apply_res.push(ApplyRes {
            region_id: delegate.region_id(),
            apply_state: delegate.apply_state.clone(),
            exec_res: results,
            metrics: delegate.metrics.clone(),
            applied_index_term: delegate.applied_index_term,
        });
    }
//最终在写库完成后或者关闭或者在Poll的每一批结束时调用end()方法时刻,当Apply_Res不为空也就是存在返回结果时会运行flush方法,此时使用Notify方法将得到的结果返回给到Peer,Notifier是一个Sender
    /// Flush all pending writes to engines.
    /// If it returns true, all pending writes are persisted in engines.
    pub fn flush(&mut self) -> bool {
...
        let is_synced = self.write_to_db();

        if !self.apply_res.is_empty() {
            let apply_res = mem::take(&mut self.apply_res);
            self.notifier.notify(apply_res);
        }
...
    }
}
//对于Notify方法我们可以自定义实现方式,可以设置如下,通过ApplyFsm的tx通道将结果发送给当初注册进来的双向通道中,当初注册的时候会记录Peer的addr信息。此时发送者为ApplyFsm,接收者为PeerFsm,消息内容为PeerMsg::ApplyRes,返回的是Apply操作中产生的Apply_Res:
        fn notify(&self, apply_res: Vec<ApplyRes<EK::Snapshot>>) {
            for r in apply_res {
                let res = TaskRes::Apply(r);
                let _ = self.tx.send(PeerMsg::ApplyRes { res });
            }
        }

4.Apply将结果发送回Peer后,Peer根据消息类型做一些收尾处理,如发送结果给到客户端,更新index、term信息等。

//在Peer(Store中封装的Peer部分)的poll线程会不断处理发送给Peer的各种消息,在handle_msgs中可以看到对Apply发回的ApplyRes消息会调用on_apply_res方法进行处理。
   fn handle_normal(&mut self, peer: &mut PeerFsm<EK, ER>) -> Option<usize> {
...
        while self.peer_msg_buf.len() < self.messages_per_tick {
            match peer.receiver.try_recv() {
                // TODO: we may need a way to optimize the message copy.
                Ok(msg) => {
                    self.peer_msg_buf.push(msg);
                }
...
            }
        }
        let mut delegate = PeerFsmDelegate::new(peer, &mut self.poll_ctx);
        delegate.handle_msgs(&mut self.peer_msg_buf);
        //collect_ready函数会收集propose函数返回的已经完成raft中提议流程的ready结果,当这个结果不为空时会调用handle_raft_ready_append函数,此时完成Raft阶段的日志落盘。所以这里的流程是接收来自其他节点请求的同时不断去搜集当前已经完成Raft的日志并对其进行落盘。关于handle_raft_ready_append涉及到的操作后续在Peer阶段会着重分析。
        delegate.collect_ready();
...
    }
//对ApplyRes消息的处理:
    pub fn handle_msgs(&mut self, msgs: &mut Vec<PeerMsg<EK>>) {
        for m in msgs.drain(..) {
            match m {
...            
                PeerMsg::ApplyRes { res } => {
                    self.on_apply_res(res);
                }
...

这里面主要讲了Peer与Apply交互的主要函数,省略了对Raft应用以及Apply落盘的一些细节,接下来主要来研究内部细节:
日志落盘的键值对形式与KV数据落盘的键值对形式不同,以及Percolator KV与Raw KV的记录形式也不同,对于Percolator事务类型来说键值对里会包含时间戳TSO,用来记录prewrite与commit的情况。这部分后续在事务处理中详细描述。

接下来从peer的handle_normal和apply的handle_normal开始分析一些细节处理:

Peer中的具体操作

handle_normal(store.rs中)=>(PeerFsmDelegate)handle_msgs->处理各种PeerMsg,对于RaftCommand构建self.fsm.batch_req_builder后(add,build)调用propose_raft_command->self.fsm.peer.propose(raft_group RawNode<PeerStorage<EK,ER>>)->propose_normal->raft_group propose

propose是从fsm.batch_req_builder.build(&mut self.ctx_raft_metrics)中取得cmd,再通过self.propose_raft_command(cmd.request,cmd.callback,DiskFullOpt::NotAllowedOnFull)会设置好proposal封装进apply中。在peer中的发送proposal至apply进行落盘主要是peer调用apply_router.schedule_task(region_id,msg)函数,

 let mut apply = Apply::new(
                self.peer_id(),
                self.region_id,
                self.term(),
                committed_entries,
                cbs,//Vec<Proposal<S>>,
            );
            apply.on_schedule(&ctx.raft_metrics);
...
            ctx.apply_router
                .schedule_task(self.region_id, ApplyTask::apply(apply));

propose之后在peer中会有raftstore状态转换得到ready(大多节点都认可了该消息,已得到保障可以进行落盘),ready在collect_ready中进行处理。
ready结构是一个self.raft_group.ready(),在ready中是会存在参数peer_id的(还有snapshot/hs/entries/msgs等) peer
会记录在ctx.store_meta.readers中,在handle_raft_ready_append中产生了一个ready并加入ready_res数组,之后调用post_raft_ready_append->send->send_raft_message,将msgs传递到对应的apply。

handle_normal(store.rs中)=>collect_ready->
1.对于raft消息是通过on_raft_message得到的,peer在收到raft消息之后会调用Raft::step,最终成功后走committed_entries去到applyfsm。ready是handle_raft_ready_appendraft_group ready得来的,会反应在Committed_entries中->handle_raft_committed_entries->ctx.apply_router.schedule_task(self.region_id,ApplyTask::apply(apply))向applyfsm发送apply请求(apply中记录了committed_entries)。
所以说raft与apply之间的关联就在于ready与committed_entries。
2.self.fsm.peer.handle_raft_ready_append(self.ctx)得到ready后self.ctx.ready_res.push将ready结果push进去
3.调用store里的handle_raft_ready对ready_res中的结果进行写入,kv的写入kvEngine,raft的写入raftEngine调用post_raft_ready_append,做一些对ready的善后操作(在handle_raft_ready_append中会处理PollContext会调用handle_raft_ready,用于更新各种数据至kv/raft中后将InvokeContext结果返回给handle_raft_ready_append),然后调用当前fsm handler中的end()方法对当前fsm进行收尾。
对raft数据做完ready处理后就以cpmmitted_entries的形式进入到applyfsm部分进行apply操作。
5.在PollContext中有router(RaftRouter)/apply_router(ApplyRouter),对于apply_router主要调用的是schedule_task方法。

Apply中的具体操作

Apply的Builder中有RaftPollerBuilder和ApplyRouter,RaftPollerBuilder中有engines和store。
Apply的HandlerBuilder就是基于Builder,里面封装了ApplyPoller->ApplyContext->ApplyDelegate->WriteBatch的关于写数据库put/delete方法以及处理一些命令的代码。

handle_normal->handle_tasks->handle_apply/resume_pending->
1.append_proposal->append_normal(将命令push back至pending_cmds)
2.handle_raft_committed_entries->
2.1handle_raft_entry_normal->commit->commit_opt->write_to_db(这里通过ApplyContext提供的Commit方法帮助同步与当前节点与Leader节点发来的Committed_entries相差的日志,主要使用RocksDB里的Ingest SST,当日志同步了以后就可以通过process_raft_cmd开始处理此次的落盘操作,见2.2)
2.2handle_raft_entry_normal->process_raft_cmd->
2.2.1apply_raft_cmd->exec_raft_cmd->exec_write_cmd/exec_admin_cmd->WriteBatch的handle_put/handle_delete->wb.put()。(这里接2.3的的callback)
2.2.2find_pending(将pending_cmds里的命令取出)得到命令对应的callback
2.2.3将找出的callback push至apply_ctx.applied_batch,等到apply_raft_cmd中返回写入成功后的执行结果
3.handle_raft_committed_entries最终得到的结果表明是否成功落盘实际kv数据,使用这个结果对apply_ctx做一些finish_for收尾操作,将得到的results放入apply_ctx的apply_res数组。返回到最开始的handle_normal也就结束了。

当schedule实际完成处理以后,需要返回结果的就返回一个peermsg,不需要的就将不返回,所以apply中的handle_normal主要是对apply_ctx进行处理,当最终在exec_raft_cmd中对命令实际完成后就一路向上返回最终的处理结果。
如下显示一个测试用例,可以看到大致的使用方法:

    fn batch_messages<E>(router: &ApplyRouter<E>, region_id: u64, msgs: Vec<Msg<E>>)
    where
        E: KvEngine,
    {
        let (notify1, wait1) = mpsc::channel();
        let (notify2, wait2) = mpsc::channel();
        router.schedule_task(
            region_id,
            Msg::Validate(
                region_id,
                Box::new(move |_| {
                    notify1.send(()).unwrap();
                    wait2.recv().unwrap();
                }),
            ),
        );
        wait1.recv().unwrap();

        for msg in msgs {
            router.schedule_task(region_id, msg);
        }

        notify2.send(()).unwrap();
    }

...
//此处可以看到proposal中带上了callback,对于得到返回结果会有怎样的处理,在这里当得到write结果时,需要想resp_tx发送结果
        let (resp_tx, resp_rx) = mpsc::channel();
        let p = proposal(
            false,
            1,
            0,
            Callback::write(Box::new(move |resp: WriteResponse| {
                resp_tx.send(resp.response).unwrap();
            })),
        );
        router.schedule_task(
            1,
            Msg::apply(apply(1, 1, 0, vec![new_entry(0, 1, true)], vec![p])),//此处写resp_rx
        );
        // unregistered region should be ignored and notify failed.
        let resp = resp_rx.recv_timeout(Duration::from_secs(3)).unwrap();
...
//此处可以看到一个主要的操作是将peer需要的信息通过schedule_task下发给到applyfsm,之后peer可以从rx中获取返回applyres(snapshot)即可。
        let (snap_tx, _) = mpsc::sync_channel(0);
        batch_messages(
            &router,
            2,
            vec![
                Msg::apply(apply(1, 2, 11, vec![new_entry(5, 5, false)], vec![])),
                Msg::Snapshot(GenSnapTask::new_for_test(2, snap_tx)),
            ],
        );
        let apply_res = match rx.recv_timeout(Duration::from_secs(3)) {
            Ok(PeerMsg::ApplyRes {
                res: TaskRes::Apply(res),
                ..
            }) => res,
            e => panic!("unexpected apply result: {:?}", e),
        };

一些琐碎信息

关于RaftRouter和Transport的关系:Transport是发给Store的,RaftRouter是发给Region的,参数为Router.force_send(region_id,msg),
Peer中用到了router中的地方:send_raft_commad/handle_raft_commited_entries/handle_raft_ready_append/activate

一个Region可以通过范围拆分分布在多个Store上,也就是所谓的MultiRaft。
使用Transport时可以直接使用Transport内RaftClient中的send方法
ctx.trans.send(SendMsg)/
RaftMessage::default/
handle_raft_ready_append: eraftpb::Message/
post_raft_ready_append: ready.take_messages() (raft_group.ready() raft_group是RawNode::New)
handle_raft_ready_advance: light_rd.take_messages()(raft_group.advance_append(ready))。
send_extra_message/send_raft_message/perpare_raft_message() (Raft_Message::Default)

一些消息参数:
PeerMsg::RaftCommand/
PeerMsg::RaftMessage/
PeerMsg::ApplyRes/
PeerMsg::Start/
PeerMsg::CasualMsg/
PeerMsg::replicate/
ControlMsg::LatencyInspect/
StoreMsg::RaftMessage/
StoreMsg::Tick/
StoreMsg::Start/

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

TiKV源码分析(一)RaftKV层 的相关文章

  • 机器学习中最基本的概念之一:数据集、样本、特征和标签

    本文重点 数据集 样本 特征和标签是机器学习中的重要概念 这些概念在机器学习算法的设计和实现过程中起着至关重要的作用 在本文中 我们将对这些概念进行详细的讲解 以便更好地理解机器学习算法的基本原理和应用 一 数据集 数据集是机器学习中最基本

随机推荐