elasticjob 源码分析

2023-10-26

简介

elasticjob是基于quartz构建支持分片的分布式弹性可伸缩的job执行组件

zookeeper节点数据设计
job
   leader
        election
            latch
            instance  //主节点的实例ID  临时节点  在节点选举成功后添加
        sharding
            necessary
            processing //临时节点标记 分片是否正在进行
   servers
        10.2.123.152
        123.254.26.23
   instances
        456  //临时节点  运行实例
        235
   sharding
        0
           instance = 10.7.1.2@-@456
           running  //标记该分片的状态正在运行 
        1
           instance = 10.7.1.2@-@456

1 在线的实例节点设计为临时节点

    public void persistOnline() {
        jobNodeStorage.fillEphemeralJobNode(instanceNode.getLocalInstanceNode(), "");
    }

2 标记分片正在进行中的标识

jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");
public void shardingIfNecessary() {
        List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances();
        if (!isNeedSharding() || availableJobInstances.isEmpty()) {
            return;
        }
        if (!leaderService.isLeaderUntilBlock()) {
            blockUntilShardingCompleted();
            return;
        }
        waitingOtherShardingItemCompleted();
        LiteJobConfiguration liteJobConfig = configService.load(false);
        int shardingTotalCount = liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount();
        log.debug("Job '{}' sharding begin.", jobName);
        jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");
        resetShardingInfo(shardingTotalCount);
        JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(liteJobConfig.getJobShardingStrategyClass());
        jobNodeStorage.executeInTransaction(new PersistShardingInfoTransactionExecutionCallback(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));
        log.debug("Job '{}' sharding complete.", jobName);
    }

分片事务结束时候需要删除节点

curatorTransactionFinal.delete().forPath(jobNodePath.getFullPath(ShardingNode.PROCESSING)).and();

3 实例正在运行的状态的标记

public void registerJobBegin(final ShardingContexts shardingContexts) {
        JobRegistry.getInstance().setJobRunning(jobName, true);
        if (!configService.load(true).isMonitorExecution()) {
            return;
        }
        for (int each : shardingContexts.getShardingItemParameters().keySet()) {
            jobNodeStorage.fillEphemeralJobNode(ShardingNode.getRunningNode(each), "");
        }
    }

任务开始执行时候即会注册相应的实例分片运行状态

 private void execute(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
        if (shardingContexts.getShardingItemParameters().isEmpty()) {
            if (shardingContexts.isAllowSendJobEvent()) {
                jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobName));
            }
            return;
        }
        jobFacade.registerJobBegin(shardingContexts);
        String taskId = shardingContexts.getTaskId();
        if (shardingContexts.isAllowSendJobEvent()) {
            jobFacade.postJobStatusTraceEvent(taskId, State.TASK_RUNNING, "");
        }
        try {
            process(shardingContexts, executionSource);
        } finally {
            // TODO 考虑增加作业失败的状态,并且考虑如何处理作业失败的整体回路
            jobFacade.registerJobCompleted(shardingContexts);
            if (itemErrorMessages.isEmpty()) {
                if (shardingContexts.isAllowSendJobEvent()) {
                    jobFacade.postJobStatusTraceEvent(taskId, State.TASK_FINISHED, "");
                }
            } else {
                if (shardingContexts.isAllowSendJobEvent()) {
                    jobFacade.postJobStatusTraceEvent(taskId, State.TASK_ERROR, itemErrorMessages.toString());
                }
            }
        }
    }
原理分析之初始化

使用时我们定义执行ElasticJob,但是ElasticJob底层执行的必然是quartz的Job,在源码中是 LiteJob

public final class LiteJob implements Job {
   
   @Setter
   private ElasticJob elasticJob;
   
   @Setter
   private JobFacade jobFacade;
   
   @Override
   public void execute(final JobExecutionContext context) throws JobExecutionException {
       JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
   }
}

那LiteJob 是如何被初始化创建并start的,实际上 elasticjob中具体的任务将会被封

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

elasticjob 源码分析 的相关文章

  • 从APM源码分析GPS、气压计惯导融合

    最近事多 xff0c 忙着开源自研飞控 xff0c 现主要工作基本已经完成 xff0c 代码最迟下月中旬开放 xff0c 博客来不及更新 xff0c 还请各位见谅 xff0c 后面会抽空多更的咯 xff01 xff01 xff01 自研飞控
  • ArrayList源码分析

    ArrayList源码分析 注意 本笔记分析对象为 Java8 版本 随版本不同 源码会发生变化 1 ArrayList类图与简介 ArrayList是一个 非线程安全 基于数组实现的一个动态数组 可以看到 它的顶层接口是 Collecti
  • LiveData源码分析

    首先还是以一个示例开始 xff1a MutableLiveData lt String gt liveString 61 new MutableLiveData lt gt liveString observe mOwner new Obs
  • RxJava 2.x 源码分析 之 FlatMap

    FlatMap 官方定义 xff1a 把被观察者发射出去的事件转化成新的子被观察者 xff0c 然后把这些发射量展开平铺后统一放到一个被观察者中 官方文档 简单来讲就是把被观察者每次发射的事件转化成一个子被观察者 xff0c 然后通过合并
  • PX4源码分析6:位置结算模块需要掌握哪几个核心点?

    位置解算源码位置 xff1a local position estimator main cpp 核心点1 xff1a 在北东地坐标系里 xff0c 如何由准确加速度对飞机的三维速度和三维位置进行估计 xff1f 核心点2 xff1a 如图
  • [gevent源码分析] 深度分析gevent运行流程

    一直对gevent运行流程比较模糊 xff0c 最近看源码略有所得 xff0c 不敢独享 xff0c 故分享之 gevent是一个高性能网络库 xff0c 底层是libevent xff0c 1 0版本之后是libev xff0c 核心是g
  • 【docker 17 源码分析】docker pull image 源码分析

    一 Image主要命令 docker images xff08 所有 xff09 docker images java xff08 所有java xff09 docker images java 8 xff08 固定tag的jave xff
  • Apache IoTDB’s UDF源码分析(1)

    目录 前言 命令行注册UDF函数 Create Function xxx as 34 全限定类名 34 语法分析 生成物理计划 执行物理计划进行函数注册 Select带有UDF函数的查询 前言 继上个月开始了Apache IoTDB的源码贡
  • lemon源码分析

    基本概念见上篇 lemon源码基本概念整理 1 follow集 对于如下4条产生式 program 61 expr TK SEM expr 61 expr TK IMPL expr expr 61 TK LPAREN expr TK RPA
  • PX4源码分析1_PX4源码的下载和编译

    一 基本信息 xff1a 1 软件系统 xff1a Ubuntu 14 04 64bit 2 源码位置 xff1a https github com PX4 Firmware 3 参考博客 xff1a xff08 1 xff09 libin
  • PX4源码分析7_添加mavlink自定义消息

    一 自定义mavlink消息 xff1a 根据uorb消息 xff08 msg xff09 自定义mavlink消息 方法为利用mavlink generator工具在xml文件生成mavlink所需相应的头文件 二 发送自定义mavlin
  • Freertos 源码分析 队列queue

    队列queue xff08 零 xff09 队列的基础概念和形态 xff08 一 xff09 Freertos 队列 queue c FreeRTOS Kernel 10 4 6 include queue h Freertos队列模块包含
  • 【死磕 Java 集合】— ConcurrentSkipListMap源码分析

    转自 xff1a http cmsblogs com p 61 4773 隐藏目录 前情提要简介存储结构源码分析 主要内部类构造方法添加元素添加元素举例删除元素删除元素举例查找元素查找元素举例彩蛋 作者 xff1a 彤哥 出处 xff1a
  • 第13章 商品秒杀功能实现

    mini商城第13章 商品秒杀功能实现 一 课题 商品秒杀功能实现 二 回顾 1 密码安全学 2 微信支付 3 微信退款 三 目标 1 秒杀设计 秒杀业务设计 秒杀架构设计 秒杀表结构设计 2 活动管理 活动分析 有效活动列表查询 3 搜索
  • Spring IOC容器初始化过程 源码分析

    本文主要记录Spring容器创建 源码分析过程 首先贴上一张时序图 好久没画 忘的差不多了 画的不好 可以凑合看一下 接下来 贴上一份测试代码 这里使用AnnotationConfigApplicationContext来初始化Spring
  • 一文搞懂Elastic-Job(内附源码解析)

    前言 Elastic Job是当当基于Zookepper Quartz开发并且开源的Java分布式定时任务 解决Quartz不支持分布式的弊端 它由两个相互独立的子项目Elastic Job Lite和Elastic Job Cloud组成
  • Glide3.7.0源码详解

    基于的Glide版本 3 7 0 本文分析的是Glide最基本的一行代码 Glide with this load url into imageView 我们认准这一个功能点 来看看Glide默默为我们做了什么 这篇文章会分析这行代码背后的
  • 【ClickHouse内核】资源管理

    目录 概述 资源使用追踪机制 MemoryTracker ProfileEvents QueryProfiler 举个例子 资源隔离机制 内存隔离 CPU隔离 IO隔离 资源使用配额 Quota 机制 结论 概述 资源管理对于数据库来说是非
  • android 休眠唤醒机制分析(一) — wake_lock

    Android的休眠唤醒主要基于wake lock机制 只要系统中存在任一有效的wake lock 系统就不能进入深度休眠 但可以进行设备的浅度休眠操作 wake lock一般在关闭lcd tp但系统仍然需要正常运行的情况下使用 比如听歌
  • 【ClickHouse内核】对于分区、索引、标记和压缩数据的协同工作

    目录 概述 写入过程 生成分区目录 生成索引 生成标记和数据压缩文件 各个底层物理文件生成的过程 查询过程 借助索引文件降低扫描范围 借助标记文件降低解压数据的大小 数据标记与压缩数据块的对应关系 多个数据标记对应一个压缩数据块 一个数据标

随机推荐

  • uni-app插件使用注意事项

    1 将插件设置为全局组件后需要将项目重新运行 2 有些插件的功能会互斥 不要贪多全部装上 按需安装即可
  • QT中QMap使用实例详解

    QMap QMultiMap属于关联式容器 其底层结构是通过二叉树实现 故其查找value的效率很快 QMap中的数据都是成对出现的 第一个称为key 键 第二个称value 键值 目录 实例化QMap对象 插入数据 移除数据 遍历数据 由
  • siege压力测试工具安装和介绍

    前言 最近公司有个项目需要做一个短轮询类推送服务器 推送服务器分为三种 短轮询 长轮询 长连接 用户量不大 但是为了保险起见还是做一下压力测试 用的工具是siege 目录 前言 目录 siege介绍 siege安装 siege使用 1 si
  • python爬虫---用数据解析bs4爬取整部三国演义(不用诗词名句网)

    python爬虫 用数据解析bs4爬取整部三国演义 不用诗词名句网 需求 使用bs4实现将三国演义小说的每一章的内容爬取到本地磁盘进行存储 诗词名句网无法进去 所以我自己找了个网站爬取 思路差不多 首先 对首页的页面数据进行爬取 url h
  • 矩阵的逆矩阵 和 转置矩阵

    这几天用到了逆矩阵 就在这里总结一下逆矩阵和转置矩阵 逆矩阵 逆矩阵就是一个矩阵的逆向 比如一个点乘以一个矩阵后得到了一个新的点的位置 如果想通过这个点再获得矩阵转换前的位置 那我们就需要乘以这个矩阵的逆矩阵 在Three js里面 我们可
  • 国产数据库

    作者 JiekeXu 来源 JiekeXu DBA之路 ID JiekeXu IT 大家好 我是 JiekeXu 很高兴又和大家见面了 今天和大家一起来体验一下 TiDB 5 0 欢迎点击上方蓝字关注我 标星或置顶 更多干货第一时间到达 T
  • springboot对bean的生命周期管理

    声明 代码是JavaEE开发的颠覆者 Spring Boot实战代码中的 我只是拿去学习 传统方式 public class BeanWayService public void init System out println Bean i
  • windows10上通过python3远程连接hive

    注意 impyla 既可以连接impala 也可以连接hive 环境 windows10 python版本 3 6 hive版本 1 1 impyla安装过程 安装依赖 pip3 install bit array pip3 install
  • stm32通过ESP8266连接互联网服务器,手机通过网页实现远程控制灯亮灭

    一 实验结果 最终实验结果如上图所示 由于csdn限制gif图像大小 所以模糊了点 但是还是可以看清的 图中是手机在网页中进行操作 然后发送请求到php服务器 php服务器建立tcp链接 该链接通过一个JAVA写的TCP请求转发器 把tcp
  • EasyExcel实现Excel文件导入导出

    1 EasyExcel简介 EasyExcel是一个基于Java的简单 省内存的读写Excel的开源项目 在尽可能节约内存的情况下支持读写百M的Excel github地址 https github com alibaba easyexce
  • 软考高级-信息系统项目管理工程师-备考建议

    本人参加了2023年11月的软考高项 这里分享一下关于高项的备考建议 高项一共有24章节 其中 重点是7 17这10大管理知识域 需要重点理解性学习 1 3 4 18这4章 几乎全是概念 可以阅读性的速看 把相关概念标注出来 考前在速记一下
  • 【python基础知识】18.实操-使用python自动群发邮件

    文章目录 前言 项目实操 明确项目目标 分析过程 拆解项目 逐步执行 代码实现 版本1 0 学习模块 发一封简单邮件 版本2 0 给自己发一封完整邮件 版本3 0 群发完整邮件 前言 之前 我们学习了模块相关的知识 让我们来回顾一下 回顾结
  • Vue列表渲染(v-for的使用)

    列表渲染 列表渲染的东西比较多 我们通过案例一步一步学习列表渲染的相关知识 基本列表 首先写一个基本的列表 想要把persons列表里面的对象展示在li里面 我们可以使用一个指令 v for v for vue提供给我们做循环的指令 语法类
  • Column 'goods_type' in where clause is ambiguous

    今天开发超市管理系统的时候发现了一个问题 百度了一下这个单词ambiguous是暧昧的意思 然后百度了 网上的人说是因为数据库查询的时候的多表查询中 有列名相同导致数据库不知道是那个表的列名 无法识别所以报出这个错误 错误发生在mybati
  • 记使用RabbitMQ的坑

    主要碰到以下几个问题 1 无法正常的启动rabbit服务 见图1 2 工厂启动后无法正常连接消息队列 见图2 3 1 2之后还是无法连接到消息队列 将port端口设置成5672 而不是15672 解决方法 1 针对问题1 在windows服
  • antd-vue表格实现单击或者双击

    在table表格中设置customRow属性 methods中实现 doubleClick record index return on 这里是双击 单击改成click即可 dblclick gt console log record in
  • Verilog 位拼接运算符{}语法要点总结

    Verilog 位拼接运算符语法要点总结 Verilog位拼接运算符 语法回顾 要点总结 Verilog位拼接运算符 语法回顾 verilog中 运算符用于 拼接 多个变量或者常量 基本用法如下 1 变量的拼接 wire a 3 0 b 4
  • android关于屏幕适配的几点建议

    1 使用wrap content match parent weight 2 使用相对布局 尽量不使用绝对布局 3 使用限定符 如 layout large xxx xml 这样大屏设备就会自动使用该布局 4 使用最小宽度限定符 如 lay
  • 基于Redisson的分布式锁

    接口实现类 import java util concurrent TimeUnit import org redisson api RLock import org redisson api RedissonClient 基于Rediss
  • elasticjob 源码分析

    简介 elasticjob是基于quartz构建支持分片的分布式弹性可伸缩的job执行组件 zookeeper节点数据设计 job leader election latch instance 主节点的实例ID 临时节点 在节点选举成功后添