使用spark窗口函数获取最后一个值

2023-12-29

假设我有一个像这样的数据框。

val df = sc.parallelize(Seq(
            (1.0, 1,"Matt"), 
            (1.0, 2,"John"),
            (1.0, 3,null.asInstanceOf[String]),
            (-1.0, 2,"Adam"), 
            (-1.0, 4,"Steve"))
          ).toDF("id", "timestamp","name")

我想获取按时间戳排序的每个 id 的最后一个非空值。这是我的窗户

val partitionWindow = Window.partitionBy($"id").orderBy($"timestamp".desc)

我正在创建一个独特的窗口数据

val filteredDF = df.filter($"name".isNotNull).withColumn("firstName", first("name") over (partitionWindow)).drop("timestamp","name").distinct

并将其连接回实际数据

val joinedDF = df.join(filteredDF, windowDF.col("id") === filteredDF.col("id")).drop(filteredDF.col("id"))

joinedDF.show()

它工作正常,但我不喜欢这个解决方案,有人能给我建议更好的吗?

另外,谁能告诉我为什么最后一个功能不起作用?我尝试了这个,但结果不正确

 val partitionWindow = Window.partitionBy($"id").orderBy($"timestamp")

val windowDF = df.withColumn("lastName", last("name") over (partitionWindow))

如果您想传播最后一个已知值(它与使用的逻辑不同)join) 你应该:

  • ORDER BY timestamp.
  • Take last忽略nulls:
val partitionWindow = Window.partitionBy($"id").orderBy($"timestamp")

df.withColumn("lastName", last("name", true) over (partitionWindow)).show
// +----+---------+-----+--------+
// |  id|timestamp| name|lastName|
// +----+---------+-----+--------+
// |-1.0|        2| Adam|    Adam|
// |-1.0|        4|Steve|   Steve|
// | 1.0|        1| Matt|    Matt|
// | 1.0|        2| John|    John|
// | 1.0|        3| null|    John|
// +----+---------+-----+--------+

如果你想全局获取最后一个值:

  • ORDER BY timestamp.
  • 设置无界框架。
  • Take last忽略nulls:
val partitionWindow = Window.partitionBy($"id").orderBy($"timestamp")
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df.withColumn("lastName", last("name", true) over (partitionWindow)).show
// +----+---------+-----+--------+
// |  id|timestamp| name|lastName|
// +----+---------+-----+--------+
// |-1.0|        2| Adam|   Steve|
// |-1.0|        4|Steve|   Steve|
// | 1.0|        1| Matt|    John|
// | 1.0|        2| John|    John|
// | 1.0|        3| null|    John|
// +----+---------+-----+--------+
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用spark窗口函数获取最后一个值 的相关文章

  • 我如何判断我的 Spark 工作是否有进展?

    我有一个正在运行的 Spark 作业YARN它似乎只是挂起并且没有进行任何计算 这是当我这样做时纱线所说的yarn application status
  • 将 PySpark RDD 作为新列添加到 pyspark.sql.dataframe

    我有一个 pyspark sql dataframe 其中每一行都是一篇新闻文章 然后我有一个 RDD 来表示每篇文章中包含的单词 我想将单词的 RDD 作为名为 单词 的列添加到我的新文章数据框中 我试过 df withColumn wo
  • Spark 物理计划和逻辑计划

    我有两个问题 在不添加任何额外代码来打印提交的 Spark 作业的逻辑和物理计划的情况下 有没有办法查看集群上运行的 Spark 作业的物理和逻辑计划 有没有办法动态修改集群上正在运行的 Spark 作业的执行计划以获得更好的性能 请分享您
  • 使用 Spark pandas_udf 创建列,具有动态数量的输入列

    我有这个 df df spark createDataFrame row a 5 0 0 0 11 0 row b 3394 0 0 0 4543 0 row c 136111 0 0 0 219255 0 row d 0 0 0 0 0
  • 使用 Spark 版本 2.2 的 row_number() 函数创建 PySpark DataFrame 中每行的行号

    我有一个 PySpark DataFrame valuesCol Sweden 31 Norway 62 Iceland 13 Finland 24 Denmark 52 df sqlContext createDataFrame valu
  • Spark RDD默认分区数

    版本 Spark 1 6 2 Scala 2 10 我正在执行以下命令spark shell 我试图查看 Spark 默认创建的分区数量 val rdd1 sc parallelize 1 to 10 println rdd1 getNum
  • Sparklyr - 在 Apache Spark Join 中包含空值

    问题在 Apache Spark Join 中包含空值 https stackoverflow com questions 41728762 including null values in an apache spark join有 Sc
  • Spark/Yarn:HDFS 上不存在文件

    我在 AWS 上设置了 Hadoop Yarn 集群 有 1 个主服务器和 3 个从服务器 我已经验证我有 3 个活动节点在端口 50070 和 8088 上运行 我在客户端部署模式下测试了 Spark 作业 一切正常 当我尝试使用 Spa
  • 为什么 Databricks Connect Test 无法在 Mac 上运行?

    我已经阅读了配置文档databricks connect但运行时仍然出现以下错误databricks connect test 来自终端的错误 java lang NoSuchMethodError org apache spark int
  • 如何抑制spark输出控制台中的“Stage 2===>”?

    我有数据帧并试图获取不同的计数并且能够成功获取不同的计数 但是每当 scala 程序执行时我都会收到此消息 Stage 2 gt 1 1 2 我如何在控制台中抑制特定的此消息 val countID dataDF select substr
  • 对多列应用窗口函数

    我想执行窗口函数 具体为移动平均值 但针对数据帧的所有列 我可以这样做 from pyspark sql import SparkSession functions as func df df select func avg df col
  • Spark - 如何在本地运行独立集群

    是否有可能运行Spark独立集群仅在一台机器上进行本地操作 这与仅在本地开发作业基本上不同 即local 到目前为止 我正在运行 2 个不同的虚拟机来构建集群 如果我可以在同一台机器上运行一个独立的集群 该怎么办 例如三个不同的 JVM 正
  • 任务和分区之间有什么关系?

    我能说 么 Spark任务的数量等于Spark分区的数量吗 执行器运行一次 执行器内部的批处理 等于一个任务吗 每个任务只产生一个分区 1 的重复 并行度或可以同时运行的任务数量由以下公式设置 Executor实例的数量 配置 每个执行器的
  • fetchsize和batchsize对Spark的影响

    我想通过以下方式控制 RDB 的读写速度Spark直接 但标题已经透露的相关参数似乎不起作用 我可以得出这样的结论吗fetchsize and batchsize我的测试方法不起作用 或者它们确实会影响阅读和写作方面 因为测量结果基于规模是
  • 懒惰背景下的变革与行动

    正如 Learning Spark 闪电般快速的大数据分析 一书中提到的 由于 Spark 计算 RDD 的方式不同 转换和操作也有所不同 在对惰性进行一些解释之后 我发现转换和操作都是惰性地进行的 那么问题来了 这句话的意思是什么 对比
  • 如何使用 SparkR 1.6.0 写入 JDBC 源?

    使用 SparkR 1 6 0 我可以使用以下代码从 JDBC 源读取数据 jdbc url lt jdbc mysql localhost 3306 dashboard user
  • Checkpoint RDD ReliableCheckpointRDD 与原始 RDD 的分区数量不同

    我有一个由两台机器组成的 Spark 集群 当我运行 Spark 流应用程序时 出现以下错误 Exception in thread main org apache spark SparkException Checkpoint RDD R
  • Spark EC2 SSH连接错误SSH返回代码255

    每次我尝试通过 Spark ec2 spark ec2 py 文件在 AWS 上启动 Spark 集群时 都会收到 SSH 连接错误 最终解决了 但是浪费了很多时间 在您将其标记为重复之前 我知道有很多类似的问题被问到 但有两个关键区别 a
  • 如何从spark中的hbase表中获取所有数据

    我在 hbase 中有一个大表 名称为 UserAction 它具有三个列族 歌曲 专辑 歌手 我需要从 歌曲 列族中获取所有数据作为 JavaRDD 对象 我尝试了这段代码 但效率不高 有更好的解决方案来做到这一点吗 static Spa
  • 必须包含 log4J,但它会导致 Apache Spark shell 中出现错误。如何避免错误?

    由于我必须将 jar 包含到 Spark 代码中 因此我想请求帮助找出解决此问题而不删除 log4j 导入的方法 简单代码如下 cp symjar log4j 1 2 17 jar import org apache spark rdd v

随机推荐

  • 使用反射获取属性的字符串名称

    有大量的反射示例可以让您获得 一个类中的所有属性 单个属性 前提是您知道字符串名称 有没有一种方法 使用反射 TypeDescriptor 或其他方式 在运行时获取类中属性的字符串名称 前提是我拥有 的只是类和属性的实例 我有一个类的实例
  • 如何将数据推送到 iPhone 应用程序?

    我是 iPhone 应用程序开发新手 我无法弄清楚如何将数据推送到应用程序 具体来说 我试图找到一种方法将新数据 用户帖子 从服务器推送到应用程序 而无需用户刷新 下拉刷新 有可能吗 有一个接近的解决方案 使用Apple推送通知服务 它允许
  • 连接字符串无法按预期工作[关闭]

    很难说出这里问的是什么 这个问题是含糊的 模糊的 不完整的 过于宽泛的或修辞性的 无法以目前的形式得到合理的回答 如需帮助澄清此问题以便重新打开 访问帮助中心 help reopen questions 我知道这是一个常见问题 但在寻找参考
  • aws_iam_policy 和 aws_iam_role_policy 之间的区别

    我有一个aws iam role我想添加一个策略 通常 我会创建一个策略aws iam role并将其附加到角色上aws iam role policy attachment 但是 我看过一些使用的文档aws iam role policy
  • 如何从另一个分支获取更改

    我目前正在研究featurex分支 我们的主分支被命名为our team 自从我开始工作以来featurex 对分支进行了更多更改our team 我在本地完成此操作是为了获取所有最新更改our team git checkout our
  • 将 PEM 证书解析为 JSON

    我有 PEM 证书并且正在使用openssl查看其内容 是否可以将输出解析为 JSON 格式 也许有一个 Java 库或 Bash 脚本可以做到这一点 命令 openssl x509 in sample cer noout text out
  • 无法将函数并行映射到 tarfile 成员

    我有一个包含 bz2 压缩文件的 tar 文件 我想应用该功能clean file到每个 bz2 文件 并整理结果 在系列中 使用循环很容易 import pandas as pd import json import os import
  • Python:将标志传递给函数

    很长一段时间以来 我一直试图找出将标志传递给 python 函数的最佳方法 最直接的方法是这样的 def func data flag1 flag2 flag3 func my data True False True 这确实很好而且简洁
  • Ajax div 刷新后 Jquery Masonry 相互加载

    我正在使用 ajax 刷新包含图像的 div 我最初使用砌体来添加布局 然后ajax调用返回一个js 使用html 方法刷新div 现在完成后我打电话masonry reloadItems 但砌体将所有图像加载到另一图像上 调整页面大小后
  • 如何在android中解析这个JSON数组

    我想要每个标签的名称 电子邮件和图像 我必须在列表元素中显示 response name Brajendra Mishra email email protected cdn cgi l email protection address I
  • 在 Python 中使用互斥锁和并发 future

    我有一些代码使用并发 future 连接到许多远程主机来运行一些命令 例如 def set host to host value connection connect to host info do something with conne
  • 从底部开始将元素附加到 div?

    我有以下代码 button click function parent append div element div parent height 200px width 100px border 1px solid ccc div div
  • MVVM 多视图

    我正在尝试学习 MVVM 到目前为止进展顺利 我偶然发现了一种我不知道如何实现的情况 我想要的 具有左侧导航和右侧详细信息窗格的视图 右侧详细信息将有一个内容容器 其中包含我的用户控件 以便通过左侧窗格选择视图 我拥有的 主视图模型 我数据
  • 是否需要注册兴趣才能写入 NIO 套接字来发送数据?

    是否需要注册兴趣才能写入 NIO 客户端套接字通道来发送数据 我必须总是打电话吗socketChannel register selector SelectionKey OP WRITE 或类似的东西 在写信给客户之前SocketChann
  • 在 Excel 中根据条件查找最大值或最小值

    在我的电子表格中 我有一列包含负值和正值 我需要获得所有正值中的最小值和所有负值中的最大值 我怎样才能这样做呢 使用数组公式 在以下示例中 您要检查的值位于A2 A10 最大负值 MAX IF A2 A10 lt 0 A2 A10 Pres
  • 使用Python将多个制表符分隔的文本文件插入MySQL?

    我正在尝试创建一个程序 它采用多个制表符分层文本文件 并一次处理一个文件 将它们保存的数据输入 MySQL 数据库 有几个文本文件 例如 movie txt 如下所示 1 Avatar 3 Iron Man 3 Star Trek 每个文本
  • Android - 通过网址启动谷歌地图

    在 iPhone 上 maps google com URL 被本机谷歌地图应用程序拦截并加载 我想在 Android 上执行相同的操作 但 Google 地图正在浏览器中加载 那么 在网页中 是否可以有这个url在 Android 中打开
  • 在 Objective C 中谁调用了 dealloc 方法以及何时调用?

    当在 Objective C 中创建自定义类时 何时以及如何创建dealloc方法调用 这是我必须在课堂上实施的事情吗 您永远不会直接发送 dealloc 消息 相反 对象的 dealloc 方法是通过 NSObject 协议的releas
  • WebStorm 终端颜色

    我的创意终端中出现了令人难以忍受的白色背景 网络风暴 有谁知道我该如何改变这个 打开 文件 gt 设置 然后转到 编辑器 部分 gt 颜色和字体 部分 在那里 您将看到 控制台颜色 选项 在这里 您将看到一个交互式编辑屏幕 以确定您希望这些
  • 使用spark窗口函数获取最后一个值

    假设我有一个像这样的数据框 val df sc parallelize Seq 1 0 1 Matt 1 0 2 John 1 0 3 null asInstanceOf String 1 0 2 Adam 1 0 4 Steve toDF