The first solution is based on Spark Datasets and Spark SQL and produces the expected result.
There are many ways to configure this approach, even considering performance concerns, which are discussed later.
import org.apache.spark.sql.SparkSession
import org.apache.log4j.{Level, Logger}
object basic {
val spark = SparkSession
.builder()
.appName("Sample app")
.master("local")
.config("spark.sql.shuffle.partitions","200") //Change to a more reasonable default number of partitions for our data
.getOrCreate()
val sc = spark.sparkContext
case class Owner(car: String, pcode: String, qtty: Double)
case class Invoice(car: String, pcode: String, qtty: Double)
def main(args: Array[String]): Unit = {
val data = Seq(
Owner("A", "666", 80),
Owner("B", "555", 20),
Owner("A", "444", 50),
Owner("A", "222", 20),
Owner("C", "444", 20),
Owner("C", "666", 80),
Owner("C", "555", 120),
Owner("A", "888", 100)
)
val fleet = Seq(
Invoice("A", "666", 15),
Invoice("C", "666", 10),
Invoice("A", "888", 12),
Invoice("B", "555", 200)
)
val expected = Seq(
Owner("A", "666", 65),
Owner("B", "555", 20), // not redistributed because produce a negative value
Owner("A", "444", 69.29),
Owner("A", "222", 27.71),
Owner("C", "444", 21.43),
Owner("C", "666", 70),
Owner("C", "555", 128.57),
Owner("A", "888", 88)
)
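// How the expected values come about, worked through for car A:
//   matched invoices (p1): 666 -> 80 - 15 = 65, 888 -> 100 - 12 = 88, so A's to_distribute = 15 + 12 = 27
//   unmatched owner rows (p2): 444 (qtty 50) and 222 (qtty 20), so A's proportion = 50 + 20 = 70
//   redistribution: 444 -> 50 + (27/70) * 50 = 69.29, 222 -> 20 + (27/70) * 20 = 27.71
//   B's invoice (200) exceeds its quantity (20), so B keeps 20 and nothing is distributed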
Logger.getRootLogger.setLevel(Level.ERROR)
try {
import spark.implicits._
val owners = spark.createDataset(data).as[Owner].cache()
val invoices = spark.createDataset(fleet).as[Invoice].cache()
owners.createOrReplaceTempView("owners")
invoices.createOrReplaceTempView("invoices")
/**
* this part fetches car and pcode from owners with the invoiced quantity subtracted (kept unchanged when the result would be negative)
*/
val p1 = spark.sql(
"""SELECT i.car,i.pcode,
|CASE WHEN (o.qtty - i.qtty) < 0 THEN o.qtty ELSE (o.qtty - i.qtty) END AS qtty,
|CASE WHEN (o.qtty - i.qtty) < 0 THEN 0 ELSE i.qtty END AS to_distribute
|FROM owners o
|INNER JOIN invoices i ON(i.car = o.car AND i.pcode = o.pcode)
|""".stripMargin)
.cache()
p1.createOrReplaceTempView("p1")
/**
* this part fetches all the car and pcode rows whose quantity has to be redistributed (owners with no matching invoice)
*/
val p2 = spark.sql(
"""SELECT o.car, o.pcode, o.qtty
|FROM owners o
|LEFT OUTER JOIN invoices i ON(i.car = o.car AND i.pcode = o.pcode)
|WHERE i.car IS NULL
|""".stripMargin)
.cache()
p2.createOrReplaceTempView("p2")
/**
* this part computes the total quantity to distribute per car
*/
val distribute = spark.sql(
"""
|SELECT car, SUM(to_distribute) AS to_distribute
|FROM p1
|GROUP BY car
|""".stripMargin)
.cache()
distribute.createOrReplaceTempView("distribute")
/**
* this part computes the per-car base used to distribute proportionally
*/
val proportion = spark.sql(
"""
|SELECT car, SUM(qtty) AS proportion
|FROM p2
|GROUP BY car
|""".stripMargin)
.cache()
proportion.createOrReplaceTempView("proportion")
/**
* this part joins p2 with the calculated distribution and unions p1 back in
*/
val result = spark.sql(
"""
|SELECT p2.car, p2.pcode, ROUND(((to_distribute / proportion) * qtty) + qtty, 2) AS qtty
|FROM p2
|JOIN distribute d ON(p2.car = d.car)
|JOIN proportion p ON(d.car = p.car)
|UNION ALL
|SELECT car, pcode, qtty
|FROM p1
|""".stripMargin)
result.show(truncate = false)
/*
+---+-----+------+
|car|pcode|qtty |
+---+-----+------+
|A |444 |69.29 |
|A |222 |27.71 |
|C |444 |21.43 |
|C |555 |128.57|
|A |666 |65.0 |
|B |555 |20.0 |
|C |666 |70.0 |
|A |888 |88.0 |
+---+-----+------+
*/
expected
.toDF("car","pcode","qtty")
.show(truncate = false)
/*
+---+-----+------+
|car|pcode|qtty |
+---+-----+------+
|A |666 |65.0 |
|B |555 |20.0 |
|A |444 |69.29 |
|A |222 |27.71 |
|C |444 |21.43 |
|C |666 |70.0 |
|C |555 |128.57|
|A |888 |88.0 |
+---+-----+------+
*/
} finally {
sc.stop()
println("SparkContext stopped")
spark.stop()
println("SparkSession stopped")
}
}
}
Using the Dataset API
Another approach to this problem, with the same result, is to use Datasets and their great API; as an example:
import org.apache.spark.sql.SparkSession
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.functions._
import org.apache.spark.storage.StorageLevel
object basic2 {
val spark = SparkSession
.builder()
.appName("Sample app")
.master("local")
.config("spark.sql.shuffle.partitions","200") //Change to a more reasonable default number of partitions for our data
.getOrCreate()
val sc = spark.sparkContext
final case class Owner(car: String, pcode: String, o_qtty: Double)
final case class Invoice(car: String, pcode: String, i_qtty: Double)
def main(args: Array[String]): Unit = {
val data = Seq(
Owner("A", "666", 80),
Owner("B", "555", 20),
Owner("A", "444", 50),
Owner("A", "222", 20),
Owner("C", "444", 20),
Owner("C", "666", 80),
Owner("C", "555", 120),
Owner("A", "888", 100)
)
val fleet = Seq(
Invoice("A", "666", 15),
Invoice("C", "666", 10),
Invoice("A", "888", 12),
Invoice("B", "555", 200)
)
val expected = Seq(
Owner("A", "666", 65),
Owner("B", "555", 20), // not redistributed because produce a negative value
Owner("A", "444", 69.29),
Owner("A", "222", 27.71),
Owner("C", "444", 21.43),
Owner("C", "666", 70),
Owner("C", "555", 128.57),
Owner("A", "888", 88)
)
Logger.getRootLogger.setLevel(Level.ERROR)
try {
import spark.implicits._
val owners = spark.createDataset(data)
.as[Owner]
.cache()
val invoices = spark.createDataset(fleet)
.as[Invoice]
.cache()
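// p1: matched owner/invoice pairs, with the invoiced quantity subtracted (kept as-is when it would go negative)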
val p1 = owners
.join(invoices,Seq("car","pcode"),"inner")
.selectExpr("car","pcode","IF(o_qtty-i_qtty < 0,o_qtty,o_qtty - i_qtty) AS qtty","IF(o_qtty-i_qtty < 0,0,i_qtty) AS to_distribute")
.persist(StorageLevel.MEMORY_ONLY)
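// p2: owner rows with no matching invoice; these receive the redistributed quantity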
val p2 = owners
.join(invoices,Seq("car","pcode"),"left_outer")
.filter(row => row.anyNull) // after the left join, only owners without an invoice have a null i_qtty
.drop(col("i_qtty"))
.withColumnRenamed("o_qtty","qtty")
.persist(StorageLevel.MEMORY_ONLY)
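// total quantity to distribute per car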
val distribute = p1
.groupBy(col("car"))
.agg(sum(col("to_distribute")).as("to_distribute"))
.persist(StorageLevel.MEMORY_ONLY)
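// per-car base over which the distribution is apportioned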
val proportion = p2
.groupBy(col("car"))
.agg(sum(col("qtty")).as("proportion"))
.persist(StorageLevel.MEMORY_ONLY)
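// apply the proportional redistribution, then union the matched rows back in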
val result = p2
.join(distribute, "car")
.join(proportion, "car")
.withColumn("qtty",round( ((col("to_distribute") / col("proportion")) * col("qtty")) + col("qtty"), 2 ))
.drop("to_distribute","proportion")
.union(p1.drop("to_distribute"))
result.show()
/*
+---+-----+------+
|car|pcode| qtty|
+---+-----+------+
| A| 444| 69.29|
| A| 222| 27.71|
| C| 444| 21.43|
| C| 555|128.57|
| A| 666| 65.0|
| B| 555| 20.0|
| C| 666| 70.0|
| A| 888| 88.0|
+---+-----+------+
*/
expected
.toDF("car","pcode","qtty")
.show(truncate = false)
/*
+---+-----+------+
|car|pcode|qtty |
+---+-----+------+
|A |666 |65.0 |
|B |555 |20.0 |
|A |444 |69.29 |
|A |222 |27.71 |
|C |444 |21.43 |
|C |666 |70.0 |
|C |555 |128.57|
|A |888 |88.0 |
+---+-----+------+
*/
} finally {
sc.stop()
println("SparkContext stopped")
spark.stop()
println("SparkSession stopped")
}
}
}
Some general notes about performance and tuning.
It always depends on your particular use case, but in general, first of all, if you can filter and clean your data you will probably see some improvement.
One key point of using a high-level declarative API is to isolate yourself from low-level implementation details.
Optimization is the job of the Catalyst optimizer https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html.
It is a sophisticated engine, and I really doubt anyone can easily improve on it without diving deep into its internals.
Set the property for the default number of partitions, spark.sql.shuffle.partitions, to a suitable value.
By default, Spark SQL uses spark.sql.shuffle.partitions partitions for aggregations and joins, that is, 200 by default.
This often leads to an explosion of partitions with no benefit to query performance, since all of these 200 tasks (one per partition) have to start and finish before you get the result.
Think about how many partitions your query really needs.
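For example, the value can be overridden at runtime through spark.conf; the value 8 below is just an illustrative choice for a tiny local dataset like the one above:
spark.conf.set("spark.sql.shuffle.partitions", "8") // takes effect for subsequent shuffles in this session
println(spark.conf.get("spark.sql.shuffle.partitions")) // 8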
Spark can only run one concurrent task per partition of an RDD, up to the number of cores in your cluster.
So if your cluster has 50 cores, you want your RDDs to have at least 50 partitions.
As for choosing a "good" number of partitions, you generally want at least as many as the number of parallel executors.
You can get this computed value by calling sc.defaultParallelism, or check the number of partitions of an RDD with df.rdd.partitions.size
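As a quick check, assuming the sc SparkContext and the owners Dataset from the examples above:
println(s"defaultParallelism: ${sc.defaultParallelism}") // usually the number of cores available to Spark
println(s"owners partitions: ${owners.rdd.partitions.size}") // partitions backing the Dataset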
Repartition: increase the number of partitions and rebalance them after a filter to increase parallelism: repartition(numPartitions: Int)
Coalesce: decrease the number of partitions without a shuffle before writing out to HDFS/external storage: coalesce(numPartitions: Int, shuffle: Boolean = false); both are sketched after the link below.
You can follow this link: Managing Spark Partitions with Coalesce and Repartition https://medium.com/@mrpowers/managing-spark-partitions-with-coalesce-and-repartition-4050c57ad5c4
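A minimal sketch of both, assuming the owners Dataset (with its qtty column) and the spark.implicits._ import from the first example; the partition counts are arbitrary:
val rebalanced = owners.filter($"qtty" > 30).repartition(8) // shuffles; restores parallelism after a selective filter
println(rebalanced.rdd.partitions.size) // 8
val compacted = rebalanced.coalesce(1) // no shuffle; useful before writing a single output file
println(compacted.rdd.partitions.size) // 1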
Cache data to avoid recomputation: dataFrame.cache(); a minimal usage example follows.
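For instance, a small sketch reusing the owners Dataset:
val cached = owners.cache()   // marks the Dataset for caching
cached.count()                // the first action materializes the cache
cached.show(truncate = false) // now served from memory, no recomputation
cached.unpersist()            // release the storage when done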
Analyzer: the logical query plan analyzer
The Analyzer is the logical query plan analyzer in Spark SQL that semantically validates an unresolved logical plan and transforms it into an analyzed logical plan.
You can access a Dataset's analyzed logical plan using explain (with the extended flag enabled):
dataframe.explain(extended = true)
For more performance options, see the documentation: Performance Tuning https://spark.apache.org/docs/2.4.0/sql-performance-tuning.html
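One join-specific knob worth knowing for joins like the ones above: when one side is small, a broadcast hint avoids shuffling the large side. A sketch, not part of the original solutions:
import org.apache.spark.sql.functions.broadcast
// ship the small invoices Dataset to every executor; owners is then joined without being shuffled
val joined = owners.join(broadcast(invoices), Seq("car", "pcode"), "inner")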
There are many possibilities for running a Spark process, but it always depends on your use case.
Batch or streaming? DataFrames or plain RDDs? Hive or no Hive? Is the data shuffled? And so on...
I strongly recommend The Internals of Spark SQL https://jaceklaskowski.gitbooks.io/mastering-spark-sql/ by Jacek Laskowski.
Finally, you will have to experiment with different values and benchmarks to get a feel for the processing time of a sample of your data.
val start = System.nanoTime()
// my process
val end = System.nanoTime()
val time = end - start // elapsed time in nanoseconds
println(s"My App takes: $time ns")
Hope this helps.