如何在 Spark 中通过组聚合滚动时间窗口

2024-01-06

我有一些数据想要按某个列进行分组,然后根据组中的滚动时间窗口聚合一系列字段。

以下是一些示例数据:

df = spark.createDataFrame([Row(date='2016-01-01', group_by='group1', get_avg=5, get_first=1),
                            Row(date='2016-01-10', group_by='group1', get_avg=5, get_first=2),
                            Row(date='2016-02-01', group_by='group2', get_avg=10, get_first=3),
                            Row(date='2016-02-28', group_by='group2', get_avg=20, get_first=3),
                            Row(date='2016-02-29', group_by='group2', get_avg=30, get_first=3),
                            Row(date='2016-04-02', group_by='group2', get_avg=8, get_first=4)])

我想分组group_by,然后创建从最早的日期开始并延长到 30 天没有该组条目的时间窗口。 30 天结束后,下一个时间窗口将从不属于上一个窗口的下一行的日期开始。

然后我想聚合,例如获取平均值get_avg,以及第一个结果get_first.

所以这个例子的输出应该是:

group_by    first date of window    get_avg  get_first
group1      2016-01-01              5        1
group2      2016-02-01              20       3
group2      2016-04-02              8        4

编辑:抱歉,我意识到我的问题没有正确指定。我实际上想要一个在 30 天不活动后结束的窗口。我相应地修改了示例的 group2 部分。


修改后的答案:

您可以在此处使用简单的窗口函数技巧。一堆进口:

from pyspark.sql.functions import coalesce, col, datediff, lag, lit, sum as sum_
from pyspark.sql.window import Window

窗口定义:

w = Window.partitionBy("group_by").orderBy("date")

Cast date to DateType:

df_ = df.withColumn("date", col("date").cast("date"))

定义以下表达式:

# Difference from the previous record or 0 if this is the first one
diff = coalesce(datediff("date", lag("date", 1).over(w)), lit(0))

# 0 if diff <= 30, 1 otherwise
indicator = (diff > 30).cast("integer")

# Cumulative sum of indicators over the window
subgroup = sum_(indicator).over(w).alias("subgroup")

Add subgroup表达式到表中:

df_.select("*", subgroup).groupBy("group_by", "subgroup").avg("get_avg")
+--------+--------+------------+
|group_by|subgroup|avg(get_avg)|
+--------+--------+------------+
|  group1|       0|         5.0|
|  group2|       0|        20.0|
|  group2|       1|         8.0|
+--------+--------+------------+

first对于聚合没有意义,但如果列单调递增,您可以使用min。否则,您还必须使用窗口函数。

使用 Spark 2.1 进行测试。可能需要子查询和Window与早期 Spark 版本一起使用时的实例。

原来的答案(与指定范围无关)

从 Spark 2.0 开始你应该能够使用a window功能 https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.window:

在给定时间戳指定列的情况下,将行分桶到一个或多个时间窗口中。窗口开始是包含的,但窗口结束是排除的,例如12:05 将出现在窗口 [12:05,12:10) 中,但不会出现在 [12:00,12:05) 中。

from pyspark.sql.functions import window

df.groupBy(window("date", windowDuration="30 days")).count()

但从结果中你可以看到,

+---------------------------------------------+-----+
|window                                       |count|
+---------------------------------------------+-----+
|[2016-01-30 01:00:00.0,2016-02-29 01:00:00.0]|1    |
|[2015-12-31 01:00:00.0,2016-01-30 01:00:00.0]|2    |
|[2016-03-30 02:00:00.0,2016-04-29 02:00:00.0]|1    |
+---------------------------------------------+-----+

当涉及到时区时你必须要小心一点。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在 Spark 中通过组聚合滚动时间窗口 的相关文章

随机推荐

  • 请求运行时位置权限时

    目前 当第一次启动我的应用程序时请求运行时权限时 它会提示用户使用他们的位置 如果您单击 是 它不会像应有的那样启用位置 但如果我重新启动该应用程序 它就会启用该位置 关于在哪里可以让它在首次启动时启用位置有什么建议吗 代码的第一部分在 O
  • AesManaged 和 RijndaelManaged

    我目前正在开发一个连接到旧 Web 服务的 Silverlight 应用程序 我们的旧 Web 服务使用 silverlight 不支持的加密工具 最后 我们决定使用 AesManaged 进行加密 但是 我们的网络服务不支持 AesMan
  • 检测 Entity Framework Core 中的延迟加载

    Entity Framework Core 3 1 2 我已启用UseLazyLoadingProxies on my DbContext以确保数据完整性 但如果使用它 我想在开发过程中抛出异常 如何在每次 EF Core 延迟加载关系时执
  • 在 EC2 上的 Virtualenv 中运行 Django -- ImportError:没有名为 django.core.management 的模块

    我在 git 存储库中本地开发了一个 django 应用程序 我为该项目启动了一个 EC2 实例 并设置了一个包含 我认为是 正确的包 依赖项的 virtualenv 然后我继续将我的存储库克隆到 virtualenv 中 现在 我遇到了困
  • 私有方法和属性的 ReSharper C# 命名风格

    我喜欢将私有方法 属性和事件的首字母设为小写 将公共方法 属性和事件的首字母设为大写 但是 在 ReSharper 7 1 中 C 命名样式下只有一个选项适用all方法 属性和事件 告诉 ReSharper 使用不同约定的最佳方法是什么pr
  • 无序映射与向量

    我正在构建一个小型 2D 游戏引擎 现在我需要存储游戏对象的原型 所有类型的信息 我猜一个容器最多有几千个元素 所有元素都有唯一的键 并且在第一次加载后不会删除或添加任何元素 键值是一个字符串 各种线程将运行 我需要向每个人发送一个密钥 或
  • 我怎么知道是否是require_once?

    我有两个文件 1 索引 php 2 main php 索引 php拨电至main php by 我怎么知道从main php当它执行时 if 索引 php调用他或同一用户独立执行它 如果是 require once 我应该在主 php 中询
  • 如何在 phpunit 中引用外部数据提供者?

    我正在尝试使用 PHPUnit 中的通用数据提供程序运行一些测试 参见下面的测试 namespace AppBundle Tests Controller use Symfony Bundle FrameworkBundle Test We
  • 使用数据框索引数据的 pandas 数据透视表

    我想从 pandas 数据框创建一个数据透视表 使用 dataframe pivot 并且不仅包括数据帧列 还包括数据帧索引中的数据 找不到任何说明如何执行此操作的文档 有小费吗 Use reset index使索引成为一列 In 45 d
  • 在 Android 中为图像视图添加缩放功能

    我想在我的 Android 应用程序中添加缩放功能 我已经尝试过这段代码 但它只是在图像中间放大和缩小 我想对整个图像应用缩放 在同一张图像中 我可以在图像的某些区域放置一个按钮吗 import android content Contex
  • 根据值类型过滤键子列表的接口键

    Problem 给定一个被视为映射的 TypeScript 接口 将唯一键类型与非唯一值类型相关 可以提取 映射到指定值类型的键类型 作为一个具体的例子 从WindowEventMap in lib dom d ts interface W
  • 如何设置 curve_fit 的初始值以找到最佳优化,而不仅仅是局部优化?

    我正在尝试拟合幂律函数 并找到最佳拟合参数 但是 我发现如果参数的初始猜测不同 最佳拟合 输出就会不同 除非我找到正确的初始猜测 否则我可以获得最佳优化 而不是局部优化 有没有办法找到 合适的初始猜测 下面列出了我的代码 请随意提出任何意见
  • Angular 8:对象不支持属性或方法“包含”

    我正在 Angular8 中构建一个应用程序 我在 angular5 6 7 上工作 对于这些应用程序 我取消了 polyfills ts 中存在的导入的注释 对于 Angular 8 它只有 3 个导入 即 classlist js we
  • 将网站表单转换为 PDF 并通过电子邮件发送给网站管理员

    我正在尝试为我的网站创建一个表单 一旦提交 该表单将转换为 PDF 并通过电子邮件发送给我自己 网站管理员 我知道我无法纯粹使用 JQuery Javascript 来完成该操作 我需要使用 PHP 来发送电子邮件 将 HTML 转换为 P
  • 如何在更改值后在运行时保存 ScriptableObject

    我在使用 ScriptableObjects 作为 Unity 应用程序中的保存对象时遇到问题 当我尝试向其中写入值时 一切都工作得很好 但如果我想关闭应用程序并加载 ScriptableObject 的值 这些值将重置为最后的值 这破坏了
  • AVFoundation 声音可以在 iOS 6 模拟器上工作,但不能在设备上工作?

    帮助 我可以在 iOS 模拟器上播放声音 但不能在我的设备上播放声音 这是我的代码 是的 音频文件位于指定位置 它绝对有效 SystemSoundID hashtag NSString path NSBundle mainBundle pa
  • 使用 Java 读取 .jar 清单文件

    所以我试图通过检查 mainfest 文件中的一些值来查看 jar 是否有效 使用java读取和解析文件的最佳方法是什么 我想到使用这个命令来提取文件 jar xvf anyjar jar META INF MANIFEST MF 但我可以
  • 无法通过 Google Apps 脚本打开 Slack 对话框

    我正在尝试使用 google apps 脚本和 Slack 来自动化我的工作 我希望使用 Slack 对话框输入一些文本 以使用 google apps 脚本修改我的 google 电子表格 但是 使用下面的代码 我无法通过以下方式打开对话
  • 如何在 C++ 中将使用 malloc 创建的数组声明为易失性

    我认为下面会给我 10 个易失性整数 volatile int foo 10 但是 我不认为以下内容会做同样的事情 volatile int foo foo malloc sizeof int 10 如果我对此以及如何使用 malloc 拥
  • 如何在 Spark 中通过组聚合滚动时间窗口

    我有一些数据想要按某个列进行分组 然后根据组中的滚动时间窗口聚合一系列字段 以下是一些示例数据 df spark createDataFrame Row date 2016 01 01 group by group1 get avg 5 g