Spark SQL 执行缓慢且资源空闲

2024-01-11

我有一个 Spark SQL,过去执行时间不到 10 分钟,现在在集群迁移后运行 3 小时,需要深入了解它实际执行的操作。我是 Spark 新手,如果我问一些不相关的问题,请不要介意。

增加spark.executor.memory但没有运气。

环境:Azure 存储上的 Azure HDInsight Spark 2.4

SQL:读取并连接一些数据,最后将结果写入 Hive 元存储。

The spark.sql脚本以以下代码结尾:.write.mode("overwrite").saveAsTable("default.mikemiketable")

Application Behavior: Within the first 15 mins, it loads and complete most tasks (199/200); left only 1 executor process alive and continually to shuffle read / write data. Because now it only leave 1 executor, we need to wait 3 hours until this application finish. enter image description here

Left only 1 executor alive enter image description here

Not sure what's the executor doing: enter image description here

From time to time, we can tell the shuffle read increased: enter image description here

Therefore I increased the spark.executor.memory to 20g, but nothing changed. From Ambari and YARN I can tell the cluster has many resources left. enter image description here

Release of almost all executor enter image description here

非常感谢任何指导。


我想首先对您的案例进行一些观察:

  1. 从任务列表中您可以看到 Shuffle Spill (Disk) 和 Shuffle Spill (Memory) 都有非常高的值。数据交换期间每个分区的最大块大小不应超过 2GB https://www.slideshare.net/cloudera/top-5-mistakes-to-avoid-when-writing-apache-spark-applications因此,您应该注意保持混洗数据的大小尽可能小。根据经验,您需要记住每个分区的大小应为 ~200-500MB。例如,如果总数据为 100GB,则至少需要 250-500 个分区才能将分区大小保持在上述限制内。
  2. 前两者并存也意味着执行器内存不足,Spark被迫将数据溢出到磁盘。
  3. 任务的持续时间太长。 A正常任务 https://www.protechtraining.com/blog/post/tuning-apache-spark-jobs-the-easy-way-web-ui-stage-detail-view-911应持续 50-200 毫秒。
  4. 太多被杀死的执行者是另一个迹象,表明你面临 OOM 问题。
  5. Locality https://stackoverflow.com/questions/26994025/whats-the-meaning-of-locality-levelon-spark-cluster是 RACK_LOCAL,它被认为是集群内可以实现的最低值之一。简而言之,这意味着任务正在与存储数据的节点不同的节点中执行。

作为解决方案,我会尝试以下几件事:

  • 通过使用增加分区数量repartition()或通过 Spark 设置spark.sql.shuffle.partitions达到满足上述要求的数量,即 1000 或更多。
  • 改变存储数据的方式并引入分区数据,即日/月/年使用partitionBy
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark SQL 执行缓慢且资源空闲 的相关文章

  • Spark-获取RDD中的文件名

    我正在尝试处理每天都在增长的 4 个文本文件目录 我需要做的是 如果有人试图搜索发票号码 我应该给他们包含该发票号码的文件列表 我能够通过将文本文件加载为 RDD 来映射和减少文本文件中的值 但是如何获取文件名和其他文件属性呢 从 Spar
  • Spark 和 Ipython 中将非数字特征编码为数字的问题

    我正在做一些我必须做出预测的事情numeric数据 每月员工支出 使用non numeric特征 我在用Spark MLlibs Random Forests algorthim 我有我的features数据在一个dataframe看起来像
  • Pyspark显示最大值(S)和多重排序

    感谢这里的一些帮助 使用Pyspark 请不能使用SQL 所以我有一个存储为 RDD 对的元组列表 城市1 2020 03 27 X1 44 城市1 2020 03 28 X1 44 City3 2020 03 28 X3 15 City4
  • scala/spark 代码不允许在 hive 中添加列

    如果源数据有新列 我尝试在 Hive 表中添加一列 所有新列的检测都运行良好 但是 当我尝试将列添加到目标表时 我收到此错误 for f lt df schema fields if f name chk spark sqlContext
  • 如何从 SparkSQL DataFrame 中的 MapType 列获取键和值

    我的镶木地板文件中有数据 该文件有 2 个字段 object id String and alpha Map lt gt 它被读入 SparkSQL 中的数据帧 其架构如下所示 scala gt alphaDF printSchema ro
  • 对多列应用窗口函数

    我想执行窗口函数 具体为移动平均值 但针对数据帧的所有列 我可以这样做 from pyspark sql import SparkSession functions as func df df select func avg df col
  • 使用多行选项和编码选项读取 CSV

    在 azure Databricks 中 当我使用以下命令读取 CSV 文件时multiline true and encoding SJIS 似乎编码选项被忽略了 如果我使用multiline选项 Spark 使用默认值encoding那
  • pyspark 中的 Pandas UDF

    我正在尝试在 Spark 数据帧上填充一系列观察结果 基本上我有一个日期列表 我应该为每个组创建缺失的日期 在熊猫中有reindex函数 这是 pyspark 中不可用的 我尝试实现 pandas UDF pandas udf schema
  • Spark Scala 将列从一个数据帧复制到另一个数据帧

    我有一个原始数据框的修改版本 我在其上进行了聚类 现在我想将预测列恢复为原始 DF 索引没问题 因此匹配 我该怎么做 使用这段代码我得到一个错误 println Predicted dfWithOutput show println Ori
  • 如何在Spark结构化流中指定批处理间隔?

    我正在使用 Spark 结构化流并遇到问题 在 StreamingContext DStreams 中 我们可以定义批处理间隔 如下所示 from pyspark streaming import StreamingContext ssc
  • Spark的distinct()函数是否仅对每个分区中的不同元组进行洗牌

    据我了解 distinct 哈希分区 RDD 来识别唯一键 但它是否针对仅移动每个分区的不同元组进行了优化 想象一个具有以下分区的 RDD 1 2 2 1 4 2 2 1 3 3 5 4 5 5 5 在此 RDD 上的不同键上 所有重复键
  • AWS EMR Spark Python 日志记录

    我正在 AWS EMR 上运行一个非常简单的 Spark 作业 但似乎无法从我的脚本中获取任何日志输出 我尝试过打印到 stderr from pyspark import SparkContext import sys if name m
  • 如何在spark Streaming中定期更新rdd

    我的代码是这样的 sc SparkContext ssc StreamingContext sc 30 initRDD sc parallelize path to data lines ssc socketTextStream local
  • 行类型 Spark 数据集的编码器

    我想写一个编码器Row https spark apache org docs 2 0 0 api java index html org apache spark sql Row html输入 DataSet 用于我正在执行的地图操作 本
  • Spark EC2 SSH连接错误SSH返回代码255

    每次我尝试通过 Spark ec2 spark ec2 py 文件在 AWS 上启动 Spark 集群时 都会收到 SSH 连接错误 最终解决了 但是浪费了很多时间 在您将其标记为重复之前 我知道有很多类似的问题被问到 但有两个关键区别 a
  • 如何将 DataFrame 作为输入传递给 Spark UDF?

    我有一个数据框 我想对每一行应用一个函数 该函数依赖于其他数据帧 简化的例子 我有如下三个数据框 df sc parallelize a b 1 c d 3 toDF feat1 feat2 value df other 1 sc para
  • Spark中如何获取map任务的ID?

    Spark中有没有办法获取map任务的ID 例如 如果每个映射任务都调用用户定义的函数 我可以从该用户定义的函数中获取该映射任务的 ID 吗 我不确定您所说的地图任务 ID 是什么意思 但您可以使用以下方式访问任务信息TaskContext
  • Apache Spark:Yarn 日志分析

    我有一个 Spark streaming 应用程序 我想使用 Elasticsearch Kibana 分析作业的日志 我的工作在纱线集群上运行 因此日志将按照我的设置写入 HDFSyarn log aggregation enable为真
  • 将 DStream 转换为 JavaDStream

    我知道我们有一个选择RDD JavaRDD
  • 如何在 Scala 中将 DataFrame 模式写入文件

    我有一个 DataFrame 它从一个巨大的 json 文件加载并从中获取架构 该架构基本上大约有 1000 列 我希望将 printSchema 的相同输出保存在文件中而不是控制台中 有任何想法吗 如果您在本地环境中工作 您可以执行以下操

随机推荐