Spark常见错误剖析与应对策略

2023-11-11

问题一:

日志中出现:org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0

原因分析:
shuffle分为shuffle write和shuffle read两部分。
shuffle write的分区数由上一阶段的RDD分区数控制,shuffle read的分区数则是由Spark提供的一些参数控制。
shuffle write可以简单理解为类似于saveAsLocalDiskFile的操作,将计算的中间结果按某种规则临时放到各个executor所在的本地磁盘上。
shuffle read的时候数据的分区数则是由spark提供的一些参数控制。可以想到的是,如果这个参数值设置的很小,同时shuffle read的量很大,那么将会导致一个task需要处理的数据非常大。结果导致JVM crash,从而导致取shuffle数据失败,同时executor也丢失了,看到Failed to connect to host的错误,也就是executor lost的意思。有时候即使不会导致JVM crash也会造成长时间的gc。
解决方案:
1、减少shuffle数据
主要从代码层面着手,可以将不必要的数据在shuffle前进行过滤,比如原始数据有20个字段,只要选取需要的字段进行处理即可,将会减少一定的shuffle数据。
2、修改分区
通过spark.sql.shuffle.partitions控制分区数,默认为200,根据shuffle的量以及计算的复杂度适当提高这个值,例如500。
3、增加失败的重试次数和重试的时间间隔
通过spark.shuffle.io.maxRetries控制重试次数,默认是3,可适当增加,例如10。
通过spark.shuffle.io.retryWait控制重试的时间间隔,默认是5s,可适当增加,例如10s。
4、提高executor的内存
在spark-submit提交任务时,适当提高executor的memory值,例如15G或者20G。

问题二: 日志中出现:Caused by: org.apache.spark.SparkException: Could not execute broadcast in 300 secs. You can increase the timeout for broadcasts via spark.sql.broadcastTimeout or disable broadcast join by setting spark.sql.autoBroadcastJoinThreshold to -1

原因分析:
从上述日志中可以看出在ShuffleMapStage阶段,也就是ShuffleRead阶段,在Driver在向各个Executor广播输入数据时候,出现了超时现象。
解决方案:
1、适当增加超时时间:spark.sql.broadcastTimeout=800
2、适当增加重试次数:spark.sql.broadcastMaxRetries=3
3、关闭广播变量join:set spark.sql.autoBroadcastJoinThreshold = -1

问题三: 日志中出现:org.apache.spark.sql.catalyst.parser.ParseException

原因分析:
spark在做sql转化时报错。
解决方案:
检查sql是否书写正确

问题四: 日志中出现:SparkException: Could not find CoarseGrainedScheduler

原因分析:
这是一个资源问题应该给任务分配更多的cores和executors,并且分配更多的内存。并且需要给RDD分配更多的分区
解决方案:
1、调大一下资源和cores和executers的数量
2、在配置资源中加入这句话也许能解决你的问题:
–conf spark.dynamicAllocation.enabled=false

问题五: 日志中出现:Exception in thread “main” java.lang.NoSuchMethodError: scala.collection.immutable. c o l o n colon coloncolon.tl$1()Lscala/collection/immutable/List;

原因分析:
scala版本不一致问题
解决方案:
1、通过给spark任务指定相同版本的镜像
–conf spark.kubernetes.container.image=镜像地址

问题六: 日志中出现:org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 9478 tasks (1024.1 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB)

原因分析:
序列化结果集的大小超过了spark任务默认的最大结果集大小(默认spark.driver.maxResultSize为1g)
解决方案:
1、增加spark.driver.maxResultSize的大小
–conf spark.driver.maxResultSize=2g

问题七: 日志中出现:The executor with id 12 exited with exit code 137

原因分析:
executor内存溢出(oom)
解决方案:
1、增加executor内存
示例参数:–conf spark.executor.memory=10g
注:少部分情况为堆外内存(overhead memory)不足,需要增加堆外内存
示例参数:–conf spark.executor.memoryOverhead=5g

问题八: WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, aa.local): ExecutorLostFailure (executor lost) WARN TaskSetManager: Lost task 69.2 in stage 7.0 (TID 1145, 192.168.47.217): java.io.IOException: Connection from /192.168.47.217:55483 closed java.util.concurrent.TimeoutException: Futures timed out after [120 second ERROR TransportChannelHandler: Connection to /192.168.47.212:35409 has been quiet for 120000 ms while there are outstanding requests. Assuming connection is dead; please adjust spark.network.timeout if this is wrong

原因分析:
TaskSetManager: Lost task & TimeoutException
因为网络或者gc的原因,worker或executor没有接收到executor或task的心跳反馈
解决方案:
1、提高 spark.network.timeout 的值,根据情况改成300(5min)或更高
2、配置所有网络传输的延时,如果没有主动设置以下参数,默认覆盖其属性

问题九: 日志中出现:java.lang.OutOfMemoryError: Not enough memory to build and broadcast

原因分析:
Driver 端OOM。
Driver 端的 OOM 逃不出 2 类病灶:
创建的数据集超过内存上限
收集的结果集超过内存上限
广播变量在创建的过程中,需要先把分布在所有 Executors 的数据分片拉取到 Driver 端,然后在 Driver 端构建广播变量,最后 Driver 端把封装好的广播变量再分发给各个 Executors。第一步的数据拉取其实就是用 collect 实现的。如果 Executors 中数据分片的总大小超过 Driver 端内存上限也会报 OOM。
解决方案:
增加driver端的内存大小

问题十: java.lang.OutOfMemoryError: Java heap space at java.util.Arrays.copyOf java.lang.OutOfMemoryError: Java heap space at java.lang.reflect.Array.newInstance

原因分析:
executor端OOM
User Memory 用于存储用户自定义的数据结构,如数组、列表、字典等。因此,如果这些数据结构的总大小超出了 User Memory 内存区域的上限,就会出现这样的报错。

问题十一: spark sql 执行insert overwrite的时候,出现数据重复。

原因分析:
Spark SQL在执行SQL的overwrite的时候并没有删除旧的的数据文件(Spark SQL生成的数据文件),Spark SQL写入Hive的流程如下:

1.Spark写入Hive会先生成一个临时的_temporary目录用于存储生成的数据文件,全部生成完毕后全部移动到输出目录,然后删除_temporary目录,最后创建Hive元数据(写分区);
2.Spark写入数据任务使用了同一个_temporary目录,导致其中一个完成数据生成和移动到Hive路径之后删除_temporary目录失败(任务被kill掉了),进一步导致数据已经到了但是元数据没有创建。
3.上一个任务虽然生成了数据文件但是没有元数据,则后一个任务的overwrite找不到元数据因此无法删除Hive路径下的数据文件(第二个任务会任务目录下没有数据生成)
4.当最后一个执行完成的Spark插入任务结束后,此时Hive路径下已经移动过来多个任务的数据文件,由于已经没有正在执行的Spark写任务,因此删除_temporary目录成功,创建元数据成功,结果就是这个元数据对应了该Hive路径下所有版本的数据文件。

问题十二: Spark任务正常执行10分钟左右,但是偶尔会出现任务运行时间过长比如5个小时左右

原因分析:

通过spark ui看到spark任务的task运行都是在10分钟左右,有一个task运行时间达到了5.4h一直没有运行完成。
解决方案:
设置这个参数spark.speculation=true;
原理:在Spark中任务会以DAG图的方式并行执行,每个节点都会并行的运行在不同的executor中,但是有的任务可能执行很快,有的任务执行很慢,比如网络抖动、性能不同、数据倾斜等等。有的Task很慢就会成为整个任务的瓶颈,此时可以触发 推测执行 (speculative) 功能,为长时间的task重新启动一个task,哪个先完成就使用哪个的结果,并Kill掉另一个task。

问题十三: org.apache.spark.shuffle.FetchFailedException: The relative remote executor(Id: 21), which maintains the block data to fetch is dead.

原因分析:
资源不足导致executor没有心跳,driver就判定其丢失,就去连其他的executor,但其他的因为配置都一样,所以也连不上。重试n次后,就会报错

解决方案:
减少使用触发shuffle的操作,例如reduceByKey,从而减少使用内存
增大spark.network.timeout,从而允许有更多时间去等待心跳响应
增加spark.executor.cores,从而减少创建的Executor数量,使得总使用内存减少
同时增大spark.executor.memory,保证每个Executor有足够的可用内存
增大spark.shuffle.memoryFraction,默认为0.2(需要spark.memory.useLegacyMode配置为true,适用于1.5或更旧版本,已经deprecated)
例:
-conf spark.driver.memory=10g —conf spark.executor.cores=2 --conf spark.executor.memory=24g --conf spark.executor.memoryOverhead=4g --conf spark.default.parallelism=1500 --conf spark.sql.shuffle.partitions=1500 —conf spark.network.timeout=300

问题十四: java.io.IOException: java.io.EOFException: Unexpected end of input stream

原因分析:
spark任务输入数据异常,spark任务读取gz格式压缩的csv文件时,由于存在异常数据发生报错。gz格式压缩的文件存在空数据

解决方案:
1.定位到异常数据清除即可
2.过滤异常数据直接写入

问题十五: Exception in thread “main” java.lang.NoSuchMethodError: scala.Predef$.refArrayOps([Ljava/lang/Object;)[Ljava/lang/Object;

原因分析:
scala版本不一致

解决方案:
更换 服务scala版本一致的镜像

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark常见错误剖析与应对策略 的相关文章

  • 在 HIVE 中查找函数

    我想检查一个字段是否包含字符串 我想要一个如下所示的函数 FIND string to find field to search 我的数据如下所示 field to search no match in this string record
  • 将 Hive 表导出到 hdfs 中的 csv

    我知道在 Hive 中将表保存到 csv 或其他文本文件 时 分隔符存在一个已知问题 所以我想知道你们是否可以帮助我解决这个问题 我有一个现有的表 表 A 我想将其以 csv 格式保存到 hdfs 通过阅读其他回复 我相信我必须首先创建一个
  • hive 查询特定联合类型的记录

    我创建了一个示例配置单元表 CREATE TABLE union test foo UNIONTYPE
  • Hive:转换“yyyy-MM-dd'T'HH:mm:ss.SSS'Z'”中缺少秒数的字符串日期时间

    我使用以下代码将字符串日期时间变量转换为日期时间 但转换后的字符串缺少 SSS 部分 使用的代码 cast FROM UNIXTIME UNIX TIMESTAMP oldtime yyyy MM dd T HH mm ss SSS Z y
  • Hive 表的默认分隔符是什么?

    如果我们在创建表时不提及任何分隔符 hive 是否有默认分隔符 创建表日志 ts bigint 行字符串 按 dt 字符串 国家 地区字符串 分区 默认分隔符 001 如果创建hive表时没有设置 您可以将其更改为其他分隔符 例如 hive
  • Hive 上的自定义 MapReduce 程序,规则是什么?输入和输出怎么样?

    我被困了几天 因为我想根据我在 hive 上的查询创建一个自定义的地图缩减程序 在谷歌搜索后我发现没有太多例子 而且我仍然对规则感到困惑 创建自定义 MapReduce 程序的规则是什么 映射器和减速器类怎么样 任何人都可以提供任何解决方案
  • 在 IDEA 中运行 Spark on Hive 项目期间创建事务连接工厂时出错

    我正在尝试为 Spark Streaming 项目设置一个开发环境 该项目需要将数据写入 Hive 我有一个包含 1 个主设备 2 个从设备和 1 台开发机器的集群 在 Intellij Idea 14 中编码 在 Spark shell
  • Hive如何存储数据,什么是SerDe?

    当查询表时 SerDe 将将文件中的字节中的一行数据反序列化为 Hive 内部使用的对象来操作该行数据 执行 INSERT 或 CTAS 时 请参阅第 441 页上的 导入数据 表的 SerDe 将将 Hive 的一行数据的内部表示序列化为
  • 当气流 initdb 时,导入错误:无法导入名称 HiveOperator

    我最近安装了airflow对于我的工作流程 在创建项目时 我执行了以下命令 airflow initdb 返回以下错误 2016 08 15 11 17 00 314 init py 36 INFO Using executor Seque
  • hive创建表的多个转义字符

    我正在尝试将带有管道分隔符的 csv 加载到配置单元外部表 数据值包含单引号 双引号 括号等 使用 Open CSV 版本 2 3 测试文件 csv id name phone 1 Rahul 123 2 Kumar s 456 3 Nee
  • 如何通过Python访问Hive?

    https cwiki apache org confluence display Hive HiveClient HiveClient Python https cwiki apache org confluence display Hi
  • 在 HIVE 中,使用 COALESCE 将 Null 值替换为相同的列值

    我想用同一列中的值替换特定列的空值我想得到结果 我在下面尝试过 select d day COALESCE val LAST VALUE val TRUE OVER ORDER BY d day ROWS BETWEEN UNBOUNDED
  • 使用 Hiveql 循环

    我正在尝试合并 2 个数据集 例如 A 和 B 数据集 A 有一个变量 Flag 它有 2 个值 我并没有只是将两个数据合并在一起 而是尝试根据 标志 变量合并两个数据集 合并代码如下 create table new data as se
  • hive - 在值范围之间将一行拆分为多行

    我在下面有一张表 想按从开始列到结束列的范围拆分行 即 id 和 value 应该对开始和结束之间的每个值重复 包括两者 id value start end 1 5 1 4 2 8 5 9 所需输出 id value current
  • <问题> Hive 中的浮点数据类型

    初始化数据 CREATE TABLE test test data user VARCHAR 10 amount FLOAT TBLPROPERTIES transactional true INSERT INTO test test da
  • Sqoop 导出分区的 Hive 表

    我在尝试导出分区的 Hive 表时遇到了一些问题 这是否完全受支持 我尝试用谷歌搜索并找到一张 JIRA 票证 sqoop export connect jdbc mysql localhost testdb table sales exp
  • 适用于 Python 3.x 的 Hive 客户端

    是否可以使用 Python 3 x 连接到 hadoop 并运行 hive 查询 我正在使用Python 3 4 1 我发现可以按照这里写的方式完成 https cwiki apache org confluence display Hiv
  • Hive 聚集在多个列上

    据我所知 当配置单元表聚集在一列上时 它会执行该分桶列的哈希函数 然后将该行数据放入其中一个桶中 每个桶都有一个文件 即如果有 32 个桶 那么 hdfs 中就有 32 个文件 将 clustered by 放在多个列上意味着什么 例如 假
  • hive 中的授予权限在 hdp2.2 上不起作用

    我正在 CentOS 6 5 上使用 Ambari 设置来试验 HDP2 2 集群 但在运行 Hive GRANT 查询时遇到问题 例如 一个查询 grant select on Tbl1 to user root 给了我一个看起来像这样的
  • 将数据从 .txt 文件加载到 Hive 中以 ORC 形式存储的表

    我有一个数据文件位于 txt格式 我正在使用该文件将数据加载到 Hive 表中 当我将文件加载到类似表中时 CREATE TABLE test details txt visit id INT store id SMALLINT STORE

随机推荐