这个用例可以通过 Spark 的滞后/任何其他功能来完成吗?

2024-03-08

我使用的是spark-2.4.1v。我的项目中有一个用例,对于每个日期(process_date),我需要考虑当天的记录和前一天的记录,并对该数据集执行某些其他操作。 那么如何为此准备数据集呢?我尝试使用滞后函数但没有取得太大成功。

对于上述用例,给出的数据如下:

+----------+----------+----+-------+------------+-----------+
|company_id|  gen_date|year|quarter|total_assets|create_date|
+----------+----------+----+-------+------------+-----------+
| 989856662|2019-01-02|2019|      1| 3900.435058| 2019-09-11|
| 989856665|2019-01-02|2019|      1| 4836.435058| 2019-09-11|
| 989856667|2019-01-02|2019|      1| 5836.435058| 2019-09-11|
| 989856662|2019-01-01|2019|      1| 3800.435058| 2019-09-11|
| 989856665|2019-01-01|2019|      1| 3834.435058| 2019-09-11|
| 989856667|2019-01-01|2019|      1| 5834.435058| 2019-09-11|
| 989856662|2018-12-31|2018|      4| 3700.435058| 2019-09-11|
| 989856665|2018-12-31|2018|      4| 3900.435058| 2019-09-11|
| 989856667|2018-12-31|2018|      4| 5833.435058| 2019-09-11|
| 989856662|2018-12-30|2018|      4| 3832.435058| 2019-09-11|
| 989856665|2018-12-30|2018|      4| 3700.435058| 2019-09-11|
| 989856667|2018-12-30|2018|      4| 5832.435058| 2019-09-11|
+----------+----------+----+-------+------------+-----------+

这里 gen_date 是关键列。对于每个 gen_date,我需要获取其先前可用的 gen_date 记录。这些将按设置一起处理,即对于 process_date 2019-01-02,它应该具有 2019-01-02 和 2019-01-01 的记录,就像 gen_date 2018-12-30 及其之前的 gen_date 的 process_date 记录一样,即2018-12-29,但这里的 2018-12-29 gen_date 记录不可用,因此应被视为 gen_date 2018-12-30 记录。

在给定的集合中:

对于 process_date 2019-01-02 => ( gen_date 2019-01-02 ) 的记录 + ( gen_date 2019-01-01 ) 的记录
对于 process_date 2019-01-01 => ( gen_date 2019-01-01 ) 的记录 + ( gen_date 2018-12-31 ) 的记录
对于 process_date 2018-12-31 => ( gen_date 2018-12-31 ) 的记录 + ( gen_date 2018-12-30 ) 的记录
对于 process_date 2018-12-30 => ( gen_date 2018-12-30 ) 的记录 + 没有以前的 gen_date 记录。

输出应如下所示:

+----------+------------+----------+----+-------+------------+-----------+
|company_id|process_date|  gen_date|year|quarter|total_assets|create_date|
+----------+------------+----------+----+-------+------------+-----------+
| 989856662|  2019-01-02|2019-01-02|2019|      1| 3900.435058| 2019-09-11|
| 989856662|  2019-01-02|2019-01-01|2019|      1| 3800.435058| 2019-09-11|
| 989856665|  2019-01-02|2019-01-02|2019|      1| 4836.435058| 2019-09-11|
| 989856665|  2019-01-02|2019-01-01|2019|      1| 3834.435058| 2019-09-11|
| 989856667|  2019-01-02|2019-01-02|2019|      1| 5836.435058| 2019-09-11|
| 989856667|  2019-01-02|2019-01-01|2019|      1| 5834.435058| 2019-09-11|
| 989856662|  2019-01-01|2019-01-01|2019|      1| 3800.435058| 2019-09-11|
| 989856662|  2019-01-01|2018-12-31|2018|      4| 3700.435058| 2019-09-11|
| 989856665|  2019-01-01|2019-01-01|2019|      1| 3834.435058| 2019-09-11|
| 989856665|  2019-01-01|2018-12-31|2018|      4| 3900.435058| 2019-09-11|
| 989856667|  2019-01-01|2019-01-01|2019|      1| 5834.435058| 2019-09-11|
| 989856667|  2019-01-01|2018-12-31|2018|      4| 5833.435058| 2019-09-11|
| 989856662|  2018-12-31|2018-12-31|2018|      4| 3700.435058| 2019-09-11|
| 989856662|  2018-12-31|2018-12-30|2018|      4| 3832.435058| 2019-09-11|
| 989856665|  2018-12-31|2018-12-31|2018|      4| 3900.435058| 2019-09-11|
| 989856665|  2018-12-31|2018-12-30|2018|      4| 3700.435058| 2019-09-11|
| 989856667|  2018-12-31|2018-12-31|2018|      4| 5833.435058| 2019-09-11|
| 989856667|  2018-12-31|2018-12-30|2018|      4| 5832.435058| 2019-09-11|
| 989856662|  2018-12-30|2018-12-30|2018|      4| 3832.435058| 2019-09-11|
| 989856665|  2018-12-30|2018-12-30|2018|      4| 3700.435058| 2019-09-11|
| 989856667|  2018-12-30|2018-12-30|2018|      4| 5832.435058| 2019-09-11|
+----------+------------+----------+----+-------+------------+-----------+

如何实现上述输出?

下面是所附的笔记本网址。

https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1165111237342523/988191344931748/7035720262824085/latest.html https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1165111237342523/988191344931748/7035720262824085/latest.html


为了获取给定的前一天的详细信息gen_datecompany_id,您可以使用滞后函数,如下所示spec,

val windowSpec  = Window.partitionBy("company_id").orderBy("gen_date") 

val intermediateDF = finDF
  .withColumn("previous_gen_date", lag("gen_date",1).over(windowSpec))

上述步骤将根据company_id和gen_date为您获取上一代日期,您可以将此数据与您的原始数据连接起来以获得相关的前一天数据。

val finalDF = intermediateDF.alias("a")
  .join(finDF.alias("b"), col("a.company_id") === col("b.company_id") &&
    col("a.previous_gen_date") === col("b.gen_date"), "left_outer")
    .select(col("a.*"),
      col("b.year").as("previous_gen_date_year"),
      col("b.quarter").as("previous_gen_date_quarter"),
      col("b.total_assets").as("previous_gen_date_total_assets"),
      col("b.create_date").as("previous_gen_date_create_date")
    )

上述连接将产生前一天的完整数据以及生成日期。

+----------+----------+----+-------+------------+-----------+-----------------+----------------------+-------------------------+------------------------------+-----------------------------+
|company_id|gen_date  |year|quarter|total_assets|create_date|previous_gen_date|previous_gen_date_year|previous_gen_date_quarter|previous_gen_date_total_assets|previous_gen_date_create_date|
+----------+----------+----+-------+------------+-----------+-----------------+----------------------+-------------------------+------------------------------+-----------------------------+
|989856662 |2018-12-30|2018|4      |3832.435058 |2019-09-11 |null             |null                  |null                     |null                          |null                         |
|989856662 |2018-12-31|2018|4      |3700.435058 |2019-09-11 |2018-12-30       |2018                  |4                        |3832.435058                   |2019-09-11                   |
|989856662 |2019-01-01|2019|1      |3800.435058 |2019-09-11 |2018-12-31       |2018                  |4                        |3700.435058                   |2019-09-11                   |
|989856662 |2019-01-02|2019|1      |3900.435058 |2019-09-11 |2019-01-01       |2019                  |1                        |3800.435058                   |2019-09-11                   |
+----------+----------+----+-------+------------+-----------+-----------------+----------------------+-------------------------+------------------------------+-----------------------------+

在这里你的gen_date也可以充当process_date列,您可以使用此比较任何操作的两天数据。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

这个用例可以通过 Spark 的滞后/任何其他功能来完成吗? 的相关文章

  • 从数据框中的列中删除小数

    我有一个数据框 列中有数字 这些数字是小数 我想删除列中的小数和整数 我的数据框expsrs看起来像这样 ENSG00000226823 1 15 14 4947 22 5606 13 5819 5 09327 16 8503 ENSG00
  • 从第二个 DF 中查找一个 DF 中属于同等大小的矩形(由两个点给出)的点的快速(矢量化)方法

    我的数据框 A 如下所示 type latw lngs late lngn 0 1000 45 457966 9 174864 45 458030 9 174907 1 1000 45 457966 9 174864 45 458030 9
  • Sparklyr - 在 Apache Spark Join 中包含空值

    问题在 Apache Spark Join 中包含空值 https stackoverflow com questions 41728762 including null values in an apache spark join有 Sc
  • 如何使用动态名称计算 R 数据框中的多个新列

    我正在尝试在 R 数据框中生成多个新列 变量 并使用从向量中获取的动态新名称 新变量是根据单列的组 级别计算的 数据框包含测量值 counts 不同的化学元素 element 沿深度 z 新变量的计算方法是将特定深度的每个元素的计数除以代理
  • 数据帧上的多个条件

    我正在尝试编写一个新列 is good 如果 value 列中的数据集介于范围 1 到 6当 value2 列位于范围 5 到 10如果不满足这两个条件 则标记为 0 我知道如果你这样做 df is good 1 if x gt 1 and
  • python,在数据框中存储字典

    我构建了一个 pandas 数据框 它在每个单元格中存储一个简单的字典 例如 Sales 0 Revenue 0 我可以通过以下方式从数据帧中检索特定值 df columnA index100 Revenue 但现在我想绘制一个图表 其中包
  • Spark/Yarn:HDFS 上不存在文件

    我在 AWS 上设置了 Hadoop Yarn 集群 有 1 个主服务器和 3 个从服务器 我已经验证我有 3 个活动节点在端口 50070 和 8088 上运行 我在客户端部署模式下测试了 Spark 作业 一切正常 当我尝试使用 Spa
  • df.drop(如果存在)

    下面是一个函数 它接受一个文件并删除列名row num start date end date 问题是并非每个文件都有这些列名 因此该函数返回错误 我的目标是更改代码 以便删除这些列 如果存在 但如果某个列不存在则不会返回错误 def re
  • 使用 pandas 删除停用词

    我想从数据框的列中删除停用词 列内有需要拆分的文本 例如我的数据框如下所示 ID Text 1 eat launch with me 2 go outside have fun 我想应用停用词text column所以应该分开 我试过这个
  • 如何在 Julia 中将列数据类型从浮点更改为字符串?

    我正在尝试将数据框中的一列从浮点数转换为字符串 我努力了 df readtable data csv coltypes String String String String String Float64 Float64 String 但我
  • 如何使用groupby将多个函数应用于Pandas中的多个列?

    我有一个正常的df A pd DataFrame 1 5 2 2 4 4 3 3 1 4 2 2 5 1 4 columns A B C index 1 2 3 4 5 下列的这个食谱 https stackoverflow com que
  • 根据缺少标题的列将文件选项卡到 pandas 数据框中

    如何将带有空列标题的选项卡文件转换为数据框 更具体地说 如何仅使用与相邻未标记列中的字母 在本例中为 P 相对应的值来填充此数据框 这是我正在使用的选项卡文件的表示 请注意 A 或 P 列上缺少标题 gene cell 1 cell 2 M
  • 闪亮错误:参数暗示行数不同

    我正在尝试开发一个简单的应用程序 从 Kijiji 网站获取本地分类广告 我用几乎相同的脚本制作了一个类似的应用程序 但我没有收到下面描述的错误 所以我不知道这个脚本出了什么问题 我尝试了我能想到的一切 但无法让它发挥作用 的结构df数据框
  • 如何从 SparkSQL DataFrame 中的 MapType 列获取键和值

    我的镶木地板文件中有数据 该文件有 2 个字段 object id String and alpha Map lt gt 它被读入 SparkSQL 中的数据帧 其架构如下所示 scala gt alphaDF printSchema ro
  • Python Pandas——用前一列的值向前填充整行

    pandas 开发新手 如何使用先前看到的列中包含的值向前填充 DataFrame 独立的示例 import pandas as pd import numpy as np O 1 np nan 5 np nan H 5 np nan 5
  • 计算例如具有多列 data.frames 的列表中的平均值

    我有几个 data frames 的列表 每个 data frame 有几列 通过使用mean mylist first dataframe a我可以得到这个 data frame 中 a 的平均值 但是我不知道如何计算列表中存储的所有 d
  • Spark 结构化流中具有不同计数的聚合抛出错误

    我正在尝试在 Spark 结构化流中获取 Parentgroup childgroup 和 MountingType 组的唯一 id 代码 下面的代码抛出错误 withWatermark timestamp 1 minutes val ag
  • fetchsize和batchsize对Spark的影响

    我想通过以下方式控制 RDB 的读写速度Spark直接 但标题已经透露的相关参数似乎不起作用 我可以得出这样的结论吗fetchsize and batchsize我的测试方法不起作用 或者它们确实会影响阅读和写作方面 因为测量结果基于规模是
  • R 中的列乘以子字符串

    假设我有一个数据框 其中包含多个组件及其在多个列中列出的属性 并且我想对这些列运行多个函数 我的方法是尝试将其基于每个列标题中的子字符串 但我无法弄清楚如何做到这一点 下面是数据框的示例 Basket F Type 1 F Qty 1 F
  • 使用spark phoenix从表中读取rdd分区号为1

    当我运行我的火花代码时 val sqlContext spark sqlContext val noact table primaryDataProcessor getTableData sqlContext zookeeper table

随机推荐