128-152-spark-核心编程-源码

2023-05-16

128-spark-核心编程-源码(主要以了解基本原理和流程为主):

总体相关

​ 1.环境准备(Yarn 集群)
​ (1) Driver, Executor
​ 2.组件通信
​ (1) Driver => Executor
​ (2) Executor => Driver
​ (3) Executor => Executor
​ 3.应用程序的执行
​ (1) RDD 依赖
​ (2)阶段的划分
​ (3) 任务的切分
​ (4)任务的调度
​ (5)任务的执行

​ 4.Shuffle
​ (1) Shuffle 的原理和执行过程
​ (2) Shuffle 写磁盘
​ (3) Shuffle 读取磁盘
​ 5.内存的管理
​ (1)内存的分类
​ (2)内存的配置

起点:org/apache/spark/deploy/SparkSubmit.scala main

java org.apache.spark.deploy.SparkSubmit
java HelloWorld
JVM => Process ( SparkSubmit)
SparkSubmit.main
jps

结合尚硅谷视频讲解图片理解。

#提交命令
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10

main->doSubmit->parseArguments->parse(args.asJava)->SparkSubmitArguments.handle(--master --class)
给action赋值action = Option(action).getOrElse(SUBMIT) org.apache.spark.deploy.SparkSubmit#submit ->doRunMain()-org.apache.spark.deploy.SparkSubmit#runMain->prepareSubmitEnvironment(准备提交的环境)

#准备提交的环境
val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)

#根据环境找到childMainClass
if (isYarnCluster) {childMainClass = YARN_CLUSTER_SUBMIT_CLASS。。。} (YARN_CLUSTER_SUBMIT_CLASS:	org.apache.spark.deploy.yarn.YarnClusterApplication)

#yarnclient创建了资源调度器rmclient
YarnClient.createYarnClient->ApplicationClientProtocol rmClient;->org.apache.spark.deploy.yarn.Client#run
#提交应用程序,返回appid
org.apache.spark.deploy.yarn.Client#submitApplication
#客户端启动      yarnClient.init(hadoopConf)->yarnClient.start()->val newApp = yarnClient.createApplication()创建应用	->	createContainerLaunchContext(创建容器环境)->createApplicationSubmissionContext(创建提交环境)
#连接和提交,yarnClient连接,submitApplication提交
yarnClient.submitApplication(appContext)->Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
      javaOpts ++ amArgs ++  ->amArgs->amClass="org.apache.spark.deploy.yarn.ApplicationMaster"(启动ApplicationMaster)
      


 #启动ApplicationMaster
 org.apache.spark.deploy.yarn.ApplicationMaster#main-》org.apache.spark.deploy.yarn.YarnRMClient#amClient属性。applicationmaster和resourcemaster的链接-》org.apache.spark.deploy.yarn.ApplicationMaster#runDriver-》
 org.apache.spark.deploy.yarn.ApplicationMaster#startUserApplication启动用户应用程序-》startUserApplication.start(driver驱动线程初始化sparkcontext以及run mian方法)-》org.apache.spark.deploy.yarn.ApplicationMaster#registerAM(注册到rm,申请资源)-》org.apache.spark.deploy.yarn.YarnRMClient#createAllocator(创建分配器)-》org.apache.spark.deploy.yarn.YarnAllocator#allocateResources(返回可用资源列表)-》
org.apache.spark.deploy.yarn.YarnAllocator#handleAllocatedContainers(处理可用的容器)-》
org.apache.spark.deploy.yarn.YarnAllocator#runAllocatedContainers(运行已分配的容器)-》
org.apache.spark.deploy.yarn.ExecutorRunnable#prepareCommand(准备指令)-》
nmClient.startContainer(container.get, ctx)(向指定的NM启动容器)-》
/bin/java org.apache.spark.executor.YarnCoarseGrainedExecutorBackend(启动Executor)


#启动Executor
org.apache.spark.executor.YarnCoarseGrainedExecutorBackend#main-》  run  ->
org.apache.spark.SparkEnv#createExecutorEnv(创建executorenv环境)-》
org.apache.spark.rpc.netty.Dispatcher#registerRpcEndpoint(注册rpc通讯终端)-》
org.apache.spark.rpc.netty.Inbox#Inbox(收件箱,自己给自己发消息constructor -> onStart -> receive* -> onStop)-》
org.apache.spark.executor.CoarseGrainedExecutorBackend#onStart-》
org.apache.spark.rpc.RpcEndpointRef#ask(向driver注册executor)-》
org.apache.spark.scheduler.SchedulerBackend-》
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend-》
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint#receiveAndReply(接收和应答)-》
org.apache.spark.rpc.RpcCallContext#reply( context.reply(true)注册成功)-》
org.apache.spark.executor.CoarseGrainedExecutorBackend#receive(executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,resources = _resources)创建Executor计算对象)-》
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint#receive(makeOffers(executorId)注册成功)-》

通讯环境:

Netty:通讯框架,AIO异步非阻塞IO,BIO阻塞式IO,NIO非阻塞式IO

Linux对AIO支持不够好,Windows支持,Linux采用Epoll方式模仿AIO操作

org.apache.spark.SparkContext#createSparkEnv-》
org.apache.spark.rpc.RpcEnv#create(rpcenv创建)-》
org.apache.spark.rpc.netty.NettyRpcEnv#NettyRpcEnv-》
org.apache.spark.util.Utils$#startServiceOnPort-》
org.apache.spark.rpc.netty.NettyRpcEnv#startServer(创建服务)-》
org.apache.spark.network.server.TransportServer#TransportServer(创建服务)-》
org.apache.spark.network.server.TransportServer#init(初始化)-》
org.apache.spark.network.util.NettyUtils#getServerChannelClass(nio,EPOLL方式模仿异步)-》
org.apache.spark.rpc.netty.Dispatcher#registerRpcEndpoint(注册通讯终端,receive接受数据,收件箱inbox)-》
org.apache.spark.rpc.RpcEndpointRef#ask(发送数据,终端引用,)-》
org.apache.spark.rpc.netty.NettyRpcEnv#outboxes属性,发件箱

应用程序的执行:

应用SparkContext对象重要相关字段

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EjehU7jt-1670772034910)(png/image-20211023100428603.png)]

(1) RDD 依赖

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wCRroNC7-1670772034911)(png/image-20211023101856923.png)]

(2)阶段的划分

org.apache.spark.rdd.RDD#collect(行动算子的触发)-》
org.apache.spark.SparkContext#runJob(运行任务)-》
org.apache.spark.scheduler.DAGScheduler#runJob(有向无环图)-》
org.apache.spark.util.EventLoop#post(将JobSubmitted事件放入事件队列中)-》
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop#doOnReceive(事件队列中取出作业提交)-》
org.apache.spark.scheduler.DAGScheduler#handleJobSubmitted(进行阶段的划分)-》
org.apache.spark.scheduler.DAGScheduler#createResultStage(进行阶段的划分)-》
org.apache.spark.scheduler.DAGScheduler#getOrCreateParentStages(获取上级阶段)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-RePkKYud-1670772034912)(png/image-20211023101920351.png)]

(3) 任务的切分

每个阶段最后分区的数量即n*2个任务,例如最后一个阶段分为3个分区,最后shufflerdd也会有三个分区

org.apache.spark.scheduler.DAGScheduler#submitStage-》
org.apache.spark.scheduler.DAGScheduler#submitMissingTasks(没上一节阶段提交任务,有上一级阶段提交上一级阶段)-》

(4)任务的调度

org.apache.spark.scheduler.TaskScheduler#submitTasks(任务调度器提交任务)-》
org.apache.spark.scheduler.TaskSchedulerImpl#submitTasks(实现)-》
org.apache.spark.scheduler.TaskSetManager(任务tasksset的管理者)-》
org.apache.spark.scheduler.SchedulableBuilder(调度器)-》
org.apache.spark.scheduler.TaskSchedulerImpl#initialize(初始化调度器,默认FIFO)-》
org.apache.spark.scheduler.Pool#addSchedulable(任务池添加调度)-》
org.apache.spark.scheduler.SchedulerBackend#reviveOffers(取任务)-》org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend#reviveOffers(集群模式取)-》
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint#makeOffers()-》
org.apache.spark.scheduler.TaskSchedulerImpl#resourceOffers(获取资源调度信息)-》
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint#launchTasks(调度任务)-》
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))(任务池取出发送终端执行)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-HJ6oNHxS-1670772034912)(png/image-20211023104940587.png)]

本地化级别
#补充,本地化级别,首选位置。task任务发送到哪里,数据和节点位置等。效率考虑,移动数据不如移动计算
for(currentMaxLocality<-taskSet.myLocalityLevels) 
移动数据不如移动计算
计算和数据的位置存在不同的级别,这个级别称之为本地化级别
进程本地化:数据和计算在同一个进程中
节点本地化:数据和计算在同一个节点中
机架本地化:数据和计算在同一个机架中
任意

(5)任务的执行

org.apache.spark.executor.CoarseGrainedExecutorBackend#receive(executor接收到消息)-》
org.apache.spark.executor.Executor#launchTask(启动Task)-》
java.util.concurrent.ThreadPoolExecutor#execute(每个线程执行每个task)-》
org.apache.spark.executor.Executor.TaskRunner#run-》
org.apache.spark.scheduler.Task#run

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YeoNG4rn-1670772034914)(png/image-20211023111151468.png)]

Shuffle原理

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-pnRL92cj-1670772034914)(png/image-20211023180336159.png)]

详解转变的过程

前提1:一核,一个task,落盘一个文件,三个任务读取数据,但是没法分辨需要的数据是文件中的那些数据

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ULeyvl0C-1670772034915)(png/image-20211023180544869.png)]

前提2:多核,多task,每个任务针对落盘三个文件,导致小文件过多

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NTmqEXci-1670772034915)(png/image-20211023180730597.png)]

前提3:对前提2的优化,将同核任务的落盘,写相同的文件,但是真实环境task可能会很多,下游任务也可能人多,或者100核数。文件将还是很多。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DG26XExg-1670772034916)(png/image-20211023180910097.png)]

前提4:对前提3的优化,写到同一个文件,使用index索引文件记录下游任务读取数据的偏移量。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-V85FxisI-1670772034916)(png/image-20211023182401664.png)]

Shuffle实现过程:

org.apache.spark.scheduler.ShuffleMapTask-》
org.apache.spark.scheduler.ShuffleMapTask#runTask-》
org.apache.spark.shuffle.ShuffleWriteProcessor#write(写)-》
org.apache.spark.shuffle.ShuffleManager#getWriter(获取写入的对象)-》
org.apache.spark.shuffle.sort.SortShuffleWriter#write()-》
org.apache.spark.util.collection.ExternalSorter#writePartitionedMapOutput-》
org.apache.spark.shuffle.sort.io.LocalDiskShuffleMapOutputWriter#commitAllPartitions(提交)-》
org.apache.spark.shuffle.IndexShuffleBlockResolver#writeIndexFileAndCommit(索引文件和数据文件的提交)-》

#读
org.apache.spark.scheduler.ResultTask#runTask(结果任务)-》
org.apache.spark.rdd.RDD#getOrCompute(获取或计算)-》
org.apache.spark.rdd.ShuffledRDD#compute(shufflerdd的计算)-》
org.apache.spark.shuffle.BlockStoreShuffleReader#read(读取数据)

shuffle写:

org.apache.spark.shuffle.ShuffleWriteProcessor(shuffle写的处理器)-》write-》
org.apache.spark.shuffle.ShuffleManager(shuffle管理器,hash早期有,sort现版本)-》
org.apache.spark.shuffle.sort.SortShuffleManager#getWriter(获取到了下面的写对象的SortShuffleWriter)-》
org.apache.spark.shuffle.sort.SortShuffleWriter#write(写)-》
org.apache.spark.util.collection.ExternalSorter#writePartitionedMapOutput(写出)-》
org.apache.spark.shuffle.sort.io.LocalDiskShuffleMapOutputWriter#commitAllPartitions(提交)

补充writer写的类型,判断条件org.apache.spark.shuffle.sort.SortShuffleManager#registerShuffle

处理器写对象判断条件
SerializedShuffleHandleUnsafeShuffleWriter1.序列化规则支持重定位操作(java序列化不支持,kryo序列化支持) 2.不能使用预聚合功能 3.如果下游的分区数量小区大(16777215+1=16777216)PackedRecordPointer.MAXIMUM_PARTITION_ID + 1
BypassMergeSortShuffleHandleBypassMergeSortShuffleWriter1.不能使用预聚合 2、如果下游的分区数量小区等于200(可配)
BaseShuffleHandleSortShuffleWriter其他情况

Shuffle归并排序和读

org.apache.spark.util.collection.ExternalSorter#insertAll(插入)-》
org.apache.spark.util.collection.PartitionedAppendOnlyMap(支持预聚合的map结构)
org.apache.spark.util.collection.PartitionedPairBuffer(不支持预聚合的结构)
org.apache.spark.util.collection.SizeTrackingAppendOnlyMap#changeValue(预聚合)-》位置-》
org.apache.spark.util.collection.ExternalSorter#maybeSpillCollection(是否溢写磁盘)-》
org.apache.spark.util.collection.Spillable#maybeSpill-》
org.apache.spark.util.collection.ExternalSorter#spill(溢写)-》
org.apache.spark.util.collection.ExternalSorter#spillMemoryIteratorToDisk(写到磁盘)-》
org.apache.spark.util.collection.ExternalSorter#writePartitionedMapOutput-》
org.apache.spark.util.collection.ExternalSorter#merge(Merge spilled and in-memory data合并溢写和磁盘)-》
org.apache.spark.util.collection.ExternalSorter#mergeSort(归并排序)-》
org.apache.spark.shuffle.sort.io.LocalDiskShuffleMapOutputWriter#commitAllPartitions()-》
org.apache.spark.shuffle.IndexShuffleBlockResolver#writeIndexFileAndCommit(写索引文件和数据文件)->

5.内存的管理
(1)内存的分类

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Ir6j3dQC-1670772034916)(png/image-20211024010705514.png)]

相关位置:

org.apache.spark.memory.UnifiedMemoryManager#apply	->
org.apache.spark.memory.UnifiedMemoryManager#getMaxMemory

#相关参数:org.apache.spark.memory.UnifiedMemoryManager#RESERVED_SYSTEM_MEMORY_BYTES预留内存300M

​ (2)内存的配置

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

128-152-spark-核心编程-源码 的相关文章

  • 分类算法之朴素贝叶斯

    1 朴素贝叶斯分类算法 朴素贝叶斯 Naive Bayes NB 算法是基于贝叶斯定理与特征条件独立假设的分类方法 该算法是有监督的学习算法 解决的是分类问题 是将一个未知样本分到几个预先已知类别的过程 朴素贝叶斯的思想就是根据某些个先验概
  • Spark(七)——累加器和广播变量

    5 累加器 通过在驱动器中调用SparkContext accumulator initialValue 方法 创建出存有初始值的累加器 返回值为org apache spark Accumulator T 对象 其中 T 是初始值 ini
  • SparkSession和sparkSQL

    一 概述 spark 有三大引擎 spark core sparkSQL sparkStreaming spark core 的关键抽象是 SparkContext RDD SparkSQL 的关键抽象是 SparkSession Data
  • 大数据--pyspark远程连接hive

    上一篇文章介绍了python连接hive的过程 通过地址 端口号访问到hive并对hive中的数据进行操作 这一篇文章介绍一下怎么通过windows本地pyspark 本地部署好的spark 远程虚拟机的hive 完成本地pyspark对h
  • 重新定义分析 - EventBridge 实时事件分析平台发布

    对于日志分析大家可能并不陌生 在分布式计算 大数据处理和 Spark 等开源分析框架的支持下 每天可以对潜在的数百万日志进行分析 事件分析则和日志分析是两个完全不同的领域 事件分析对实时性的要求更高 需要磨平事件领域中从半结构化到结构化的消
  • Spark DataFrame的Join操作和withColumn、withColumnRenamed方法实践案例(Scala Demo代码)

    import org apache log4j Level Logger import org apache spark sql SparkSession import org apache spark sql functions obje
  • java中使用spark如何将column多列合为一列

    接下来介绍几种使用spark将DataFrame中一行的多列合并到一列中 并且该列以不同的类型展示保存 1 建立dataset 自己需要连接的mongo库 private static String datasource 自己需要连接的mo
  • 【Apache Spark 】第 1 章Apache Spark 简介:统一分析引擎

    大家好 我是Sonhhxg 柒 希望你看完之后 能对你有所帮助 不足请指正 共同学习交流 个人主页 Sonhhxg 柒的博客 CSDN博客 欢迎各位 点赞 收藏 留言 系列专栏 机器学习 ML 自然语言处理 NLP 深度学习 DL fore
  • 基于Spark的电商用户行为实时分析可视化系统(Flask-SocketIO)

    基于Spark的电商用户行为实时分析可视化系统 Flask SocketIO 项目简介 该项目已上线蓝桥课程 有需要的可凭邀请码 UB5mdLbl 学习哦 有优惠 课程地址 https www lanqiao cn courses 2629
  • Spark Job写文件个数的控制以及小文件合并的一个优化

    文章目录 背景说明 通过引入额外Shuffle对写入数据进行合并 EnsureRepartitionForWriting Rule CoalesceShufflePartitions Rule OptimizeShuffleWithLoca
  • Spark大数据分析与实战笔记(第一章 Scala语言基础-3)

    文章目录 1 3 Scala的数据结构 1 3 1 数组 数组的遍历 数组转换 1 3 2 元组 创建元组 获取元组中的值 拉链操作 1 3 3 集合 List Set Map 1 3 Scala的数据结构 对于每一门编程语言来说 数组 A
  • 大数据开发必备面试题Spark篇合集

    1 Hadoop 和 Spark 的相同点和不同点 Hadoop 底层使用 MapReduce 计算架构 只有 map 和 reduce 两种操作 表达能力比较欠缺 而且在 MR 过程中会重复的读写 hdfs 造成大量的磁盘 io 读写操作
  • Hudi和Kudu的比较

    与Kudu相比 Kudu是一个支持OLTP workload的数据存储系统 而Hudi的设计目标是基于Hadoop兼容的文件系统 如HDFS S3等 重度依赖Spark的数据处理能力来实现增量处理和丰富的查询能力 Hudi支持Increme
  • spark_hadoop集群搭建自动化脚本

    bin bash 脚本使用说明 1 使用脚本前需要弄好服务器的基础环境 2 在hadoop的每个节点需要手动创建如下目录 data hdfs tmp 3 修改下面的配置参数 4 脚本执行完备后需要收到格式化namenode
  • 【硬刚大数据之学习路线篇】2021年从零到大数据专家的学习指南(全面升级版)

    欢迎关注博客主页 https blog csdn net u013411339 本文由 王知无 原创 首发于 CSDN博客 本文首发CSDN论坛 未经过官方和本人允许 严禁转载 欢迎点赞 收藏 留言 欢迎留言交流 声明 本篇博客在我之前发表
  • Spark的常用概念总结

    提示 文章写完后 目录可以自动生成 如何生成可参考右边的帮助文档 文章目录 前言 一 基本概念 1 RDD的生成 2 RDD的存储 3 Dependency 4 Transformation和Action 4 1 Transformatio
  • 通过yarn提交作业到spark,运行一段时间后报错。

    加粗样式
  • Spark常用参数解释

    Spark的默认配置文件位于堡垒机上的这个位置 SPARK CONF DIR spark defaults conf 用户可以自行查看和理解 需要注意的是 默认值优先级最低 用户如果提交任务时或者代码里明确指定配置 则以用户配置为先 用户再
  • spark SQL基础教程

    1 sparkSQL入门 sparksql专门用于处理结构化的数据 而RDD还可以处理非结构化的数据 sparksql的优点之一是sparkfsql使用统一的api读取不同的数据 第二个优点是可以在语言中使用其他语言 例如python 另外
  • spark相关

    提示 文章写完后 目录可以自动生成 如何生成可参考右边的帮助文档 文章目录 前言 一 pandas是什么 二 使用步骤 1 引入库 2 读入数据 总结 前言 提示 这里可以添加本文要记录的大概内容 例如 随着人工智能的不断发展 机器学习这门

随机推荐