将 ROW_NUMBER 列添加到流数据帧

2024-03-04

我对 Spark 和 SQL 还很陌生。我正在尝试向我的 df 添加一列(然后将其保存到 Delta 表),该列为每个记录/行提供唯一的 id,并在每次更新特定记录时递增它。

我试图执行以下操作:

SELECT etc,
CONCAT(somerows1) as id1,
ROW_NUMBER() OVER(PARTITION BY somerows1 ORDER BY (SELECT NULL)) AS versionid
FROM etc

somerows1 是几列的串联,以形成唯一的记录。我对以特定形式排序的记录没有特别的兴趣,这就是我选择 ORDER BY (SELECT NULL) 的原因。

我收到以下错误:

Error in SQL statement: AnalysisException: Non-time-based windows are not supported on streaming DataFrames/Datasets; line 1 pos 0;

有谁知道如何解决这个问题?

Thanks


我已经通过使用解决了这个问题为每个批次 https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch沉于.writeStream。这允许您创建一个函数,其中流数据帧被视为静态/批处理数据帧(该函数应用于每个微批次)。

在 Scala 中,代码看起来像这样:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{row_number, lit}

val saveWithWindowFunction = (sourceDf: DataFrame, batchId: Long) => {

  val windowSpec = Window
    .partitionBy("somerows1") 
    .orderBy(lit(null))
  
  sourceDf
    .withColumn("versionid", row_number().over(windowSpec))

//... save the dataframe using: sourceDf.write.save()
}

随着.writeStream调用你的函数:

  .writeStream
  .format("delta")
  .foreachBatch(saveWithWindowFunction)
  .start()
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

将 ROW_NUMBER 列添加到流数据帧 的相关文章

  • MySQL 按主键排序

    某些 SQL 服务器允许使用通用语句 例如ORDER BY PRIMARY KEY 我不相信这适用于 MySQL 是否有任何此类解决方法可以允许跨多个表自动选择 或者是否需要查找查询来确定主键 我一直在研究的解决方法包括调用SHOW COL
  • Id 或 [TableName]Id 作为主键/实体标识符

    是否首选使用 Id 作为主键的列名或 TableName Id 作为命名约定 表 账户主键 ID 相对 表 账户主键 AccountId 在我见过的实现中 它似乎分为 50 50 左右 每种方法的优点和缺点是什么 跟进 在我的数据库中使用一
  • SqlCommand 参数与 String.Format [重复]

    这个问题在这里已经有答案了 我一直在互联网上搜索 但似乎找不到任何可以解释我的问题的内容 可能是我没有使用正确的搜索字符串 所以我在这里发帖希望有人可以帮助我有了这个 我的程序是使用Visual Studio 2010用C 编写的 我注意到
  • ClassCastException:java.util.Date 无法转换为 java.sql.Date

    你好 我的代码抛出了ClassCastException StackTrace 显示 java lang ClassCastException java util Date cannot be cast to java sql Date a
  • 使用 WHILE 创建虚拟数据

    我尝试使用 a 在表中插入一些虚拟数据WHILE 但它运行得非常非常慢 我在想也许我写的代码不正确 你能看一下并确认一下吗 Insert dummy data DECLARE i int Content int SET i 5001 WHI
  • 如何将 MySQL 数据库更改为 UTC?

    我使用的是 Windows 7 对数据库方面的东西有点陌生 我尝试在 Google 上搜索如何将系统时区更改为 UTC 但文档有些高级 我不太确定如何更改此字段 在 my ini 文件的 mysqld 部分下 添加以下行 default t
  • 如何查找 PostgreSQL 数据库的上次更新时间?

    我正在使用一个批量更新的 postgreSQL 数据库 我需要知道数据库 或数据库中的表 上次更新或修改的时间 两者都可以 我看到 postgreSQL 论坛上有人建议使用日志记录并查询日志 这对我不起作用 因为我无法控制客户端代码库 你可
  • TOAD 将 &String 视为绑定变量

    我正在使用 Oracle Data Integrator 开发一些 ETL 有时会使用 TOAD 测试部分代码 今天我遇到了 TOAD 的问题 我有一行像 AND column value like DEV PROD 当我尝试运行包含上面过
  • sql查询将两列与一列连接起来

    我在 MS Access 2010 中有 2 个表 如下所示 USERS u id u name LOAN l id l from ref users u id l to ref users u id l amount Users u id
  • 如何在 sqlalchemy 中创建基于文字的查询?

    我创建了一个函数来创建表达式 def test operator1 operation operator2 return literal column operator1 op operation operator2 现在当我用 test
  • 如何在SSRS中的表上创建热图?

    如何在 SSRS 中创建这样的内容 颜色将根据行中的值 承销商 从红色变为绿色 所有这些都在一个组中 您可以通过右键单击各个单元格并根据表达式设置填充颜色来完成此操作 In the Image below I ve mistakingly
  • 使用Powershell访问远程Oracle数据库

    我需要能够连接到我的网络上基于 Windows 7 的 Oracle 服务器 32 位 Oracle XE 我需要连接的机器运行 Windows 7 64 位 两台机器上都安装了 Powershell 我已在 64 位计算机上安装了 Ora
  • 根据表sql中的行替换字符串中的字符

    我需要用一些映射的字符替换字符串中的字符列表 我有一个表 dbo CharacterMappings 有 2 列 CharacterToFilter 和 ReplacementCharacter 假设这个表中有3条记录 Filter Rep
  • 如何在Oracle中使用Timestamp_to_scn和Scn_to_timestamp?

    我的查询结果是这样的 select cast to date a start time mm dd yyyy hh mi ss pm as timestamp date of call ora rowscn from calling tab
  • 如何在 SQL Server 中使用 nvarchar 变量为 unicode 用户添加前缀“N”?

    如何在 SQL Server 中使用 nvarchar 变量为 unicode 用户添加前缀 N 例如 给定这个变量 declare Query1 nvarchar max 我可以这样分配它 set Query1 N 但是如果我想使用怎么办
  • 在sqlite SQL语句中与order by子句结合使用limit

    下面的两条 SQL 语句总是会产生相同的结果集吗 1 SELECT FROM MyTable where Status 0 order by StartTime asc limit 10 2 SELECT FROM SELECT FROM
  • 在一个数据访问层中处理多个连接字符串

    我有一个有趣的困境 我目前有一个数据访问层 它必须与多个域一起使用 并且每个域都有多个数据库存储库 具体取决于所调用的存储过程 目前 我只需使用 SWITCH 语句来确定应用程序正在运行的计算机 并从 Web config 返回适当的连接字
  • 使用 where 进行 select 语句时,HSQLDB 用户缺乏权限或未找到对象错误

    我的数据库使用 SQuirrel SQL 客户端版本 3 5 3 和 HSQLDB 我已经能够为其指定相应的驱动程序 内存中 并创建一个别名 我创建了一个表 CREATE TABLE ENTRY NAME VARCHAR 100 NOT N
  • 在 SQL 数据库中存储“列表”的最正确方法是什么?

    因此 我读了很多关于如何将多个值存储到一个列中是一个坏主意 并且违反了数据标准化的第一条规则 令人惊讶的是 这不是 不要谈论数据标准化 所以我需要一些帮助 目前我正在为我工 作的地方设计一个 ASP NET 网页 我想根据此人所属的 Act
  • 对多个数据库执行 SQL 查询

    我知道我的帖子与该论坛中的其他帖子的标题非常相似 但我真的找不到我需要的答案 这是我的问题 我的 Windows Server 上运行着 SQL Server 在我的 SQL Server 中 我有大约 30 个数据库 它们都具有相同的表和

随机推荐