Spark Scala:使用分析函数获取累积和(运行总计)

2023-12-31

我正在使用窗口函数在 Spark 中实现累积和。 但应用窗口分区功能时,记录输入的顺序不会保持

输入数据:

val base = List(List("10", "MILLER", "1300", "2017-11-03"), List("10", "Clark", "2450", "2017-12-9"), List("10", "King", "5000", "2018-01-28"),
  List("30", "James", "950", "2017-10-18"), List("30", "Martin", "1250", "2017-11-21"), List("30", "Ward", "1250", "2018-02-05"))
  .map(row => (row(0), row(1), row(2), row(3)))

val DS1 = base.toDF("dept_no", "emp_name", "sal", "date")
DS1.show()
+-------+--------+----+----------+
|dept_no|emp_name| sal|      date|
+-------+--------+----+----------+
|     10|  MILLER|1300|2017-11-03|
|     10|   Clark|2450| 2017-12-9|
|     10|    King|5000|2018-01-28|
|     30|   James| 950|2017-10-18|
|     30|  Martin|1250|2017-11-21|
|     30|    Ward|1250|2018-02-05|
+-------+--------+----+----------+

预期输出:

+-------+--------+----+----------+-----------+
|dept_no|emp_name| sal|      date|Dept_CumSal|
+-------+--------+----+----------+-----------+
|     10|  MILLER|1300|2017-11-03|     1300.0|
|     10|   Clark|2450| 2017-12-9|     3750.0|
|     10|    King|5000|2018-01-28|     8750.0|
|     30|   James| 950|2017-10-18|      950.0|
|     30|  Martin|1250|2017-11-21|     2200.0|
|     30|    Ward|1250|2018-02-05|     3450.0|
+-------+--------+----+----------+-----------+

我尝试过以下逻辑

val baseDepCumSal = DS1.withColumn("Dept_CumSal", sum("sal").over(Window.partitionBy("dept_no").
  orderBy(col("sal"), col("emp_name"), col("date").asc).
  rowsBetween(Long.MinValue, 0)
))

baseDepCumSal.orderBy("dept_no", "date").show
+-------+--------+----+----------+-----------+
|dept_no|emp_name| sal|      date|Dept_CumSal|
+-------+--------+----+----------+-----------+
|     10|  MILLER|1300|2017-11-03|     1300.0|
|     10|   Clark|2450| 2017-12-9|     3750.0|
|     10|    King|5000|2018-01-28|     8750.0|
|     30|   James| 950|2017-10-18|     3450.0|
|     30|  Martin|1250|2017-11-21|     1250.0|
|     30|    Ward|1250|2018-02-05|     2500.0|
+-------+--------+----+----------+-----------+

对于 dept_no=10,记录按预期顺序计算,而对于 dept_no=30,记录不按输入顺序计算。


发生这种情况是因为类型不正确。因为工资是一个string

DS1.printSchema
root
 |-- dept_no: string (nullable = true)
 |-- emp_name: string (nullable = true)
 |-- sal: string (nullable = true)
 |-- date: string (nullable = true)

它按字典顺序排序:

DS1.orderBy("sal").show
+-------+--------+----+----------+
|dept_no|emp_name| sal|      date|
+-------+--------+----+----------+
|     30|  Martin|1250|2017-11-21|
|     30|    Ward|1250|2018-02-05|
|     10|  MILLER|1300|2017-11-03|
|     10|   Clark|2450| 2017-12-9|
|     10|    King|5000|2018-01-28|
|     30|   James| 950|2017-10-18|
+-------+--------+----+----------+ 

要获得所需的结果,您必须进行强制转换(并且不需要框架定义):

DS1.withColumn("Dept_CumSal", sum("sal").over(
  Window
     .partitionBy("dept_no")
    .orderBy(col("sal").cast("integer"), col("emp_name"), col("date").asc))).show
+-------+--------+----+----------+-----------+                                  
|dept_no|emp_name| sal|      date|Dept_CumSal|
+-------+--------+----+----------+-----------+
|     30|   James| 950|2017-10-18|      950.0|
|     30|  Martin|1250|2017-11-21|     2200.0|
|     30|    Ward|1250|2018-02-05|     3450.0|
|     10|  MILLER|1300|2017-11-03|     1300.0|
|     10|   Clark|2450| 2017-12-9|     3750.0|
|     10|    King|5000|2018-01-28|     8750.0|
+-------+--------+----+----------+-----------+
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark Scala:使用分析函数获取累积和(运行总计) 的相关文章

  • 在 SELECT IN 中使用 Oracle 参数时出现问题

    我在将一串数字插入sql查询时遇到问题 SELECT FROM tablename a WHERE a flokkurid IN 3857 3858 3863 3285 ORDER BY sjodategund rodun or SELEC
  • 如何找到多个列中的最小值

    我在我的 DB 3 col 中有一个值 我想在所有这些值中找到一个值 如下所述 表名 MyTable id col1 col2 col3 1 200 300 400 2 100 150 300 3 800 102 20 4 80 80 0
  • 如何在 apache Spark 作业中执行阻塞 IO?

    如果当我遍历 RDD 时 我需要通过调用外部 阻塞 服务来计算数据集中的值怎么办 您认为如何才能实现这一目标 值 Future RDD Double Future sequence tasks 我尝试创建一个 Futures 列表 但由于
  • 如何在mysql中选择具有相同值集的列?

    我的桌子是 patients pid name city disease did dname has disease did pid 我想列出具有相同疾病组的患者 pid 和 did 分别是患者和疾病表中的主键 并且是 has diseas
  • 删除连接到另一表 SQL 的一个表中的记录

    我有两个表 一个包含 212 000 条记录 已弃用的记录 另一个包含 10 500 000 条记录 我想在 id 和 version number 字段上连接两个表 因为两个表都有这些字段 我希望从连接表中删除匹配的记录 来自连接表 即从
  • SQL 错误:“没有这样的表”

    我试图解决为什么我的代码为所有查询返回 null 的原因 最后发现 sql 查询什么也没有返回 我使用简约代码创建了一个新的 AIR 文档 s WindowedApplication
  • Scalatest PlusPlay Selenium 无法调整窗口大小

    对此已经研究了一段时间 我似乎找不到使用 scalatest plus 调整窗口大小的方法 我发现在线搜索或文档的唯一方法http doc scalatest org 2 1 5 index html org scalatest selen
  • 如何在 DB2 AS/400 中将小数字段转换为日期字段?

    我有一个 DECIMAL 字段 其中包含 AS400 格式的日期 1100614 我努力了 cast MYDATE as DATE 但我无法将 DECIMAL 转换为 DATE 而 DATE MYDATE 返回空值 如何将此字段转换为日期字
  • Linq To SQL - 拥有和分组依据

    我下面这个查询工作正常 不过我想使用 Linq 来实现它 select u ID u NAME from Task t join BuildingUser bu ON bu ID BUILDING t ID BUILDING join Us
  • SQL:如何在按部分分组的查询中使用子查询?

    如何在按部分分组的查询中使用子查询 我使用 SQL Server 2008 R2 和 Delphi 2010 我收到此错误 Cannot perform an aggregate function on an expression cont
  • Spark EC2 SSH连接错误SSH返回代码255

    每次我尝试通过 Spark ec2 spark ec2 py 文件在 AWS 上启动 Spark 集群时 都会收到 SSH 连接错误 最终解决了 但是浪费了很多时间 在您将其标记为重复之前 我知道有很多类似的问题被问到 但有两个关键区别 a
  • 不带 GROUP BY 的聚合查询

    这个查询似乎在我的旧机器上完美运行 但是 在我的 MySQL 5 7 14 和 PHP 5 6 25 的新机器上 它会抛出错误 致命错误 未捕获异常 PDOException 并带有消息 SQLSTATE 42000 语法错误或访问冲突 1
  • SQL Server 2008R2 和创建 XML 文档

    论坛上的第一篇文章 因为我真的被这个问题困住了 以下查询正确地将有效的 XML 文档分配给 xTempXML 变量 类型为 xml 注 文档的长度 转换为varchar max 711 select xTempXML select Pres
  • Reporting Services 在哪里存储其日志文件

    最相关的谷歌结果似乎表明 为了访问日志 我们必须将您自己的日志表部署到数据库并制作报告服务写入它 http technet microsoft com en us library ms157403 aspx 简而言之 Reporting S
  • 如何搜索例程的内容/(SP-触发函数)

    我需要在数据库内所有例程的例程主体 存储过程 函数 触发器 中搜索文本 我该怎么做 Thanks SELECT OBJECT NAME object id FROM sys sql modules WHERE definition LIKE
  • Spark 数据帧:根据另一列的值提取一列

    我有一个包含带有连接价目表的交易的数据框 paid currency EUR USD GBP 49 5 EUR 99 79 69 客户已支付 49 5 欧元 如 货币 列中所示 我现在想将支付的价格与价目表中的价格进行比较 因此 我需要根据
  • scala中的协变类型参数需要在java接口中保持不变

    我有一个看起来像这样的特征 一些进一步的信息可以在我自己提出了这个相关问题 https stackoverflow com questions 3695990 inheritance and automatic type conversio
  • st_intersects 与 st_overlaps

    这两个查询有什么区别 select a gid sum length b the geom from polygons as a roads as b where st intersects a the geom b the geom gr
  • 在 SQL Server 上执行分页的最佳方式是什么?

    我有一个数据库超过200万记录 我需要执行分页以在我的 Web 应用程序上显示 该应用程序每页必须有 10 条记录DataGrid 我已经尝试使用ROW NUMBER 但是这种方式会选择所有 200 万条记录 然后只得到 10 条记录 我也
  • 关于 scala.math.Integral 的问题

    有什么方法mkNumericOps andmkOrderingOps of scala math Integral http www scala lang org api current scala math Integral html我们

随机推荐