如何在 PySpark 中累计聚合一天内超过“1 小时”的窗口

2024-05-25

我有一个如下所示的 Spark DataFrame:

+---------+--------------------------+
|group_id |event_time                |
+---------+--------------------------+
|XXXX     |2017-10-25 14:47:02.717013|
|XXXX     |2017-10-25 14:47:25.444979|
|XXXX     |2017-10-25 14:49:32.21353 |
|YYYY     |2017-10-25 14:50:38.321134|
|YYYY     |2017-10-25 14:51:12.028447|
|ZZZZ     |2017-10-25 14:51:24.810688|
|YYYY     |2017-10-25 14:37:34.241097|
|ZZZZ     |2017-10-25 14:37:24.427836|
|XXXX     |2017-10-25 14:37:24.620864|
|YYYY     |2017-10-25 14:37:24.964614|
+---------+--------------------------+

我想计算一天内每小时的滚动事件数group_id.

所以,对于日期时间25-10 14:00和对于一个group_id,我想计算该事件的计数group_id from 25-10 00:00 till 25-10 14:00.

做如下的事情:

df.groupBy('group_id', window('event_time', '1 hour').alias('model_window')) \
    .agg(dfcount(lit(1)).alias('values'))

计算每小时的事件数,但不累计每天的事件数。

有任何想法吗?

EDIT: 预期的输出类似于:

    +---------+---------------------------------------------+-------+
    |group_id |model_window                                 |values |         
    +---------+---------------------------------------------+-------+
    |XXXX     |[2017-10-25 00:00:00.0,2017-10-25 01:00:00.0]| 10    |
    |XXXX     |[2017-10-25 00:00:00.0,2017-10-25 02:00:00.0]| 17    |
    |XXXX     |[2017-10-25 00:00:00.0,2017-10-25 03:00:00.0]| 22    |
    |YYYY     |[2017-10-25 00:00:00.0,2017-10-25 01:00:00.0]| 0     |
    |YYYY     |[2017-10-25 00:00:00.0,2017-10-25 02:00:00.0]| 1     |
    |YYYY     |[2017-10-25 00:00:00.0,2017-10-25 03:00:00.0]| 9     |
    +---------+---------------------------------------------+-------+

想要计算...每个 group_id 一天内每小时。

提取数据和时间:

from pyspark.sql.functions import col, count, hour, sum

extended = (df
  .withColumn("event_time", col("event_time").cast("timestamp"))
  .withColumn("date", col("event_time").cast("date"))
  .withColumn("hour", hour(col("event_time"))))

计算聚合

aggs = extended.groupBy("group_id", "date", "hour").count()

我想计算事件的滚动计数

并使用窗口函数:

from pyspark.sql.window import Window

aggs.withColumn(
    "agg_count", 
    sum("count").over(Window.partitionBy("group_id", "date").orderBy("hour")))

要为缺失的间隔获取 0,您必须为每个日期和小时生成参考数据并将其加入。

With df定义为:

df = sc.parallelize([
    ("XXXX", "2017-10-25 01:47:02.717013"),
    ("XXXX", "2017-10-25 14:47:25.444979"),
    ("XXXX", "2017-10-25 14:49:32.21353"),
    ("YYYY", "2017-10-25 14:50:38.321134"),
    ("YYYY", "2017-10-25 14:51:12.028447"),
    ("ZZZZ", "2017-10-25 14:51:24.810688"),
    ("YYYY", "2017-10-25 14:37:34.241097"),
    ("ZZZZ", "2017-10-25 14:37:24.427836"),
    ("XXXX", "2017-10-25 22:37:24.620864"),
    ("YYYY", "2017-10-25 16:37:24.964614")
]).toDF(["group_id", "event_time"])

结果是

+--------+----------+----+-----+---------+                                      
|group_id|      date|hour|count|agg_count|
+--------+----------+----+-----+---------+
|    XXXX|2017-10-25|   1|    1|        1|
|    XXXX|2017-10-25|  14|    2|        3|
|    XXXX|2017-10-25|  22|    1|        4|
|    ZZZZ|2017-10-25|  14|    2|        2|
|    YYYY|2017-10-25|  14|    3|        3|
|    YYYY|2017-10-25|  16|    1|        4|
+--------+----------+----+-----+---------+
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在 PySpark 中累计聚合一天内超过“1 小时”的窗口 的相关文章

  • Python 中的 Lanczos 插值与 2D 图像

    我尝试重新缩放 2D 图像 灰度 图像大小为 256x256 所需输出为 224x224 像素值范围从 0 到 1300 我尝试了两种使用 Lanczos 插值来重新调整它们的方法 首先使用PIL图像 import numpy as np
  • Django 管理员在模型编辑时间歇性返回 404

    我们使用 Django Admin 来维护导出到我们的一些站点的一些数据 有时 当单击标准更改列表视图来获取模型编辑表单而不是路由到正确的页面时 我们会得到 Django 404 页面 模板 它是偶尔发生的 我们可以通过重新加载三次来重现它
  • 如何使用 SparkR 1.6.0 写入 JDBC 源?

    使用 SparkR 1 6 0 我可以使用以下代码从 JDBC 源读取数据 jdbc url lt jdbc mysql localhost 3306 dashboard user
  • 如何替换 pandas 数据框列中的重音符号

    我有一个数据框dataSwiss其中包含瑞士城市的信息 我想用普通字母替换带有重音符号的字母 这就是我正在做的 dataSwiss Municipality dataSwiss Municipality str encode utf 8 d
  • 根据列值突出显示数据框中的行?

    假设我有这样的数据框 col1 col2 col3 col4 0 A A 1 pass 2 1 A A 2 pass 4 2 A A 1 fail 4 3 A A 1 fail 5 4 A A 1 pass 3 5 A A 2 fail 2
  • 如何获取 Kafka 偏移量以进行结构化查询以进行手动且可靠的偏移量管理?

    Spark 2 2引入了Kafka的结构化流源 据我了解 它依赖 HDFS 检查点目录来存储偏移量并保证 恰好一次 消息传递 但是旧码头 比如https blog cloudera com blog 2017 06 offset manag
  • Spark KMeans 无法处理大数据吗?

    KMeans 有几个参数training http spark apache org docs latest api python pyspark mllib html highlight kmeans pyspark mllib clus
  • 以编程方式停止Python脚本的执行? [复制]

    这个问题在这里已经有答案了 是否可以使用命令在任意行停止执行 python 脚本 Like some code quit quit at this point some more code that s not executed sys e
  • Python pickle:腌制对象不等于源对象

    我认为这是预期的行为 但想检查一下 也许找出原因 因为我所做的研究结果是空白 我有一个函数可以提取数据 创建自定义类的新实例 然后将其附加到列表中 该类仅包含变量 然后 我使用协议 2 作为二进制文件将该列表腌制到文件中 稍后我重新运行脚本
  • Python 函数可以从作用域之外赋予新属性吗?

    我不知道你可以这样做 def tom print tom s locals locals def dick z print z name z name z guest Harry print z guest z guest print di
  • 从 Flask 访问 Heroku 变量

    我已经使用以下命令在 Heroku 配置中设置了数据库变量 heroku config add server xxx xxx xxx xxx heroku config add user userName heroku config add
  • Python 的“zip”内置函数的 Ruby 等价物是什么?

    Ruby 是否有与 Python 内置函数等效的东西zip功能 如果不是 做同样事情的简洁方法是什么 一些背景信息 当我试图找到一种干净的方法来进行涉及两个数组的检查时 出现了这个问题 如果我有zip 我可以写这样的东西 zip a b a
  • 将图像分割成多个网格

    我使用下面的代码将图像分割成网格的 20 个相等的部分 import cv2 im cv2 imread apple jpg im cv2 resize im 1000 500 imgwidth im shape 0 imgheight i
  • 如何在 Python 中追加到 JSON 文件?

    我有一个 JSON 文件 其中包含 67790 1 kwh 319 4 现在我创建一个字典a dict我需要将其附加到 JSON 文件中 我尝试了这段代码 with open DATA FILENAME a as f json obj js
  • 如何为 Spark RDD 中的元素分配唯一的连续编号

    我有一个数据集 user product review 并希望将其输入到 mllib 的 ALS 算法中 该算法需要用户和产品是数字 而我的是字符串用户名和字符串SKU 现在 我获取不同的用户和 SKU 然后在 Spark 外部为它们分配数
  • 如何计算 pandas 数据帧上的连续有序值

    我试图从给定的数据帧中获取连续 0 值的最大计数 其中包含来自 pandas 数据帧的 id date value 列 如下所示 id date value 354 2019 03 01 0 354 2019 03 02 0 354 201
  • Python 类继承 - 诡异的动作

    我观察到类继承有一个奇怪的效果 对于我正在处理的项目 我正在创建一个类来充当另一个模块的类的包装器 我正在使用第 3 方 aeidon 模块 用于操作字幕文件 但问题可能不太具体 以下是您通常如何使用该模块 project aeidon P
  • 导入错误:没有名为 site 的模块 - mac

    我已经有这个问题几个月了 每次我想获取一个新的 python 包并使用它时 我都会在终端中收到此错误 ImportError No module named site 我不知道为什么会出现这个错误 实际上 我无法使用任何新软件包 因为每次我
  • Python Selenium:如何在文本文件中打印网站上的值?

    我正在尝试编写一个脚本 该脚本将从 tulsaspca org 网站获取以下 6 个值并将其打印在 txt 文件中 最终输出应该是 905 4896 7105 23194 1004 42000 放置的动物 的 HTML span class
  • 如何使用 Pycharm 安装 tkinter? [复制]

    这个问题在这里已经有答案了 I used sudo apt get install python3 6 tk而且效果很好 如果我在终端中打开 python Tkinter 就可以工作 但我无法将其安装在我的 Pycharm 项目上 pip

随机推荐