从 Kafka 进行流聚合时运行“死锁”

2023-12-03

几天前我发布了另一个类似的问题:

  • 如何在启动Spark Streaming进程时加载历史数据,并计算运行聚合

我现在至少设法得到了一个“有效”的解决方案,这意味着该过程本身似乎可以正常工作。但是,由于我是 Spark 的初学者,我似乎错过了一些关于如何以正确的方式(性能/计算方面)构建此类应用程序的事情......

我想做的事:

  1. 应用程序启动时从 ElasticSearch 加载历史数据

  2. 使用 Spark Streaming 在启动时开始侦听 Kafka 主题(包含销售事件,以 JSON 字符串形式传递)

  3. 对于每个传入的 RDD,对每个用户进行聚合
  4. 将 3. 的结果与历史结合起来
  5. 汇总每个用户的新值,例如总收入
  6. 使用 5. 的结果作为下一次迭代的新“历史记录”

我的代码如下:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
import org.elasticsearch.spark.sql._
import org.apache.log4j.Logger
import org.apache.log4j.Level

object ReadFromKafkaAndES {
  def main(args: Array[String]) {

    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)
    Logger.getLogger("kafka").setLevel(Level.WARN)

    val checkpointDirectory = "/tmp/Spark"
    val conf = new SparkConf().setAppName("Read Kafka JSONs").setMaster("local[4]")
    conf.set("es.nodes", "localhost")
    conf.set("es.port", "9200")

    val topicsSet = Array("sales").toSet

    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(15))
    ssc.checkpoint(checkpointDirectory)

    //Create SQLContect
    val sqlContext = new SQLContext(sc)

    //Get history data from ES
    var history = sqlContext.esDF("data/salesaggregation")

    //Kafka settings
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")

    // Create direct kafka stream with brokers and topics
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    //Iterate
    messages.foreachRDD { rdd =>

      //If data is present, continue
      if (rdd.count() > 0) {

        //Register temporary table for the aggregated history
        history.registerTempTable("history")

        println("--- History -------------------------------")
        history.show()

        //Parse JSON as DataFrame
        val saleEvents = sqlContext.read.json(rdd.values)

        //Register temporary table for sales events
        saleEvents.registerTempTable("sales")

        val sales = sqlContext.sql("select userId, cast(max(saleTimestamp) as Timestamp) as latestSaleTimestamp, sum(totalRevenue) as totalRevenue, sum(totalPoints) as totalPoints from sales group by userId")

        println("--- Sales ---------------------------------")
        sales.show()

        val agg = sqlContext.sql("select a.userId, max(a.latestSaleTimestamp) as latestSaleTimestamp, sum(a.totalRevenue) as totalRevenue, sum(a.totalPoints) as totalPoints from ((select userId, latestSaleTimestamp, totalRevenue, totalPoints from history) union all (select userId, cast(max(saleTimestamp) as Timestamp) as latestSaleTimestamp, sum(totalRevenue) as totalRevenue, sum(totalPoints) as totalPoints from sales group by userId)) a group by userId")

        println("--- Aggregation ---------------------------")
        agg.show()

        //This is our new "history"
        history = agg

        //Cache results
        history.cache()

        //Drop temporary table
        sqlContext.dropTempTable("history")

      }

    }

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}

计算似乎正确:

--- History -------------------------------
+--------------------+--------------------+-----------+------------+------+
| latestSaleTimestamp|         productList|totalPoints|totalRevenue|userId|
+--------------------+--------------------+-----------+------------+------+
|2015-07-22 10:03:...|Buffer(47, 1484, ...|         91|       12.05|    23|
|2015-07-22 12:50:...|Buffer(256, 384, ...|         41|        7.05|    24|
+--------------------+--------------------+-----------+------------+------+

--- Sales ---------------------------------
+------+--------------------+------------------+-----------+
|userId| latestSaleTimestamp|      totalRevenue|totalPoints|
+------+--------------------+------------------+-----------+
|    23|2015-07-29 09:17:...|            255.59|        208|
|    24|2015-07-29 09:17:...|226.08999999999997|        196|
+------+--------------------+------------------+-----------+

--- Aggregation ---------------------------
+------+--------------------+------------------+-----------+
|userId| latestSaleTimestamp|      totalRevenue|totalPoints|
+------+--------------------+------------------+-----------+
|    23|2015-07-29 09:17:...| 267.6400001907349|        299|
|    24|2015-07-29 09:17:...|233.14000019073484|        237|
+------+--------------------+------------------+-----------+

但如果应用程序运行多次迭代,我可以看到性能会下降:

Streaming Graphs

我还看到大量跳过的任务,并且随着每次迭代而增加:

Skipped tasks

第一次迭代的图表看起来像

enter image description here

第二次迭代的图表看起来像

enter image description here

迭代次数越多,图表就会变得越长,并且会跳过很多步骤。

基本上,我认为问题在于存储下一次迭代的迭代结果。不幸的是,在尝试了很多不同的事情并阅读文档之后,我无法为此找到解决方案。热烈感谢任何帮助。谢谢!


该流作业并未处于“死锁”状态,但其执行时间随着每次迭代呈指数级增长,导致流作业迟早会失败。

RDD 上的 union->reduce->union->reduce... 迭代过程创建了不断增加的 RDD 谱系。每次迭代都会增加对需要在下一次迭代中计算的谱系的依赖关系,也会导致执行时间增加。依赖性(谱系)图清楚地表明了这一点。

一种解决方案是定期对 RDD 进行检查点。

history.checkpoint()

您还可以探索通过以下方式替换 union/reduce 过程updateStateByKey

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

从 Kafka 进行流聚合时运行“死锁” 的相关文章

随机推荐

  • 同时从麦克风录制音频并在Python中播放效果

    我的目标是通过笔记本电脑麦克风录制我的声音 并同时在 python 中添加效果 我需要的是类似于音乐效果踏板 您可以在其中连接吉他或麦克风 并添加混响 回声或失真等 我正在使用 pyaudio 和 wave 来录制和播放音频 使用 scik
  • 如何修复 Eclipse Java 虚拟机启动器错误?

    就像标题所说 我遇到了 eclipse JVM 启动器的问题 昨晚 一切正常 据我所知 从那时起我的电脑上没有任何变化 然而 今天早上我醒来发现 当我在 Eclipse 中运行任何项目时 我收到了这个错误 Java Virtual Mach
  • Android 删除本地化

    我在 Play 商店中的应用程序没有除默认版本之外的任何本地化版本 美国英语 这次我创建了一个新的apk来更新它 它使用android support v7 appcompat图书馆 因此 当我上传新的 apk 并在 apk 详细信息中显示
  • 为什么此查询会导致 Oracle 中的合并笛卡尔连接

    我最近有一个查询需要修改 这是原文 SELECT RTRIM position AS POSITION Other fields FROM schema table x WHERE hours gt 0 AND pay RGW AND NO
  • 将大量数据从 C# 导出到 Excel 的最佳/最快方法是什么

    我有使用 OpenXML 库导出数据的代码 我有 20 000 行和 22 列 这需要很长时间 大约 10 分钟 有没有任何解决方案可以将数据从 C 导出到 Excel 速度会更快 因为我是从 ASP NET MVC 应用程序执行此操作 而
  • IMFSinkWriter无法导出mp4的大尺寸视频

    我的 Windows MFC 应用程序具有导出视频文件的功能 并且它可以选择编码格式 通过WMV or MP4 和框架尺寸 但是 不幸的是当我尝试导出时MP4每次都设置大帧大小的文件MF E INVALIDMEDIATYPE发生了 简而言之
  • 正则表达式:按逗号分割,但排除括号和引号内的逗号(单引号和双引号)

    我有一根绳子 5 5 5 C A B A B B A A B C A B 我想用逗号分割它 但需要排除括号和引号内的逗号 单引号和双引号 像这样 5 5 5 C A B A B B A A B C A B 使用java正则表达式如何实现这一
  • AWS:将 Cognito 授权用户限制为特定 Lambda 函数

    我正在使用 AWS 并且有以下设置 UserPool API网关 Lambda函数 API 网关使用 UserPool 授权者来保护 lambda 函数 到目前为止 这是有效的 现在我想将每个 lambda 函数限制为特定的用户组 因此 我
  • Apple 的静默推送通知可以在后台启动我的应用程序吗?

    根据苹果的文档 我可以通过添加来注册静默通知 content available 1键值对aps有效负载字典 我希望我的应用程序在无声通知到达时在后台唤醒 我设置App downloads content in response to pu
  • 在 Azure ML 上安装附加 R 包

    我正在执行以下步骤将 R Hash 2 2 6 zip 包安装到 Azure ML 将 zip 文件作为数据集上传 创建一个新实验并添加 执行 R 脚本 进行实验 拖放 zip 文件数据集进行实验 将步骤3中的数据集连接到步骤2的 执行R脚
  • 泽西岛和 HK2 服务定位器

    我正在尝试在 Application 构造函数 从 ResourceConfig 继承的东西 中初始化 Jersey 应用程序中的一些组件 看起来像这样 public Application Context ServletContext c
  • 如何将数据从 Webform 页面发布到 HTTPHandler.ashx 文件?

    我有一个 Web 应用程序项目来支持到供应商产品后端的文件传输操作 它由 2 个 HTTPHandler 文件组成 在带有 IIS 6 0 的 Win2003 服务器上编译成网站 上传处理程序 ashx 下载处理程序 ashx 这些文件从公
  • pyCuda,发送多个单变量参数的问题

    我这里有一个 pycuda 程序 它从命令行读取图像并保存反转颜色的版本 import pycuda autoinit import pycuda driver as device from pycuda compiler import S
  • mem_fun和bind1st问题

    我有以下课程 class A public ctr and etc A clone B container 现在 我有一个vector
  • Firestore,如何构建“likedBy”查询

    我在思考如何最好地构建我的 非常简单的 Firestore 应用程序时遇到了一些困难 我有一组这样的用户 users A123 name Adam B234 name Bella C345 name Charlie 每个用户都可以 喜欢 或
  • 使两个角圆化和两个直切而不是颤振中的曲线

    我怎样才能在颤振中制作下面的瓷砖状设计 两侧有一点弯曲 两侧的其余部分是直线切割的柔和曲线 我可以制作两个圆角和两个非圆角 但无法制作如下所示的一个 任何人都知道如何制作这样的瓷砖 我已经使用了 RoundRect ClipRRect 和
  • 如何在 C# 中向 UserControl 添加事件?

    我有一个包含 3 个标签的 UserControl 我想为其添加一个事件 该事件在标签之一的文本更改时发生 我正在使用 Visual Studio 2010 首先 您需要在类中声明事件 以及您的方法和构造函数 public event Ev
  • 创建 8 位图像

    我正在尝试创建具有纯背景色的 8 位图像 看起来应该非常简单 但文件上的详细信息将其列为 32 位颜色深度 我缺少什么 public void CreateImage var bmpOut new Bitmap 300 300 var g
  • SyntaxError:创建 virtualenv 时语法无效

    我想为 python 2 7 创建一个 virtualenv 我使用的是 3 7 我以管理员身份运行 cmd 在 Windows 10 上 C WINDOWS system32 gt virtualenv p C Python27 pyth
  • 从 Kafka 进行流聚合时运行“死锁”

    几天前我发布了另一个类似的问题 如何在启动Spark Streaming进程时加载历史数据 并计算运行聚合 我现在至少设法得到了一个 有效 的解决方案 这意味着该过程本身似乎可以正常工作 但是 由于我是 Spark 的初学者 我似乎错过了一