Apache Spark - Spark 中的内部作业调度程序如何定义什么是用户,什么是池

2024-01-27

我很抱歉在这里说得有点笼统,但我对 Spark 内部的作业调度如何工作有点困惑。从文档中here https://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application我知道它是 Hadoop Fair Scheduler 的某种实现。

我无法理解这里的用户到底是谁(linux 用户、hadoop 用户、spark 客户端?)。我也无法理解这里的池是如何定义的。例如,在我的 hadoop 集群中,我将资源分配给两个不同的池(我们称它们为团队 1 和团队 2)。但是在spark集群中,不同的池和其中的用户不会实例化自己的spark上下文吗?这再次让我产生疑问,当我将属性设置为spark.scheduler.pool时,我应该传递哪些参数。

我对 driver 如何实例化 Spark 上下文,然后将它们拆分为任务和作业有基本的了解。可能我在这里完全没有抓住重点,但我真的很想了解 Spark 的内部调度程序如何在操作、任务和作业的上下文中工作


我找官方文档 https://spark.apache.org/docs/2.4.0/job-scheduling.html#scheduling-within-an-application非常全面,涵盖了您所有的问题。然而,人们可能会觉得从第一次开始就很难消化。

在深入研究细节之前,让我们先给出一些定义和粗略的类比。application是什么创建了 SparkContextsc并且可能被称为您部署的东西火花提交. job是 Spark 定义中的一个动作转变与行动意思是类似的东西count, collect etc.

有两个主要且在某种意义上是独立的主题:application以及内部调度application。前者更多地与资源管理器相关,包括 Spark Standalone 仅 FIFO 模式以及static and dynamic分配。

后者,Spark 内的调度application正如我从你的评论中了解到的,这是你的问题的问题。让我尝试在某种抽象层面上描述那里发生的事情。

假设您提交了您的application你有两个jobs

sc.textFile("..").count()   //job1
sc.textFile("..").collect() //job2

如果这段代码恰好在同一个线程中执行,那么这里就不会发生太多有趣的事情,job2及其所有的tasks获取资源仅在那之后 job1已经完成了。

现在假设您有以下内容

thread1 { job1 }
thread2 { job2 } 

这越来越有趣了。默认情况下,在您的application调度程序将使用FIFO将资源分配给任何一个的所有任务job碰巧首先出现在调度程序中。给对方的任务job仅当有空闲核心并且没有来自更多“优先级”优先的任务时才会获取资源job.

现在假设你设置spark.scheduler.mode=FAIR为您application。从现在起每job有一个概念pool它属于。如果你什么都不做,那么对于每项工作pool标签是“默认”。为您设置标签job你可以执行以下操作

sc.setLocalProperty("spark.scheduler.pool", "pool1").textFile("").count() // job1
sc.setLocalProperty("spark.scheduler.pool", "pool2").textFile("").collect() // job2

这里的一个重要注意事项是设置本地属性 https://spark.apache.org/docs/2.4.0/api/scala/index.html#org.apache.spark.SparkContext@setLocalProperty(key:String,value:String):Unit对每个线程以及所有生成的线程都有效。对我们来说意味着什么?好吧,如果你在同一个线程中,那就没有什么意义了jobs 被依次执行。 但是,一旦您具备以下条件

thread1 { job1 } // pool1
thread2 { job2 } // pool2

job1 and job2在资源分配的意义上变得无关。一般来说,正确配置每个pool在公平调度程序中file https://spark.apache.org/docs/2.4.0/job-scheduling.html#configuring-pool-properties with 最小份额 > 0你可以确定job来自不同池的人将有资源继续进行。

然而,您还可以走得更远。默认情况下,在每个pool jobs 在 a 中排队FIFO方式,这种情况与我们有 FIFO 模式时的场景基本相同,job来自不同线程。要改变你需要改变pool在 xml 文件中有<schedulingMode>FAIR</schedulingMode>.

鉴于这一切,如果你只是设置spark.scheduler.mode=FAIR并让所有的jobs 落入同一个“默认”池中,这与使用默认池大致相同spark.scheduler.mode=FIFO并有你的job在不同的线程中启动。如果您仍然只想要单个“默认”fair池只需更改 xml 文件中“默认”池的配置即可反映这一点。

发挥机制作用pool你需要定义这个概念user这与将“spark.scheduler.pool”从正确的线程设置为正确的值相同。例如,如果您的application监听JMS,然后消息处理器可以为每个消息处理设置池标签job取决于其内容。

最终,不确定字数是否少于官方文档中的字数,但希望它能以某种方式有所帮助:)

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Apache Spark - Spark 中的内部作业调度程序如何定义什么是用户,什么是池 的相关文章

  • Spark-获取RDD中的文件名

    我正在尝试处理每天都在增长的 4 个文本文件目录 我需要做的是 如果有人试图搜索发票号码 我应该给他们包含该发票号码的文件列表 我能够通过将文本文件加载为 RDD 来映射和减少文本文件中的值 但是如何获取文件名和其他文件属性呢 从 Spar
  • 重塑案例类构造函数?

    试图找到一种方法来 重塑 案例构造函数以填充某些默认值 以下情况可能吗 def reshape T R1 lt HList R2 lt HList h R1 R2 gt T example case class MyClass a Doub
  • 新式(“内联”)宏需要 scala.meta

    我刚刚更新到 scala meta 2 0 0 M1 和最新的 scala 2 12 3 现在宏不再编译 我所做的唯一更改是将元版本从 1 8 0 更改为 2 0 0 M1 错误 新式 内联 宏需要 scala meta 有谁知道是否有快速
  • Java / Scala Future 由回调驱动

    简洁版本 我怎样才能创建一个Promise
  • 将 Scala 库转换为 DLL (.NET)

    我正在尝试从 scala 类创建一个 Dll 我将 IntelliJ 与 SBT 一起使用 我已经找到了一种使用 ikvm converter 将 jar 文件转换为 Dll 的方法 现在的问题是 当我在 SBT 下使用 package 从
  • Protobuf RPC 在 Hadoop 2.2.0 单节点服务器上不可用?

    我正在尝试在按照本教程安装的本地单节点集群上运行 hadoop 2 2 0 mapreduce 作业 http codesfusion blogspot co at 2013 10 setup hadoop 2x 220 on ubuntu
  • 逆变方法参数类型

    wiki 逆变方法参数类型 https en wikipedia org wiki Covariance and contravariance 28computer science 29 Contravariant method argum
  • Scala:具有复杂结构的树插入尾递归

    我正在 scala 中创建自定义对象树 并且我的插入方法引发堆栈溢出 因为它不是尾递归 但是 我不太清楚如何使其尾递归 我见过使用 累加器 变量的相关示例 但它们要么是只能相乘和覆盖的整数之类的东西 要么是我在适应树时遇到困难的列表 这是我
  • 最小重复子串

    我正在看 Perl代码高尔夫页面 http www perlmonks org node id 82878 不要问为什么 并遇到了这个 第 3 洞 最小重复图案 编写一个子例程 它接受一个字符串 该字符串可能包含 重复模式 并返回最小的重复
  • 如何捕获 Oozie Spark 输出

    有没有办法捕获spark的输出然后将其输入到shell上 我们当前正在使用 scala 创建 jar 文件 并希望我们的 Spark 输出成为 shell 输入 我的想法是使用 wf actionData spark XXXX var 我只
  • 不支持的身份验证令牌,仅当禁用身份验证时才允许 schema='none':{ schema='none' } - Neo4j 身份验证错误

    我正在尝试使用 neo4j spark connector 从 Spark 连接到 Neo4j 当我尝试连接到 Neo4j 时遇到身份验证问题org neo4j driver v1 exceptions AuthenticationExce
  • PySpark Yarn 应用程序在 groupBy 上失败

    我正在尝试在 Yarn 模式下运行一个处理大量数据的作业 2TB 从谷歌云存储读取 管道可以总结如下 sc textFile gs path json map lambda row json loads row map toKvPair g
  • Hadoop 超立方体

    嘿 我正在启动一个基于 hadoop 的超立方体 具有灵活的维度数 有人知道这方面现有的方法吗 我刚刚发现PigOLAP草图 http wiki apache org pig PigOLAPSketch 但没有代码可以使用它 另一种方法是Z
  • 在 Scala 和 SBT 中调试较长的编译时间

    在我的 Scala SBT 项目中 我有一个文件需要 5 分钟才能编译 所有其他的都可以在几秒钟内编译 这使得开发非常痛苦 我确信我滥用了一些 Scala 构造 但我不知道如何调试它 如何在 Scala 中调试较长的编译时间 我正在使用 S
  • 为什么 Spark 比 Hadoop MapReduce 更快

    有人可以使用字数统计示例解释一下为什么 Spark 比 MapReduce 更快吗 bafna的答案提供了故事的记忆方面 但我想补充另外两个重要事实 DAG和生态系统 Spark 使用 惰性求值 来形成连续计算阶段的有向无环图 DAG 通过
  • 错误:协变类型 A 出现在逆变位置

    我试图写一个不可变的Matrix A 班级 我希望该类是协变的A但是当我把 在 前面A编译器开始抱怨类中的某些操作 以下是我的相关子集Matrix类 实际类比以下子集大 5 倍左右 class Matrix A private val co
  • pyspark 中的 Pandas UDF

    我正在尝试在 Spark 数据帧上填充一系列观察结果 基本上我有一个日期列表 我应该为每个组创建缺失的日期 在熊猫中有reindex函数 这是 pyspark 中不可用的 我尝试实现 pandas UDF pandas udf schema
  • Scala:如何在超类上实现克隆方法,并在子类中使用它?

    我可能会以错误的方式处理这个问题 但我想要一个像这样的对象 class MyDataStructure def myClone val clone new MyDataStructure do stuff to make clone the
  • 匿名类上的 NotSerializedException

    我有一个用于过滤项目的界面 public interface KeyValFilter extends Serializable public static final long serialVersionUID 7069537470113
  • 使用 scala 集合 - CanBuildFrom 麻烦

    我正在尝试编写一个接受任何类型集合的方法CC 并将其映射到一个新的集合 相同的集合类型但不同的元素类型 我正在挣扎 基本上我正在尝试实施map but 不在集合本身上 问题 我正在尝试实现一个带有签名的方法 它看起来有点像 def map

随机推荐