当转换为列的值列表未知时,旋转的成本很高。 Spark 已超载pivot
将它们作为参数的方法。
def pivot(pivotColumn: String, values: Seq[Any])
如果它们未知,Spark 必须对数据集中的不同值进行排序和收集。否则,逻辑是非常简单和描述的here.
该实现添加了一个新的逻辑运算符 (o.a.s.sql.catalyst.plans.logic.Pivot)。该逻辑运算符由新的分析器规则 (o.a.s.sql.catalyst.analysis.Analyzer.ResolvePivot) 转换,该规则当前将其转换为包含大量 if 语句的聚合,每个主值一个表达式。
例如, df.groupBy("A", "B").pivot("C", Seq("small", "large")).sum("D") 将被转换为 df.groupBy 的等效项("A", "B").agg(expr(“sum(if(C = '小', D, null))”), expr(“sum(if(C = '大', D, null)) )”))。您可以自己完成此操作,但它会变得很长并且可能很快就会出错。
如果没有旋转,我会做类似的事情:
val in = spark.read.csv("input.csv")
//cast to the unix timestamp
.withColumn("timestamp", unix_timestamp($"time_stamp", "yyyy/MM/dd HH:mm:ss").cast(TimestampType))
.drop($"time_stamp")
现在,我们可以按时间窗口和主机名对数据集进行分组,并将 KPI 指标收集到地图中。
有一个优秀的answer正是描述了这一点。
val joinMap = udf { values: Seq[Map[String, Double]] => values.flatten.toMap }
val grouped = in.groupBy(window($"timestamp", "5 minutes"), $"Hostname")
.agg(joinMap(collect_list(map($"kpi_subtype", $"value_current".cast(DoubleType)))).as("metrics"))
Output
+------------------------------------------+--------+-------------------------------------------------------------+
|window |Hostname|metrics |
+------------------------------------------+--------+-------------------------------------------------------------+
|[2019-08-17 10:00:00, 2019-08-17 10:05:00]|Server1 |[Total -> 100.0, used -> 35.0, buffer -> 8.0, cached -> 10.0]|
|[2019-08-17 10:00:00, 2019-08-17 10:05:00]|Server2 |[Total -> 100.0, used -> 42.0, buffer -> 7.0, cached -> 9.0] |
|[2019-08-17 10:05:00, 2019-08-17 10:10:00]|Server1 |[Total -> 100.0, used -> 35.0, buffer -> 8.0, cached -> 10.0]|
|[2019-08-17 10:05:00, 2019-08-17 10:10:00]|Server2 |[Total -> 100.0, used -> 35.0, buffer -> 8.0, cached -> 10.0]|
+------------------------------------------+--------+-------------------------------------------------------------+
现在我们定义一些别名和一个简单的 select 语句:
val total = col("metrics")("Total")
val used = col("metrics")("used")
val buffer = col("metrics")("buffer")
val cached = col("metrics")("cached")
val result = grouped.select($"window", $"Hostname",
(total - ((total - used + buffer + cached) / total) * 100).as("percentage"))
现在我们开始:
+------------------------------------------+--------+----------+
|window |Hostname|percentage|
+------------------------------------------+--------+----------+
|[2019-08-17 10:00:00, 2019-08-17 10:05:00]|Server1 |17.0 |
|[2019-08-17 10:00:00, 2019-08-17 10:05:00]|Server2 |26.0 |
|[2019-08-17 10:05:00, 2019-08-17 10:10:00]|Server1 |17.0 |
|[2019-08-17 10:05:00, 2019-08-17 10:10:00]|Server2 |17.0 |
+------------------------------------------+--------+----------+