split流程
处理协程启动
// startWorkers launches all of raftstore's background goroutines and task
// workers, then notifies the store and every existing peer to begin running.
func (bs *Raftstore) startWorkers(peers []*peer) {
	ctx := bs.ctx
	router := bs.router

	// One WaitGroup slot each for the raft worker and the store worker.
	bs.wg.Add(2)
	raftWorker := newRaftWorker(ctx, router)
	go raftWorker.run(bs.closeCh, bs.wg)
	storeWorker := newStoreWorker(ctx, bs.storeState)
	go storeWorker.run(bs.closeCh, bs.wg)

	// Kick off the store first, then wake up each pre-existing peer.
	router.sendStore(message.Msg{Type: message.MsgTypeStoreStart, Data: ctx.store})
	for _, p := range peers {
		id := p.regionId
		_ = router.send(id, message.Msg{RegionID: id, Type: message.MsgTypeStart})
	}

	// Start the task workers: split checking, region (snapshot) tasks,
	// raft-log GC, and scheduler interaction.
	workers := bs.workers
	engines := ctx.engine
	cfg := ctx.cfg
	workers.splitCheckWorker.Start(runner.NewSplitCheckHandler(engines.Kv, NewRaftstoreRouter(router), cfg))
	workers.regionWorker.Start(runner.NewRegionTaskHandler(engines, ctx.snapMgr))
	workers.raftLogGCWorker.Start(runner.NewRaftLogGCTaskHandler())
	workers.schedulerWorker.Start(runner.NewSchedulerTaskHandler(ctx.store.Id, ctx.schedulerClient, NewRaftstoreRouter(router)))

	go bs.tickDriver.run()
}
point1:
// Start launches the worker's processing goroutine. If the handler also
// implements Starter, its Start hook runs once before the receive loop.
// The goroutine exits when a TaskStop sentinel is received on the channel,
// decrementing the worker's WaitGroup on the way out.
func (w *Worker) Start(handler TaskHandler) {
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		if s, ok := handler.(Starter); ok {
			s.Start()
		}
		for {
			// Idiom fix: local was named `Task` (exported-style MixedCaps);
			// Go locals use short lowercase names.
			task := <-w.receiver
			if _, ok := task.(TaskStop); ok {
				return
			}
			handler.Handle(task)
		}
	}()
}
// Handle runs one split-check task: it scans the region for an approximate
// split key and, if one is found, asks the region's peer to start a split.
func (r *splitCheckHandler) Handle(t worker.Task) {
	task, ok := t.(*SplitCheckTask)
	if !ok {
		log.Errorf("unsupported worker.Task: %+v", t)
		return
	}
	region := task.Region
	regionId := region.Id
	log.Debugf("executing split check worker.Task: [regionId: %d, startKey: %s, endKey: %s]", regionId,
		hex.EncodeToString(region.StartKey), hex.EncodeToString(region.EndKey))

	key := r.splitCheck(regionId, region.StartKey, region.EndKey)
	if key == nil {
		log.Debugf("no need to send, split key not found: [regionId: %v]", regionId)
		return
	}

	if _, userKey, err := codec.DecodeBytes(key); err == nil {
		// It's not a raw key.
		// To make sure the keys of same user key locate in one Region, decode and then encode to truncate the timestamp
		key = codec.EncodeBytes(userKey)
	}
	msg := message.Msg{
		Type:     message.MsgTypeSplitRegion,
		RegionID: regionId,
		Data: &message.MsgSplitRegion{
			RegionEpoch: region.GetRegionEpoch(),
			SplitKey:    key,
		},
	}
	if err := r.router.Send(regionId, msg); err != nil {
		log.Warnf("failed to send check result: [regionId: %d, err: %v]", regionId, err)
	}
}
![](https://img-blog.csdnimg.cn/079fad1255024b4d9b1dc36de51e7ac3.png)
![](https://img-blog.csdnimg.cn/1d911b5e75a1426cb35ba5721aadf333.png)
![](https://img-blog.csdnimg.cn/a9945fb784d841df8d1259079508db2c.png)
peerSender 也就是 raftCh（即 peer 接收消息的通道）
![](https://img-blog.csdnimg.cn/fcddb5ab8ce648a983bdbd34deb0fd7c.png)
![](https://img-blog.csdnimg.cn/b2aba6ec02124601b74e6ed692b5a2ce.png)
// onPrepareSplitRegion validates a proposed split and, if valid, forwards an
// ask-split task to the scheduler worker. A validation failure is reported
// back to the caller through the callback.
func (d *peerMsgHandler) onPrepareSplitRegion(regionEpoch *metapb.RegionEpoch, splitKey []byte, cb *message.Callback) {
	err := d.validateSplitRegion(regionEpoch, splitKey)
	if err != nil {
		cb.Done(ErrResp(err))
		return
	}
	task := &runner.SchedulerAskSplitTask{
		Region:   d.Region(),
		SplitKey: splitKey,
		Peer:     d.Meta,
		Callback: cb,
	}
	d.ctx.schedulerTaskSender <- task
}
请求启动过程
![](https://img-blog.csdnimg.cn/c0eda27787b2442fb388696452f343ac.png)
![](https://img-blog.csdnimg.cn/eceafaed6efc4f6faac8bd1ffc6aae4a.png)
触发上面的 point1：![](https://img-blog.csdnimg.cn/ba77e96e3c5c469ca712b6a71b278ed2.png)
![](https://img-blog.csdnimg.cn/2fd975d0a9624a5996c2bdf9e0b1a7b8.png)
![](https://img-blog.csdnimg.cn/e73348e08c5b458982f10ecf2eb13eca.png)
![](https://img-blog.csdnimg.cn/b0080b5e7f334942b410b85365d74387.png)
// SendRaftCommand wraps a raft command request together with its callback
// and routes it to the peer that owns the target region.
func (r *RaftstoreRouter) SendRaftCommand(req *raft_cmdpb.RaftCmdRequest, cb *message.Callback) error {
	regionID := req.Header.RegionId
	msg := message.NewPeerMsg(message.MsgTypeRaftCmd, regionID, &message.MsgRaftCmd{
		Request:  req,
		Callback: cb,
	})
	return r.router.send(regionID, msg)
}
![](https://img-blog.csdnimg.cn/13f2ba285657480ca87e353313219d53.png)
handleMsg → ![](https://img-blog.csdnimg.cn/bac100c29e084b4195b1528d5927cb1f.png)
心跳更新region
![](https://img-blog.csdnimg.cn/237cf8a00a0542c4aacd3091926de83d.png)
![](https://img-blog.csdnimg.cn/bff6dd0ea7db4603824839d51d499d51.png)
![](https://img-blog.csdnimg.cn/8b83f9beee5143e6a0a102e43af86829.png)
![](https://img-blog.csdnimg.cn/ae855d9b9929486ab02b09a68d871130.png)
// RegionHeartbeat is the mock scheduler's handler for a region heartbeat:
// it refreshes pending-peer and leader bookkeeping, checks the reported
// region epoch against the scheduler's view, and replies through the leader
// store's registered heartbeat-response handler, possibly attaching a
// pending operator step for the region.
func (m *MockSchedulerClient) RegionHeartbeat(req *schedulerpb.RegionHeartbeatRequest) error {
// Reject heartbeats until the cluster has been bootstrapped.
if err := m.checkBootstrap(); err != nil {
return err
}
m.Lock()
defer m.Unlock()
regionID := req.Region.GetId()
// Any peer now reported as a full member is no longer pending; then
// re-record the peers the leader still reports as pending. The delete
// pass must run before the insert pass.
for _, p := range req.Region.GetPeers() {
delete(m.pendingPeers, p.GetId())
}
for _, p := range req.GetPendingPeers() {
m.pendingPeers[p.GetId()] = p
}
m.leaders[regionID] = req.Leader
// Validate the reported epoch; a stale version or conf version aborts
// the heartbeat with an error.
if err := m.handleHeartbeatVersion(req.Region); err != nil {
return err
}
if err := m.handleHeartbeatConfVersion(req.Region); err != nil {
return err
}
resp := &schedulerpb.RegionHeartbeatResponse{
Header: &schedulerpb.ResponseHeader{ClusterId: m.clusterID},
RegionId: regionID,
RegionEpoch: req.Region.GetRegionEpoch(),
TargetPeer: req.Leader,
}
// If an operator is queued for this region, retire it once finished,
// otherwise attach its next step to the response.
if op := m.operators[regionID]; op != nil {
if m.tryFinished(op, req.Region, req.Leader) {
delete(m.operators, regionID)
} else {
m.makeRegionHeartbeatResponse(op, resp)
}
log.Debugf("[region %d] schedule %v", regionID, op)
}
// Deliver the response via the leader store's handler.
// NOTE(review): assumes the leader's store exists in m.stores — a missing
// entry would nil-panic here; confirm this invariant holds in the mock.
store := m.stores[req.Leader.GetStoreId()]
store.heartbeatResponseHandler(resp)
return nil
}