用 pyspark 之前已知的良好值填充 null

2024-01-07

有办法替换吗nullpyspark 数据框中的值与最后一个有效值?还有额外的timestamp and session列(如果您认为需要它们来进行 Windows 分区和排序)。更具体地说,我想实现以下转换:

+---------+-----------+-----------+      +---------+-----------+-----------+
| session | timestamp |         id|      | session | timestamp |         id|
+---------+-----------+-----------+      +---------+-----------+-----------+
|        1|          1|       null|      |        1|          1|       null|
|        1|          2|        109|      |        1|          2|        109|
|        1|          3|       null|      |        1|          3|        109|
|        1|          4|       null|      |        1|          4|        109|
|        1|          5|        109| =>   |        1|          5|        109|
|        1|          6|       null|      |        1|          6|        109|
|        1|          7|        110|      |        1|          7|        110|
|        1|          8|       null|      |        1|          8|        110|
|        1|          9|       null|      |        1|          9|        110|
|        1|         10|       null|      |        1|         10|        110|
+---------+-----------+-----------+      +---------+-----------+-----------+

这使用last并忽略空值。

让我们重新创建与原始数据类似的内容:

import sys
from pyspark.sql.window import Window
import pyspark.sql.functions as func

d = [{'session': 1, 'ts': 1}, {'session': 1, 'ts': 2, 'id': 109}, {'session': 1, 'ts': 3}, {'session': 1, 'ts': 4, 'id': 110}, {'session': 1, 'ts': 5},  {'session': 1, 'ts': 6}]
df = spark.createDataFrame(d)

df.show()
# +-------+---+----+
# |session| ts|  id|
# +-------+---+----+
# |      1|  1|null|
# |      1|  2| 109|
# |      1|  3|null|
# |      1|  4| 110|
# |      1|  5|null|
# |      1|  6|null|
# +-------+---+----+

现在,让我们使用窗口函数last:

df.withColumn("id", func.last('id', True).over(Window.partitionBy('session').orderBy('ts').rowsBetween(-sys.maxsize, 0))).show()

# +-------+---+----+
# |session| ts|  id|
# +-------+---+----+
# |      1|  1|null|
# |      1|  2| 109|
# |      1|  3| 109|
# |      1|  4| 110|
# |      1|  5| 110|
# |      1|  6| 110|
# +-------+---+----+
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

用 pyspark 之前已知的良好值填充 null 的相关文章

  • 使用 Glue 将数据输入到 AWS Elastic Search

    我正在寻找使用 AWS Glue python 或 pyspark 将数据插入 AWS Elastic Search 的解决方案 我见过用于 Elastic Search 的 Boto3 SDK 但找不到任何将数据插入 Elastic Se
  • 我如何判断我的 Spark 工作是否有进展?

    我有一个正在运行的 Spark 作业YARN它似乎只是挂起并且没有进行任何计算 这是当我这样做时纱线所说的yarn application status
  • 将 PySpark RDD 作为新列添加到 pyspark.sql.dataframe

    我有一个 pyspark sql dataframe 其中每一行都是一篇新闻文章 然后我有一个 RDD 来表示每篇文章中包含的单词 我想将单词的 RDD 作为名为 单词 的列添加到我的新文章数据框中 我试过 df withColumn wo
  • PySpark 应用程序因 java.lang.OutOfMemoryError: Java 堆空间而失败

    我通过 pycharm 和 pyspark shell 分别运行 Spark 我已经堆积了这个错误 java lang OutOfMemoryError Java heap space at org apache spark api pyt
  • 更改spark_temporary目录路径

    是否可以更改 temporarySpark在写入之前保存临时文件的目录 特别是 由于我正在编写表的单个分区 因此我希望临时文件夹位于分区文件夹内 是否可以 由于其实现原因 无法使用默认的 FileOutputCommiter FileOut
  • 为 Spark Thrift 服务器提供仓库目录的路径

    我已经设置了 Spark 集群 并且成功通过 Spark SQL 连接器连接 Tableau 我从 Spark shell 创建了表 并使用 saveAsTable 如何访问从 Tableau 保存的表 启动spark thrift服务器时
  • PySpark 中别名方法的用途是什么?

    在用 Python 学习 Spark 时 我很难理解其目的alias方法及其用途 这文档 http spark apache org docs latest api python pyspark sql html显示它被用来创建现有的副本D
  • 配置 Spark on Yarn 以使用 hadoop 本机库

    Summary 我是 Spark 新手 在使用 Snappy 压缩保存文本文件时遇到了问题 我不断收到下面的错误消息 我遵循了互联网上的许多指示 但没有一个对我有用 最终 我找到了解决方法 但是我希望有人就正确的解决方案提供建议 java
  • AssertionError:断言失败:没有在 Databricks 中进行 DeleteFromTable 的计划

    这个命令运行良好有什么原因吗 sql SELECT FROM Azure Reservations WHERE timestamp gt 2021 04 02 返回 2 行 如下 sql DELETE FROM Azure Reservat
  • 如何使用PySpark结构流+Kafka

    我尝试将 Spark 结构流与 kafka 一起使用 并且在使用 Spark 提交时遇到问题 消费者仍然从生产中接收数据 但 Spark 结构出错 请帮我找到我的代码的问题 这是我在 test py 中的代码 from kafka impo
  • scala/spark 代码不允许在 hive 中添加列

    如果源数据有新列 我尝试在 Hive 表中添加一列 所有新列的检测都运行良好 但是 当我尝试将列添加到目标表时 我收到此错误 for f lt df schema fields if f name chk spark sqlContext
  • 为什么 Databricks Connect Test 无法在 Mac 上运行?

    我已经阅读了配置文档databricks connect但运行时仍然出现以下错误databricks connect test 来自终端的错误 java lang NoSuchMethodError org apache spark int
  • 如何抑制spark输出控制台中的“Stage 2===>”?

    我有数据帧并试图获取不同的计数并且能够成功获取不同的计数 但是每当 scala 程序执行时我都会收到此消息 Stage 2 gt 1 1 2 我如何在控制台中抑制特定的此消息 val countID dataDF select substr
  • Spark SQL如何读取压缩的csv文件?

    我尝试过使用apispark read csv读取带有扩展名的压缩 csv 文件bz or gzip 有效 但在源代码中我没有找到任何可以声明的选项参数codec type 即使在这个link https github com databr
  • 对多列应用窗口函数

    我想执行窗口函数 具体为移动平均值 但针对数据帧的所有列 我可以这样做 from pyspark sql import SparkSession functions as func df df select func avg df col
  • 任务和分区之间有什么关系?

    我能说 么 Spark任务的数量等于Spark分区的数量吗 执行器运行一次 执行器内部的批处理 等于一个任务吗 每个任务只产生一个分区 1 的重复 并行度或可以同时运行的任务数量由以下公式设置 Executor实例的数量 配置 每个执行器的
  • fetchsize和batchsize对Spark的影响

    我想通过以下方式控制 RDB 的读写速度Spark直接 但标题已经透露的相关参数似乎不起作用 我可以得出这样的结论吗fetchsize and batchsize我的测试方法不起作用 或者它们确实会影响阅读和写作方面 因为测量结果基于规模是
  • 如何在Spark结构化流中指定批处理间隔?

    我正在使用 Spark 结构化流并遇到问题 在 StreamingContext DStreams 中 我们可以定义批处理间隔 如下所示 from pyspark streaming import StreamingContext ssc
  • Spark的distinct()函数是否仅对每个分区中的不同元组进行洗牌

    据我了解 distinct 哈希分区 RDD 来识别唯一键 但它是否针对仅移动每个分区的不同元组进行了优化 想象一个具有以下分区的 RDD 1 2 2 1 4 2 2 1 3 3 5 4 5 5 5 在此 RDD 上的不同键上 所有重复键
  • 使用spark phoenix从表中读取rdd分区号为1

    当我运行我的火花代码时 val sqlContext spark sqlContext val noact table primaryDataProcessor getTableData sqlContext zookeeper table

随机推荐

  • 根据商店营业时间有效确定企业是否营业

    给定时间 例如 目前周二下午 4 24 我希望能够从一组企业中选择当前正在营业的所有企业 我有一周中每一天每个企业的营业时间和休息时间 假设一家企业只能在每小时 00 15 30 45 分钟处开门 关门 我假设每周都有相同的时间表 我最感兴
  • 如何在 for 循环中重用 ES6 javascript 中的生成器?

    我正在尝试编写一个可以接受列表或生成器作为输入的函数 例如这个函数 function x l for let i of l console log i for let i of l console log i 如果我像这样运行它 x 1 2
  • jqGrid:如果主键列被编辑,如何更新行ID

    主键值用作从服务器返回的 json 数据中的行 ID 如果主键值被编辑并保存两次 第二次保存会因jqGrid而出错 再次将原始主键值传递给编辑方法 如果在内联编辑中更改主键值 如何将 jqGrid 行 ID 更新为新的主键值 functio
  • 从 Java/C# 到 C++ 的最佳方式是什么? [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi
  • 注入修改后的 Spring 安全表达式语言实现后,“需要调试符号信息 (...)”

    我有一个示例类要测试 PreAuthorize注释 看起来或多或少像这样 class BankService PreAuthorize hasCustomRole ROLE CUSTOM or hasRole ROLE EXAMPLE Do
  • 从 SQL Server 数据库生成 DDL 脚本

    如何使用 SQL 选择 存储过程 等 从 SQL Server 数据库生成所有表的 DDL 带有外键 索引等 脚本 除了数据之外我需要一切 I can t使用 Sql Server Management Studio 因为我想在将在 Lin
  • Python重新加载模块不会立即生效

    请参阅下面的复制代码 跟踪内存泄漏我发现 reload module 不会立即生效 下面的程序应该打印 0 1 2 3 4 但是 当快速执行时 它会打印 0 0 0 3 3 等序列 将 sleep 函数中的时间增加到例如 1 秒似乎可以解决
  • 如何在 Swift 中使用 addAttribute

    我正在尝试添加到 UITextViews 的链接 因此我按照以下代码进行操作这个帖子 http www raywenderlich com 48001 easily overlooked new features ios 7 textVie
  • iphone - 带有返回值的performSelectorOnMainThread

    我有以下方法 NSMutableArray getElements NSString theURL 我想知道是否有一种方法可以使用 PerformSelectorOnMainThread 来调用该方法 以便我可以获得返回值 到目前为止 我已
  • 我可以在 Visual Studio 2012 中编译和调试(运行)单个 C++ 文件吗? (如何避免创建过多的项目)

    我正在从一本书中学习 C 并使用 Visual Studio 2012 为了遵循本书的练习 我需要创建多个 cpp 文件 其中包含 main 函数 有什么方法可以编译 调试我的程序 而无需每次都创建新项目 例如 如果我写一个简单的 Hell
  • R 在使用 case_when 时提供参数(R 向量化)

    这是我之前提出的问题的后续问题 当存在大量类别 类型时 R 使用 case when R 向量化 应用多个函数 https stackoverflow com questions 62377561 r apply multiple func
  • wxpython如何退出Mainloop?

    我有一个 wxpython 应用程序 它的运行方式如下 if name main app wx App False frame MainWindow Application frame Show app MainLoop 应用程序的菜单栏上
  • 如何阻止三星手机上过多的对象创建?

    我在许多 Android 三星手机上遇到动画速度变慢的问题 我已经追踪到问题所在 是垃圾收集过多 但是 我的代码没有直接创建任何对象 这是 DDMS 中分配跟踪器的堆栈跟踪 at com samsung android multiwindo
  • 如何在Python中将整个序列推送到redis [重复]

    这个问题在这里已经有答案了 我可以用Redis rpush key 1 2 3 将三个元素推送到redis 但如果有一个序列 seq 1 2 3 Redis rpush key seq 它会将 seq 元素推送到 redis 但不会推送三个
  • 如何使用 Telegram Bot API 从电话号码获取 user_id

    我目前正在开发一个 Telegram 机器人 一旦你知道电话号码 它就能在 Telegram 中获取用户信息 主要目标是获取用户存储在 Telegram 中的所有用户信息 只需知道他 她的电话号码 因此 我尝试使用 Telegram Bot
  • TransactionScope 的层次结构

    是否可以有一个事务范围的层次结构 如果外部事务范围进行了处置 那么内部事务范围中所做的更改会发生什么 我的特殊问题是我的测试代码运行具有事务范围的代码 当我调用具有事务范围的第二组代码时 我得到 无法访问已处置的对象 事务 难道处置内层事务
  • C++ - 更改 X 窗口中的光标

    我以为这很容易找到 但谷歌搜索却毫无帮助 有没有一个简单的 API 可以改变 X 窗口中的鼠标光标 我知道在 Windows 中你可以只调用 SetCursor include
  • AVPlayer 动态音量控制

    如何动态更改 AVPlayer 的音量 我的意思是 我想在每次按下按钮时将音量静音 给定的代码似乎仅在编译时更改它 运行时怎么办 AVURLAsset asset AVURLAsset URLAssetWithURL self myAsse
  • 类路径中的 ICEfaces 库可防止文件下载时弹出“另存为”对话框

    一旦我将库icefaces jaricepush jaricefaces ace jar添加到我的类路径中以便使用ACE组件 我的另存为对话框就不会弹出吗 我不确定这是否是一个错误 但如果类路径中没有库 它就可以工作 这是我的另存为方法 p
  • 用 pyspark 之前已知的良好值填充 null

    有办法替换吗nullpyspark 数据框中的值与最后一个有效值 还有额外的timestamp and session列 如果您认为需要它们来进行 Windows 分区和排序 更具体地说 我想实现以下转换 session timestamp