Flink运行时之批处理程序生成计划

2023-11-07

批处理程序生成计划

DataSet API所编写的批处理程序跟DataStream API所编写的流处理程序在生成作业图(JobGraph)之前的实现差别很大。流处理程序是生成流图(StreamGraph),而批处理程序是生成计划(Plan)并由优化器对其进行优化并生成优化后的计划(OptimizedPlan)。

什么是计划

计划(Plan)以数据流(dataflow)的形式来表示批处理程序,但它只是批处理程序最初的表示,在一个批处理程序生成作业图之前,计划还会被进行优化以产生更高效的方案。Plan不同于流图(StreamGraph),它以sink为入口,因为一个批处理程序可能存在若干个sink,所以Plan采用集合来存储它:

protected final List<GenericDataSinkBase<?>> sinks = new ArrayList<>(4);

另外Plan还封装了批处理作业的一些基本属性:jobId、jobName以及defaultParallelism等。

Plan实现了Visitable接口,该接口表示其实现者是可遍历的。Visitable要求实现者完善其accept方法,该方法接收一个Visitor作为遍历器对实现Visitable接口的对象进行遍历。Plan对accept方法的实现是依次对所有的sink进行遍历:

public void accept(Visitor<Operator<?>> visitor) {   
    for (GenericDataSinkBase<?> sink : this.sinks) {      
        sink.accept(visitor);   
    }
}

代码段中的GenericDataSinkBase也间接实现了Visitable接口,在for循环中会调用它的accept方法。

Visitor接口提供了两个遍历方法,分别是前置遍历的preVisit和用于后置遍历的postVisit方法。Plan在内部实现了获得当前批处理程序最大并行度的MaxDopVisitor遍历器,preVisit会将当前遍历算子的并行度跟已知的最大并行度进行对比,在两者之间取较大值:

public boolean preVisit(Operator<?> visitable) {   
    this.maxDop = Math.max(this.maxDop, visitable.getParallelism());   
    return true;
}

获取最大并行度的getMaximumParallelism方法,会实例化该遍历器并调用accept方法进行遍历来获得整个批处理程序的最大并行度:

public int getMaximumParallelism() {   
    MaxDopVisitor visitor = new MaxDopVisitor();   
    accept(visitor);   
    return Math.max(visitor.maxDop, this.defaultParallelism);
}

代码段中的accept方法即为我们之前所展示的那个实现。由此可见accept内部定义了一种遍历模式,而具体遍历过程中要实现的逻辑,取决于对其应用的Visitor。这种设计将遍历模式和遍历逻辑进行了分离。

生成计划源码分析

跟流处理程序中生成流图(StreamGraph)的方式类似,批处理程序中生成计划(Plan)的触发位置也位于执行环境类中。具体而言,是通过createProgramPlan方法来生成Plan的。生成Plan的核心部件是算子翻译器(OperatorTranslation),createProgramPlan方法通过它来”翻译“出计划,核心代码如下:

OperatorTranslation translator = new OperatorTranslation();
Plan plan = translator.translateToPlan(this.sinks, jobName);

根据之前我们对Plan的介绍,可知它是以sink为源头的,所以这里在对计划进行翻译时,也接收的是sink集合。

OperatorTranslation,该类提供了大量的翻译方法来对批处理程序进行翻译。大致来看,它们之间的调用关系如下图:

OperatorTranslation-method-call-chain

上图中的蓝色带箭头的线表示调用关系,而红色的线表示互相调用的关系,也就是说它们之间存在递归调用

可以看出translateToPlan是这个类对外提供能力的入口方法。translateToPlan的完整实现如下代码段:

public Plan translateToPlan(List<DataSink<?>> sinks, String jobName) {   
    List<GenericDataSinkBase<?>> planSinks = new ArrayList<GenericDataSinkBase<?>>();
    //遍历sinks集合      
    for (DataSink<?> sink : sinks) {      
        //将翻译生成的GenericDataSinkBase加入planSinks集合
        planSinks.add(
            //对每个sink进行”翻译“
            translate(sink)
        );   
    }      
    //以planSins集合构建Plan对象
    Plan p = new Plan(planSinks);   
    p.setJobName(jobName);   
    return p;
}

上面代码段中的translate方法, 它接收每个需遍历的DataSink对象,然后将其转换成GenericDataSinkBase对象。其实现如下:

private <T> GenericDataSinkBase<T> translate(DataSink<T> sink) {        
    Operator<T> input = translate(sink.getDataSet());      
    GenericDataSinkBase<T> translatedSink = sink.translateToDataFlow(input);            
    return translatedSink;
}

translate方法内部分为两步,第一步是对当前遍历的sink的DataSet进行递归翻译并获得其输入端的Operator对象:

Operator<T> input = translate(sink.getDataSet());

注意这里的Operator对象是Flink core包中的Operator类型,而非批处理API包中的。

批处理相关的设计、命名相比流处理略显混乱,这里面当然有一些历史包袱存在,不过这不是我们关心的重点。为了避免产生混淆,同时为下文作铺垫,我们先分析一下批处理API的顶层设计以及core包中相关的类型设计。批处理API中的几个关键对象DataSet、Operator、DataSource、DataSink之间的继承和关联关系如下图:

DataSet-Operator-DataSource-DataSink-relationship

DataSet作为批处理API抽象的同时也是Operator的父类,而Operator则是批处理中所有算子的父类。DataSource和DataSink在哪里都是特殊的,这里也不例外。DataSource继承自Operator,因此它是一种特殊的算子。而DataSink跟上述这三个类不存在继承关系,但它保持了对DataSet的引用,表示跟它关联的数据集。

批处理API模块跟流处理API模块是完全独立的,就算名称相同的类,也不是双方API所共享的。

上面紧接着的这行代码中的translate方法在对DataSet进行翻译的过程中会枚举所有具体被支持的DataSet,并进行有针对性的翻译,具体被支持的DataSet总共有下图中被框起来的五个:

translate-supported-DataSet-type

对于Operator分支而言,因为这这三种基本的Operator类型处于继承链的最顶端,所以它们基本代表了所有后续派生的Operator。

注意,translate方法返回的Operator并不是批处理API包中的Operator类型,而是基础包中的。具体而言,Flink提供了两套Operator的抽象,它们分别是处于org.apache.flink.api.common.operators包以及org.apache.flink.api.java.operators包。上面展示的继承关系图中的Operator就是批处理的API模块中的,在这个体系中,DataSink是独立的。而core模块中的operators包中的Operator是所有算子的抽象,在这个包中,source、sink都派生自Operator,继承体系如下图所示:

core-module-operators-package-class-diagram

因此批处理Java API模块中的operators包不是核心模块中的operators包的扩展与延伸。核心模块中只是提供了一套公共的抽象,而批处理Java API提供的是面向编程接口的抽象。但他们之间并不是毫无联系,因为在translate方法中,会从批处理Java API模块中operators包往核心模块中operators包的转换,对应的转换关系如下:

  • DataSource -> GenericDataSourceBase (通过DataSource的translateToDataFlow方法)
  • DataSink -> GenericDataSinkBase(通过DataSink的translateToDataFlow方法)
  • SingleInputOperator -> Operator (通过SingleInputOperator抽象的translateToDataFlow方法,供子类实现)
  • TwoInputOperator -> Operator (通过TwoInputOperator抽象的translateToDataFlow方法,供子类实现)
  • BulkIterationResultSet -> BulkIterationBase (直接构建)
  • DeltaIterationResultSet -> DeltaIterationBase (直接构建)

translate方法将会在对特定类型的DataSet的翻译中触发对其递归调用,其顺序是从sink开始逆向往source方向进行的,同时会在它们之间建立关系。

这里需要注意的是,这种模式跟流处理中的生成StreamGraph的差别很大。StreamGraph是依靠StreamNode以及StreamEdge来建立节点和边之间的关系,并基于一个统一的StreamGraph数据结构在遍历中收集所有的StreamNode以及StreamEdge。而批处理所生成的Plan却并非是依靠一个中心化的数据结构,在从sink开始进行逆向遍历时,只构建当前算子跟其输入端算子这种临近算子之间的关系,这些关系被封装在各个算子对象中。如果需要串联起它们或者需要访问DAG整体,那么就需要通过遍历器从sink开始依据这种两两之间的关系进行遍历,因此这种模式可以看成是非中心化的。

每翻译一个DataSet会将其加入到一个名为translated的Map中去。translate方法最终返回的是sink紧邻接着的输入端的算子对象,该输入端算子目前还没有跟该sink进行关联。所以,第二步就是调用下面这句将它们建立关系同时将批处理API中的DataSink翻译为核心包中的GenericDataSinkBase表示:

GenericDataSinkBase<T> translatedSink = sink.translateToDataFlow(input);

最终在遍历完sinks集合后产生planSinks集合并以此创建Plan对象。

现在我们将注意力收回到createProgramPlan方法中来,刚刚已经创建完Plan对象,如果配置了自动类型注册,那么Plan将注入一个用于类型注册的遍历器来遍历所有算子并对其类型进行注册:

if (!config.isAutoTypeRegistrationDisabled()) {   
    plan.accept(new Visitor<org.apache.flink.api.common.operators.Operator<?>>() {            
        private final HashSet<Class<?>> deduplicator = new HashSet<>();            
        @Override      
        public boolean preVisit(org.apache.flink.api.common.operators.Operator<?> visitable) {         
            OperatorInformation<?> opInfo = visitable.getOperatorInfo();         
            Serializers.recursivelyRegisterType(opInfo.getOutputType(), config, deduplicator);         
            return true;      
        }      
        @Override      
        public void postVisit(org.apache.flink.api.common.operators.Operator<?> visitable) {}   
    });
}

完成自动类型注册之后,下一步是将缓存文件注册到Plan对象上:

registerCachedFilesWithPlan(plan);

何谓缓存文件?这里的缓存文件是指用户通过执行环境对象注册的带有名称以及路径的文件,该路径可以是最终执行任务的工作节点本地的文件路径,也可以是分布式文件系统的路径(这种情况Flink会将文件拷贝到本地)。

上面的这个方法会将注册到执行环境对象的缓存文件注册给Plan对象,以便后续生成JobGraph。

计划优化

其实在Flink为批处理程序生成计划(Plan)之后,它会对计划进行优化产生优化后的计划(OptimizedPlan),而批处理程序对应的作业图(JobGraph)则是基于OptimizedPlan生成的。OptimizedPlan的生成涉及到优化器相关的内容,更深入的分析请参考“优化器”相关的内容。因为这里的重心是介绍用户程序的执行,而了解OptimizedPlan是分析JobGraph的前提,所以我们会对OptimizedPlan进行简单介绍。

OptimizedPlan主要封装了如下这些属性:

  • dataSources:SourcePlanNode集合;
  • dataSinks:SinkPlanNode集合;
  • allNodes:优化后计划中的所有PlanNode节点集合;
  • originalProgram:最初未被优化的Plan对象

在ClusterClient的run方法中生成OptimizedPlan:

OptimizedPlan optPlan = getOptimizedPlan(compiler, program, parallelism);

对getOptimizedPlan方法进行追踪会发现其实生成OptimizedPlan的核心代码就一句:

compiler.compile(p);

这里的compiler是优化器Optimizer的实例,而参数p是Plan的实例。

在这之前optimizer模块的名称一直是compiler,近几个版本才完成更名,但模块内的很多注释、变量以及方法名还是能发现那些历史遗留痕迹。可以将后续遇到的所有compiler当作optimizer来理解。

在分析流处理程序生成StreamGraph时,我们展示了通过Flink的计划可视化器生成StreamGraph的图形化表示。同样,计划可视化器也可以展示批处理的OptimizedPlan的图形化表示(遗憾的是无法展示Plan的图形化表示)。我们以flink-examples-batch模块中自带的WordCount作为示例程序来展示其执行计划图,在获得其OptimizedPlan的JSON表示之前,需要对源程序进行一些改造。

首先,将执行环境的并行度设置为2:

env.setParallelism(2);

然后将最终触发程序执行的这句注释掉:

//env.execute("WordCount Example");

换成下面这句:

System.out.print(env.getExecutionPlan());

将打印出来的OptimizedPlan的JSON字符串贴到Flink的计划可视化器中,点击下方的“Draw”按钮即可生成。生成的图如下:

Batch-WordCount-OptimizedPlan

从上图中各个算子的ID编号可以看出生成计划时其遍历的顺序是从sink开始的,因为ID生成器是一个静态计数器。

最后我们来看一下生成OptimizedPlan的JSON字符串的代码:

public String getExecutionPlan() throws Exception {   
    Plan p = createProgramPlan("plan", false);     
    if (executor != null) {      
        return executor.getOptimizerPlanAsJSON(p);   
    }   else {      
        PlanExecutor le = PlanExecutor.createLocalExecutor(null);      
        return le.getOptimizerPlanAsJSON(p);   
    }
}

通过调用PlanExecutor的getOptimizerPlanAsJSON方法获得OptimizedPlan并输出其JSON字符串表示。

更多生成OptimizedPlan的有待分析“优化器”时再分析。


微信扫码关注公众号:Apache_Flink

apache_flink_weichat


QQ扫码关注QQ群:Apache Flink学习交流群(123414680)

qrcode_for_apache_flink_qq_group

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flink运行时之批处理程序生成计划 的相关文章

  • [转]video视频解码硬解和软解的区别及如何选择

    如果你认为本系列文章对你有所帮助 请大家有钱的捧个钱场 点击此处赞助 赞助额0 1元起步 多少随意 声明 本文只用于个人学习交流 若不慎造成侵权 请及时联系我 立即予以改正 锋影 email 174176320 qq com 硬解 字面上理

随机推荐

  • 百度引流推广怎么做?个人如何做百度推广

    个人如何做百度推广 相对于中小型企业 个人或者微商朋友在网络推广预算比较紧张 做网络营销推广的预算不会太多 因此 更需要在有限的推广费用预算 做出更好的推广效果 无疑 精准引流成为了个人做百度推广的首选 一 什么是百度推广 百度推广可以简单
  • 只出现一次的数字(异或运算^)

    给定一个非空整数数组 除了某个元素只出现一次以外 其余每个元素均出现两次 找出那个只出现了一次的元素 说明 你的算法应该具有线性时间复杂度 你可以不使用额外空间来实现吗 示例 1 输入 2 2 1 输出 1 示例 2 输入 4 1 2 1
  • 践行社会责任的路上,中概股们看到了怎样的风景?

    谈起社会责任 你会想到什么 绿色经济 双碳 目标 共同富裕 乡村振兴 慈善活动 ESG 环境 社会和公司治理 这些名词肯定少不了 当下 全球企业正越发强调社会责任 这或许是商业发展到一定阶段的必然结果 但也离不开公司们对社会事业的特别关注
  • PHP异常处理中的finally

    0x01 异常处理 在做代码分析的时候发现了一个有意思的点 样例代码如下 我们知道finally会在return之前执行 那么上
  • Packing(石板切割问题)回溯算法

    一 问题描述 给定一个最大的总切割目标石块 再给定一系列我们需要的样板石块 寻找切割方法使得我们从目标石块上切割出的所需样板石块的面积和最大 即对目标石块的利用率最高 限制切割为一刀切 即一次切割必须把一块石板一分为二 不能只切一段 左边为
  • 方差公式【数论】

    对于今天打的一道题 非常有感想 然后花了很久很久打了这个函数超多的方差公式 哎 来吧来吧 推导 首先我们知道方差的公式是 K i
  • 架构--网络关键指标公式

    架构 网络关键指标公式 一 经典公式1 估算系统的平均并发用户数和并发用户数峰值 1 1 公式 1 1 1 平均并发用户数 C nL T 参数说明 C 平均并发用户数 通过计算出来的 参数说明 n login session的数量 也就是
  • 告警与恢复告警原理及实现

    一 背景 自 双碳 政策提出以来 KaiwuDB 聚焦 数字能源 领域 为用户打造数字能源管理平台 旨在提升综合能源和碳资产管理能力 数字能源管理平台是以 KaiwuDB 为核心建设的云 边 端一体化数据服务平台 致力于为 IoT 工业互联
  • 多目标灰狼算法(MOGWO)的Matlab代码详细注释及难点解释(佳点集改进初始种群的MOGWO)

    目录 一 外部种群Archive机制 二 领导者选择机制 三 多目标灰狼算法运行步骤 四 MOGWO的Matlab部分代码详细注释 五 MOGWO算法难点解释 5 1 网格与膨胀因子 5 2 轮盘赌方法选择每个超立方体概率 为了将灰狼算法应
  • Ubuntu软件包升级失败的终极修复方法

    升级失败 apt upgrade y 尝试修复 apt autoremove Reading package lists Done Building dependency tree Reading state information Don
  • Centos7.6重置root密码

    启动Centos 7 虚拟机 三秒之内在这个系统boot引导界面迅速按e键进入boot编辑模式 如果没有在3秒内按写e 系统正常启动就不会进入到boot编辑模式了 找到以 linux16 开头的行 将从ro开始 ro不要删 往后到下一行前内
  • synchronized原理之前置知识

    一 Monitor概述 一 Java 对象头以 32 位虚拟机为例 一 普通对象 Object Header 64 bits Mark Word 32 bits Klass Word 32 bits 这个可以找到对象 二 数组对象
  • 构造一个简单的操作系统内核,详解进程切换细节

    1 基本功能介绍 如题 本文将介绍如何构造一个简单的操作系统内核 基于内核版本3 9 4 它有以下功能 1 进程的管理 2 进程的初始化 3 进程基于时间片的调度 2 实操步骤 1 安装qemu 以ubuntu为例 sudo apt get
  • jsp记住密码--Cookie

    jsp记住账号密码 本文介绍使用Cookie来实现记住账号密码操作 什么是Cookie Cookie是客户端访问服务器时 服务器在客户端硬盘上存放的信息 Cookie是服务器通知客户端保存键值对的一种技术 Cookie的用途 Cookie可
  • 百度智能云度能推出全新碳盘查服务,助力企业和园区摸清家底实现精细化管理

    今年1月 国务院发布 十四五 节能减排综合工作方案的通知 方案提出到2025年 全国单位国内生产总值能源消耗比2020年下降13 5 能源消费总量得到合理控制 百度也积极履行科技企业减碳责任 于2021年正式公布到2030年实现集团运营层面
  • 测开上手codewhisperer初体验

    AWS新出了一个插件 codewhisperer 这个名字一听还挺有意思 wispiser意为在耳边轻声细语的人 官方解释是一个强大的机器学习AI代码生成器 可以给你一些代码的建议 Amazon CodeWhisperer is a gen
  • 【CV大模型SAM(Segment-Anything)】如何一键分割图片中所有对象?并对不同分割对象进行保存?

    之前的文章 CV大模型SAM Segment Anything 真是太强大了 分割一切的SAM大模型使用方法 可通过不同的提示得到想要的分割目标 中详细介绍了大模型SAM Segment Anything 根据不同的提示方式得到不同的目标分
  • [转]QNX_IDE使用cin输入变量不能编译通过的解决方法

    如果你认为本系列文章对你有所帮助 请大家有钱的捧个钱场 点击此处赞助 赞助额0 1元起步 多少随意 声明 本文只用于个人学习交流 若不慎造成侵权 请及时联系我 立即予以改正 锋影 email 174176320 qq com 在使用QNX
  • 去掉有定位的left值

    left initial 一开始就是初始 默认值 的意思 就可以解决定位的left啦 转载于 https www cnblogs com renxiao1218 p 11611101 html
  • Flink运行时之批处理程序生成计划

    批处理程序生成计划 DataSet API所编写的批处理程序跟DataStream API所编写的流处理程序在生成作业图 JobGraph 之前的实现差别很大 流处理程序是生成流图 StreamGraph 而批处理程序是生成计划 Plan