You want to compute ... per group_id, per hour within a day.
Extract the date and hour:
from pyspark.sql.functions import col, count, hour, sum

extended = (df
    # parse the string timestamp, then derive calendar date and hour-of-day
    .withColumn("event_time", col("event_time").cast("timestamp"))
    .withColumn("date", col("event_time").cast("date"))
    .withColumn("hour", hour(col("event_time"))))
Compute the aggregates:
aggs = extended.groupBy("group_id", "date", "hour").count()
To get a rolling count of events, use a window function:
from pyspark.sql.window import Window

aggs.withColumn(
    "agg_count",
    # with orderBy and no explicit frame this is a running sum
    # (unbounded preceding up to the current row)
    sum("count").over(Window.partitionBy("group_id", "date").orderBy("hour")))
To get 0 for missing intervals, you have to generate reference data for every date and hour and join it to the aggregates, as sketched below.
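A minimal sketch of that join, assuming a SparkSession named spark and the extended and aggs DataFrames from above; the hours, reference, and filled names are made up for illustration:

from pyspark.sql.functions import col

# one row per hour of the day (0..23)
hours = spark.range(24).select(col("id").cast("integer").alias("hour"))

# every (group_id, date) pair seen in the data, crossed with all 24 hours
reference = (extended
    .select("group_id", "date").distinct()
    .crossJoin(hours))

# left-join the actual counts and fill the missing hours with 0
filled = (reference
    .join(aggs, ["group_id", "date", "hour"], "left")
    .fillna(0, subset=["count"]))

The same window expression can then be applied to filled, so agg_count carries the running total through the empty hours as well.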
With df defined as:
df = sc.parallelize([
    ("XXXX", "2017-10-25 01:47:02.717013"),
    ("XXXX", "2017-10-25 14:47:25.444979"),
    ("XXXX", "2017-10-25 14:49:32.21353"),
    ("YYYY", "2017-10-25 14:50:38.321134"),
    ("YYYY", "2017-10-25 14:51:12.028447"),
    ("ZZZZ", "2017-10-25 14:51:24.810688"),
    ("YYYY", "2017-10-25 14:37:34.241097"),
    ("ZZZZ", "2017-10-25 14:37:24.427836"),
    ("XXXX", "2017-10-25 22:37:24.620864"),
    ("YYYY", "2017-10-25 16:37:24.964614")
]).toDF(["group_id", "event_time"])
the result is:
+--------+----------+----+-----+---------+
|group_id| date|hour|count|agg_count|
+--------+----------+----+-----+---------+
| XXXX|2017-10-25| 1| 1| 1|
| XXXX|2017-10-25| 14| 2| 3|
| XXXX|2017-10-25| 22| 1| 4|
| ZZZZ|2017-10-25| 14| 2| 2|
| YYYY|2017-10-25| 14| 3| 3|
| YYYY|2017-10-25| 16| 1| 4|
+--------+----------+----+-----+---------+