A few days ago I posted another, similar question:
- How to load history data when starting a Spark Streaming process, and calculate running aggregations
By now I have at least managed to get a solution that "works", meaning the process itself seems to run correctly. But since I am a Spark beginner, I seem to be missing some things about how to build this kind of application the right way (performance-/computation-wise)...
What I want to do:

1. Load history data from ElasticSearch upon application startup
2. Start listening to a Kafka topic (containing sale events, passed as JSON strings) with Spark Streaming on startup
3. For each incoming RDD, do an aggregation per user
4. Union the results of 3. with the history
5. Aggregate the new values per user, e.g. the total revenue
6. Use the results of 5. as the new "history" for the next iteration
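Independent of Spark, the merge logic of steps 3–6 can be sketched with plain Scala collections. This is only an illustration of the intended aggregation; the `Sale` and `UserAgg` case classes and their field names are assumptions mirroring the JSON events, not the actual schema:

```scala
// Hypothetical event shape mirroring the JSON sale events (field names assumed).
case class Sale(userId: Int, saleTimestamp: Long, totalRevenue: Double, totalPoints: Int)

// Per-user running aggregate kept as the "history".
case class UserAgg(latestSaleTimestamp: Long, totalRevenue: Double, totalPoints: Int)

// Step 3: aggregate one incoming batch per user.
def aggregateBatch(batch: Seq[Sale]): Map[Int, UserAgg] =
  batch.groupBy(_.userId).map { case (user, sales) =>
    user -> UserAgg(
      sales.map(_.saleTimestamp).max,
      sales.map(_.totalRevenue).sum,
      sales.map(_.totalPoints).sum)
  }

// Steps 4-6: union the batch aggregates with the history and re-aggregate per user;
// the returned map becomes the history for the next iteration.
def merge(history: Map[Int, UserAgg], batch: Map[Int, UserAgg]): Map[Int, UserAgg] =
  batch.foldLeft(history) { case (acc, (user, b)) =>
    val merged = acc.get(user) match {
      case Some(h) => UserAgg(
        math.max(h.latestSaleTimestamp, b.latestSaleTimestamp),
        h.totalRevenue + b.totalRevenue,
        h.totalPoints + b.totalPoints)
      case None => b
    }
    acc.updated(user, merged)
  }
```

In the actual application the same logic is expressed in Spark SQL over DataFrames, as shown in the code below.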
My code is as follows:
import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
import org.elasticsearch.spark.sql._
import org.apache.log4j.Logger
import org.apache.log4j.Level

object ReadFromKafkaAndES {

  def main(args: Array[String]) {

    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)
    Logger.getLogger("kafka").setLevel(Level.WARN)

    val checkpointDirectory = "/tmp/Spark"
    val conf = new SparkConf().setAppName("Read Kafka JSONs").setMaster("local[4]")
    conf.set("es.nodes", "localhost")
    conf.set("es.port", "9200")

    val topicsSet = Array("sales").toSet

    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(15))
    ssc.checkpoint(checkpointDirectory)

    //Create SQLContext
    val sqlContext = new SQLContext(sc)

    //Get history data from ES
    var history: DataFrame = sqlContext.esDF("data/salesaggregation")

    //Kafka settings
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")

    // Create direct kafka stream with brokers and topics
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    //Iterate
    messages.foreachRDD { rdd =>
      //If data is present, continue
      if (rdd.count() > 0) {
        //Register temporary table for the aggregated history
        history.registerTempTable("history")
        println("--- History -------------------------------")
        history.show()

        //Parse JSON as DataFrame
        val saleEvents = sqlContext.read.json(rdd.values)

        //Register temporary table for sales events
        saleEvents.registerTempTable("sales")

        val sales = sqlContext.sql("select userId, cast(max(saleTimestamp) as Timestamp) as latestSaleTimestamp, sum(totalRevenue) as totalRevenue, sum(totalPoints) as totalPoints from sales group by userId")
        println("--- Sales ---------------------------------")
        sales.show()

        val agg = sqlContext.sql("select a.userId, max(a.latestSaleTimestamp) as latestSaleTimestamp, sum(a.totalRevenue) as totalRevenue, sum(a.totalPoints) as totalPoints from ((select userId, latestSaleTimestamp, totalRevenue, totalPoints from history) union all (select userId, cast(max(saleTimestamp) as Timestamp) as latestSaleTimestamp, sum(totalRevenue) as totalRevenue, sum(totalPoints) as totalPoints from sales group by userId)) a group by userId")
        println("--- Aggregation ---------------------------")
        agg.show()

        //This is our new "history"
        history = agg

        //Cache results
        history.cache()

        //Drop temporary table
        sqlContext.dropTempTable("history")
      }
    }

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
The computations seem to be correct:
--- History -------------------------------
+--------------------+--------------------+-----------+------------+------+
| latestSaleTimestamp| productList|totalPoints|totalRevenue|userId|
+--------------------+--------------------+-----------+------------+------+
|2015-07-22 10:03:...|Buffer(47, 1484, ...| 91| 12.05| 23|
|2015-07-22 12:50:...|Buffer(256, 384, ...| 41| 7.05| 24|
+--------------------+--------------------+-----------+------------+------+
--- Sales ---------------------------------
+------+--------------------+------------------+-----------+
|userId| latestSaleTimestamp| totalRevenue|totalPoints|
+------+--------------------+------------------+-----------+
| 23|2015-07-29 09:17:...| 255.59| 208|
| 24|2015-07-29 09:17:...|226.08999999999997| 196|
+------+--------------------+------------------+-----------+
--- Aggregation ---------------------------
+------+--------------------+------------------+-----------+
|userId| latestSaleTimestamp| totalRevenue|totalPoints|
+------+--------------------+------------------+-----------+
| 23|2015-07-29 09:17:...| 267.6400001907349| 299|
| 24|2015-07-29 09:17:...|233.14000019073484| 237|
+------+--------------------+------------------+-----------+
However, if the application runs for several iterations, I can see that the performance degrades:
![Streaming Graphs](https://i.stack.imgur.com/GQkZt.png)
I also see a high number of skipped tasks, increasing with every iteration:
![Skipped tasks](https://i.stack.imgur.com/icbiv.png)
The graph for the first iteration looks like
![enter image description here](https://i.stack.imgur.com/noayw.png)
The graph for the second iteration looks like
![enter image description here](https://i.stack.imgur.com/25taz.png)
The more iterations run, the longer the graph gets, with lots of skipped steps.
Basically, I think the problem lies in storing one iteration's results as the "history" for the next iteration. Unfortunately, after trying a lot of different things and reading through the docs, I have not been able to find a solution for this. Any help is warmly appreciated. Thanks!