记一次spark streaming+kafka 运行时间不稳定调优历程

2023-11-08

记一次spark streaming+kafka 运行时间不稳定调优历程

问题现象

首次使用spark streaming进行流式计算的时候遇到的一个问题,即spark streaming读取kafka消息进行流式计算, 但是在数据量比较大的情况下总会出现一些batch的process time比较长,但是大多数batch能在较短的时间内完成,而且全部的batch运行时间呈两个极端分布,要么很长要么很短。
在这里插入图片描述
如上图,运行时间曲线出现多处尖峰,而我们期望的一般是连续平滑的曲线。

先说明题主这边的运行环境状况:集群搭建是基于HDP2.6.3版本,其中spark版本是2.11, kafka版本是0.10,其中spark、kafka、HDFS共享集群资源,业务诉求是每5分钟触发一次batch进行统计,需要在5分钟内计算完成,因为从运行图上看卡顿时间大约几十秒,并不会影响最终5分钟内运行完的要求,因此前期阶段只是断断续续在找原因并没有纠缠,但是近期接到项目要求统计1分钟粒度数据,那这个问题就极有可能影响到最后的运行是否按时完成,经过一段颇费周折的排查,终于找到原因,并将过程记录下来希望帮到遇到类似问题的同学。

排障调优历程

1. 单机差异排查

通过streaming的web可看到运行的每个批的详细信息,我们注意到在运行时间上的批次里,基本都是少部分任务时间很长,大多数task还是很快的
在这里插入图片描述
可以看到75%的task在1秒内执行完,但是Max最大的确实41秒,首先猜想的是机器性能差异,但是经过排查集群机器性能并无区别,且每台机器上执行的task比较均衡,基本上排除了单机性能差异问题,且在耗时较长的批次中,可以发现耗时长的task每次回出现在不同的机器上,因此也从侧面排除了机器性能差异问题。

2. 网络因素排查

那么会不会是spark读取kafka网络问题呢

从这张图可以看到,读取kafka全部具有数据本地性PROCESS_LOCAL,且集群内网万兆口互联,网络不应该成为瓶颈。
在这里插入图片描述
且从这张图中可以排除数据量差异的原因以及GC耗时的原因。
数据倾斜加盐排查
既然数据量差异不是很大,接着对数据特征进行了筛查,通过最终打印多个批次的不同的key的数据量,我们发现并不存在时间长的批次里某些key的数据量突然增大的情况,且最后对数据量的较大的几个key加随机数分散处理,还是未能解决问题,也应证了不是数据倾斜的问题。
单次消费数据量会话超时消费速率排查
接着对kafka消费参数进行调节,主要有以下这么几个
加大session.timeout.ms、减小max.poll.records,延长会话超时时间,减少每次从kafka拉取的数据量,参数经过多次调整,未解决问题。

3. 集群重建问题复现排查

后面题主怀疑过是集群的问题,在今年3月份有了一批压测机器,我们在全新的环境照着之前的模式再搭了一套HDP集群起来,但是发现还是有同样的问题,真是令人沮丧,一度换衣是spark的bug了。
kafka读取超时压缩排查
后面再询问多为大数据圈内好友的经历后,建议修改了一个参数spark.streaming.kafka.consumer.poll.ms,默认512 将这个参数值改为了1000ms,神奇的现象出现了

有两点,第一,耗时长的批次的process time不会“那么长”了,只比正常运行的batch多了3秒,第二,多了3秒的批次日志会报如下异常

20/06/18 16:48:16 WARN TaskSetManager: Lost task 9.1 in stage 103632.0 (TID 621842, 172.16.0.44): java.lang.AssertionError: assertion failed: Failed to get records for spark-executor-user-dinc global-biz-log 3 808993488 after polling for 512
        at scala.Predef$.assert(Predef.scala:170)
        at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
        at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
        at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
        at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
        at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

很明显,大致意思是5000ms内未从kafka中获取到任何数据信息,接下来会进行重试,原因找到了,之前未获取到消息会一直阻塞到大约延迟40秒拿到消息运行成功,现在是5秒内拿不到消息就重试,不长时间等待。

解决方案
我们可以先修改两个参数临时解决这个问题,第一,减小spark.streaming.kafka.consumer.poll.ms参数到1000ms以内,即1秒超时就重试,第二,将spark.task.maxFailures改为10,默认值是4,加大重试次数,修改完这两个参数后基本上解决了这个问题,多数批次在阻塞重连后都能很快读到消息并运行成功。但这只是临时解决方案,kafka集群不稳定是最根本的原因,最后我们还是建议将kafka集群和计算、存储集群分开部署,减少CPU、IO密集对消息队列带来的不稳定影响。
原文链接:https://blog.csdn.net/u013716179/article/details/94330478

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

记一次spark streaming+kafka 运行时间不稳定调优历程 的相关文章

  • Spark集群安装部署

    目录 一 环境准备 二 安装步骤 三 使用Standalone模式 四 使用Yarn模式 一 环境准备 由于Spark仅仅是一种计算机框架 不负责数据的存储和管理 因此 通常都会将Spark和Hadoop进行统一部署 由Hadoop中的HD
  • 分类算法之朴素贝叶斯

    1 朴素贝叶斯分类算法 朴素贝叶斯 Naive Bayes NB 算法是基于贝叶斯定理与特征条件独立假设的分类方法 该算法是有监督的学习算法 解决的是分类问题 是将一个未知样本分到几个预先已知类别的过程 朴素贝叶斯的思想就是根据某些个先验概
  • windows下安装spark及hadoop

    windows下安装spark 1 安装jdk 2 安装scala 3 下载spark spark下载地址 3 1安装spark 将下载的文件解压到一个目录 注意目录不能有空格 比如说不能解压到C Program Files 作者解压到了这
  • spark集群搭建与mysql元数据管理

    找个spark集群搭建是针对于上一篇hadoop的基础上搭建的 所以spark的版本也是要按照着hadoop版本进行下载 1 解压spark 修改spark的 etc profile的home目录 2 安装SCALA 并配置SCALA HO
  • Spark(七)——累加器和广播变量

    5 累加器 通过在驱动器中调用SparkContext accumulator initialValue 方法 创建出存有初始值的累加器 返回值为org apache spark Accumulator T 对象 其中 T 是初始值 ini
  • spark创建maven工程创建scala目录并编译

    背景 我创建spark的maven工程的时候 在java目录同级还创建了一个scala目录 这就得考虑编译相关的事了 解决 1 创建source folder 如下图所示 直接创建就好了 2 编译带来的问题 编译的时候发现一个问题 就是在s
  • pyspark 连接远程hive集群配置

    今天本地spark连接远程hive集群 直接把配置导入进去 本地直接应用远程环境 1 安装spark 设置spark环境变量 2 拿到远程集群配置文件 将配置文件放在spark conf 目录下 xml 一共五个文件 3 将mysql co
  • Compressed Sparse Column format(CSC)

    CSR Compressed Sparse Row format 和CSC Compressed Spare Column format 都是一种稀疏矩阵的存储格式 这里分别给出实例 假设有如下矩阵 1360
  • 广电用户画像分析之根据用户行为数据进行筛选与标签添加

    在数据处理和分析领域 我们经常需要根据用户的行为数据进行筛选和标签添加 以便更好地理解用户行为和偏好 在本篇博客中 我们将介绍两个示例 展示如何根据用户的收视行为数据和订单信息进行数据处理和分析 前情提要 数据集分析 广电用户画像分析之探索
  • 大数据相关常用软件下载地址集锦

    文章目录 每日一句正能量 前言 一 软件下载地址如下 二 文档地址如下 结语 每日一句正能量 生命中有一些人与我们擦肩了 却来不及遇见 遇见了 却来不及相识 相识了 却来不及熟悉 熟悉了 却还是要说再见 前言 由于大数据开发中经常需要用到Z
  • Spark SQL 之 Temporary View

    Spark SQL 之 Temporary View spark SQL的 temporary view 是支持原生SQL 的方式之一 spark SQL的 DataFrame 和 DataSet 均可以通过注册 temporary vie
  • 【Spark NLP】第 7 章:分类和回归

    大家好 我是Sonhhxg 柒 希望你看完之后 能对你有所帮助 不足请指正 共同学习交流 个人主页 Sonhhxg 柒的博客 CSDN博客 欢迎各位 点赞 收藏 留言 系列专栏 机器学习 ML 自然语言处理 NLP 深度学习 DL fore
  • Spark大数据分析与实战笔记(第一章 Scala语言基础-3)

    文章目录 1 3 Scala的数据结构 1 3 1 数组 数组的遍历 数组转换 1 3 2 元组 创建元组 获取元组中的值 拉链操作 1 3 3 集合 List Set Map 1 3 Scala的数据结构 对于每一门编程语言来说 数组 A
  • Spark 从入门到放弃(一)Spark基础概念

    一 Spark基础概念 1 Application Spark应用程序 application 应用 其实就是用spark submit提交的程序 一个application通常包含三部分 从数据源 比方说HDFS 取数据形成RDD 通过R
  • 学习笔记-Spark环境搭建与使用

    一 20 04 Ubuntu安装 清华源ISO源 https mirrors tuna tsinghua edu cn ubuntu releases 20 04 下载链接 https mirrors tuna tsinghua edu c
  • 使用Flink1.16.0的SQLGateway迁移Hive SQL任务

    使用Flink的SQL Gateway迁移Hive SQL任务 前言 我们有数万个离线任务 主要还是默认的DataPhin调度CDP集群的Hive On Tez这种低成本任务 当然也有PySpark 打Jar包的Spark和打Jar包的Fl
  • Flume之:二、企业开发案例

    Flume之 二 企业开发案例 文章目录 Flume之 二 企业开发案例 三 企业开发案例 1 监控端口数据官方案例 2 实时读取本地文件到HDFS案例 3 实时读取目录文件到HDFS案例 4 flume监控Kafka gt Spark知识
  • spark groupByKey和groupBy,groupByKey和reduceByKey的区别

    1 groupByKey Vs groupBy 用于对pairRDD按照key进行排序 author starxhong object Test def main args Array String Unit val sparkConf n
  • Spark 配置

    文章目录 1 Spark 配置 1 1 Spark 属性 1 1 1 动态加载Spark属性 1 1 2 查看Spark属性 1 2 环境变量 2 重新指定配置文件目录 3 继承Hadoop集群配置 4 定制的Hadoop Hive配置 1
  • 2023_Spark_实验二十九:Flume配置KafkaSink

    实验目的 掌握Flume采集数据发送到Kafka的方法 实验方法 通过配置Flume的KafkaSink采集数据到Kafka中 实验步骤 一 明确日志采集方式 一般Flume采集日志source有两种方式 1 Exec类型的Source 可

随机推荐