TinyKv流程梳理三

2023-11-06

split流程

处理协程启动

func (bs *Raftstore) startWorkers(peers []*peer) {
	ctx := bs.ctx
	workers := bs.workers
	router := bs.router
	bs.wg.Add(2) // raftWorker, storeWorker
	rw := newRaftWorker(ctx, router)
	go rw.run(bs.closeCh, bs.wg)
	sw := newStoreWorker(ctx, bs.storeState)
	go sw.run(bs.closeCh, bs.wg)
	router.sendStore(message.Msg{Type: message.MsgTypeStoreStart, Data: ctx.store})
	for i := 0; i < len(peers); i++ {
		regionID := peers[i].regionId
		_ = router.send(regionID, message.Msg{RegionID: regionID, Type: message.MsgTypeStart})
	}
	engines := ctx.engine
	cfg := ctx.cfg
	workers.splitCheckWorker.Start(runner.NewSplitCheckHandler(engines.Kv, NewRaftstoreRouter(router), cfg))
	workers.regionWorker.Start(runner.NewRegionTaskHandler(engines, ctx.snapMgr))
	workers.raftLogGCWorker.Start(runner.NewRaftLogGCTaskHandler())
	workers.schedulerWorker.Start(runner.NewSchedulerTaskHandler(ctx.store.Id, ctx.schedulerClient, NewRaftstoreRouter(router)))
	go bs.tickDriver.run()
}

point1: 

func (w *Worker) Start(handler TaskHandler) {
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		if s, ok := handler.(Starter); ok {
			s.Start()
		}
		for {
			Task := <-w.receiver
			if _, ok := Task.(TaskStop); ok {
				return
			}
			handler.Handle(Task)
		}
	}()
}
func (r *splitCheckHandler) Handle(t worker.Task) {
	spCheckTask, ok := t.(*SplitCheckTask)
	if !ok {
		log.Errorf("unsupported worker.Task: %+v", t)
		return
	}
	region := spCheckTask.Region
	regionId := region.Id
	log.Debugf("executing split check worker.Task: [regionId: %d, startKey: %s, endKey: %s]", regionId,
		hex.EncodeToString(region.StartKey), hex.EncodeToString(region.EndKey))
	key := r.splitCheck(regionId, region.StartKey, region.EndKey)
	if key != nil {
		_, userKey, err := codec.DecodeBytes(key)
		if err == nil {
			// It's not a raw key.
			// To make sure the keys of same user key locate in one Region, decode and then encode to truncate the timestamp
			key = codec.EncodeBytes(userKey)
		}
		msg := message.Msg{
			Type:     message.MsgTypeSplitRegion,
			RegionID: regionId,
			Data: &message.MsgSplitRegion{
				RegionEpoch: region.GetRegionEpoch(),
				SplitKey:    key,
			},
		}
		err = r.router.Send(regionId, msg)
		if err != nil {
			log.Warnf("failed to send check result: [regionId: %d, err: %v]", regionId, err)
		}
	} else {
		log.Debugf("no need to send, split key not found: [regionId: %v]", regionId)
	}
}

peerSender也就是raftCh


func (d *peerMsgHandler) onPrepareSplitRegion(regionEpoch *metapb.RegionEpoch, splitKey []byte, cb *message.Callback) {
	if err := d.validateSplitRegion(regionEpoch, splitKey); err != nil {
		cb.Done(ErrResp(err))
		return
	}
	region := d.Region()
	d.ctx.schedulerTaskSender <- &runner.SchedulerAskSplitTask{
		Region:   region,
		SplitKey: splitKey,
		Peer:     d.Meta,
		Callback: cb,
	}
}

请求启动过程

 

 触发上面的point1

 

 

func (r *RaftstoreRouter) SendRaftCommand(req *raft_cmdpb.RaftCmdRequest, cb *message.Callback) error {
	cmd := &message.MsgRaftCmd{
		Request:  req,
		Callback: cb,
	}
	regionID := req.Header.RegionId
	return r.router.send(regionID, message.NewPeerMsg(message.MsgTypeRaftCmd, regionID, cmd))
}

 handleMsg---------》

 心跳更新region

func (m *MockSchedulerClient) RegionHeartbeat(req *schedulerpb.RegionHeartbeatRequest) error {
	if err := m.checkBootstrap(); err != nil {
		return err
	}

	m.Lock()
	defer m.Unlock()

	regionID := req.Region.GetId()
	for _, p := range req.Region.GetPeers() {
		delete(m.pendingPeers, p.GetId())
	}
	for _, p := range req.GetPendingPeers() {
		m.pendingPeers[p.GetId()] = p
	}
	m.leaders[regionID] = req.Leader

	if err := m.handleHeartbeatVersion(req.Region); err != nil {
		return err
	}
	if err := m.handleHeartbeatConfVersion(req.Region); err != nil {
		return err
	}

	resp := &schedulerpb.RegionHeartbeatResponse{
		Header:      &schedulerpb.ResponseHeader{ClusterId: m.clusterID},
		RegionId:    regionID,
		RegionEpoch: req.Region.GetRegionEpoch(),
		TargetPeer:  req.Leader,
	}
	if op := m.operators[regionID]; op != nil {
		if m.tryFinished(op, req.Region, req.Leader) {
			delete(m.operators, regionID)
		} else {
			m.makeRegionHeartbeatResponse(op, resp)
		}
		log.Debugf("[region %d] schedule %v", regionID, op)
	}

	store := m.stores[req.Leader.GetStoreId()]
	store.heartbeatResponseHandler(resp)
	return nil
}

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

TinyKv流程梳理三 的相关文章

随机推荐

  • 数十万条以上的大量数据如何快速插入数据库中

    引言 这几天工作这边同事遇到了一个问题 对十五万条数据进行计算 插入数据库的时候耗时很严重 使用了批量插入对十五万条数据插入仍然耗费了30秒 前面计算也耗费了二十多秒 系统流畅度因此很难堪 经过我的排查发现主要是两个点需要优化 1 计算的算
  • 关于STM32在线升级文件大或者跳转后中断有问题的解决方法(IAR环境)

    首先 大家都知道是跳转到每个程序的复位中断地址 一开始我的IAR环境的启动文件是没有复位中断地址的 可以去IAR官网随便下载一个重新覆盖掉启动文件 当然还得看你的外设中断有没有包含 或者有多的要去掉 DCD Handler类型的 如果遇到I
  • 怎样在 Markdown 中使程序代码带上行号

    在图灵社区使用 Markdown 写文章时 如果在一段文字的每行开头加上四个空格 或者一个制表符 Tab 这段文字就会被视为程序代码 这样 就会自动识别所用的编程语言 进行代码染色 语法高亮显示 但是 如果这段程序很长的话 就有两个小问题
  • 假设检验,显著性,置信水平,p值,点估计

    1 为什么需要假设检验 以下图激光器项目为例子 抽样30个 改善前720mw 改善后723mw 有一点提升 提升小 可能是正常的波动 所以不一定真的提升了 所以到底是正常波动还是真的改善了 需要结合功率标准差进行分析 标准差决定了波动的情况
  • 医疗管理系统-图形报表、POI报表

    目录 1 套餐预约占比饼形图 1 1 需求分析 1 2 完善页面 1 2 1 导入ECharts库 1 2 2 参照官方实例导入饼形图 1 3 后台代码 1 3 1 Controller 1 3 2 服务接口 1 3 3 服务实现类 1 3
  • STM8 学习笔记 5:时钟

    1 概述 时钟是单片机的脉搏 是单片机的驱动源 使用任何一个外设都必须打开相应的时钟 这样的好处是 如果不使用一个外设的时候 就把它的时钟关掉 从而可以降低系统的功耗 达到节能 实现低功耗的效果 每个时钟tick 系统都会处理一步数据 这样
  • 图数据库-Neo4j:linux centOS7安装

    1 下载 下载地址 社区版免费 https neo4j com download other releases releases 2 解压 tar axvf neo4j community 3 4 5 unix tar gz 3 修改配置文
  • 完全图解自然语言处理中的Transformer——BERT基础(入门长文)

    翻译自Jay Alammar Blog 在上一篇文章可视化Seq2Seq attention中 我们介绍了现在深度学习中特别常用的Attention 注意力 机制 注意力可以提升机器翻译的效果 这篇文章介绍Transformer 一种使用注
  • 深入理解Netty高性能网络框架

    大家好 今天我们来聊聊Netty的那些事儿 我们都知道Netty是一个高性能异步事件驱动的网络框架 它的设计异常优雅简洁 扩展性高 稳定性强 拥有非常详细完整的用户文档 同时内置了很多非常有用的模块基本上做到了开箱即用 用户只需要编写短短几
  • 平衡二叉树(AVL)python实现

    AVL树是一种特殊的二叉搜索树 BST树 数据极端情况下 二叉搜索树会退化成为单链表 但是AVL树通过旋转操作规避了这个问题 查找平均复杂度 O logn AVL树不适于删除的情况 class AVLTreeNode object def
  • Nginx 配置日志打印--HTTP报文

    http include mime types default type application octet stream log format main remote addr remote user time local request
  • loadrunner12——录制脚本时网络连接错误——无法弹出浏览器解决方法

    loadrunner12可以使用火狐浏览器 如果大家安装有问题 为了避免出错 可根据以下地址重新进行安装 https blog csdn net lmm0513 article details 87935863 1 首先打开loadrunn
  • 1到100, 中间随机抽走一个数,用算法得出是哪个数

    import java util ArrayList import java util HashMap import java util List import java util Map public class HunderTest p
  • UNIX环境高级编程 学习笔记 第二章 UNIX标准及实现

    C语言的ANSI标准在1989年得到批准 此标准也被采纳为国际标准ISO IEC 9899 1990 ANSI是美国国家标准学会 American National Standards Institute 它是国际标准化组织ISO Inte
  • 什么是全国大学生电子设计大赛?如何备战?

    前言 全国大学生电子设计竞赛是教育部和工业和信息化部共同发起的大学生学科竞赛之一 每一届都有非常多的大学生参加 但每届的比赛前夕 总会有很多同学并不清楚比赛的相关信息 流程以及如何去备战比赛 导致在比赛中出现准备不足或违法比赛规则等现象从而
  • C++--封装

    目录 封装 封装的含义 封装的意义1 实例一 实例二 C 面对对象的三大特点 继承 封装 多态 封装 封装的含义 将对象的属性和行为作为一个整体 表现生活中的事物 将属性和行为加以权限控制 封装的意义1 设计一个类 将属性和行为写到一起 表
  • Struts简介

    一 Struts英文单词意思 支柱 支架 来源于建筑和旧式飞机使用的金属支架 二 定义 Struts是流行和成熟的基于MVC设计模式的Web应用程序框架 Struts在软件开发中 是一个非常优秀的框架 它先是Jakarta项目的一个子项目
  • 微服务常见解决方案和高并发常见解决方案、以上特殊解决方案

    我确实比较懒 有些东西总是会写到自己的笔记本上或者在线笔记中 一般很少总结写到博客中 闲来无事确实总结了一些 如果你需要 可以私信聊聊 不敢说有特别好特别完美的方案 但是从个人视角结合知识再结合经验还是有点可说的内容的 如果你需要请私信
  • Parameter ‘name‘ not found. Available parameters are [0, 1, param1, param2]] with root cause

    SpringBoot环境 测试查询数据库 第一次直接通过dao层的注解查询 全局查询没问题 加入参数就报错了 提示说传的参数找不到 打印出来都有值 最后查了一下 这是错误的接口方法 查询注解里的参数有问题 修改为
  • TinyKv流程梳理三

    split流程 处理协程启动 func bs Raftstore startWorkers peers peer ctx bs ctx workers bs workers router bs router bs wg Add 2 raft