我对 Spark 和 SQL 还很陌生。我正在尝试向我的 df 添加一列(然后将其保存到 Delta 表),该列为每个记录/行提供唯一的 id,并在每次更新特定记录时递增它。
我试图执行以下操作:
SELECT etc,
CONCAT(somerows1) as id1,
ROW_NUMBER() OVER(PARTITION BY somerows1 ORDER BY (SELECT NULL)) AS versionid
FROM etc
somerows1 是几列的串联,以形成唯一的记录。我对以特定形式排序的记录没有特别的兴趣,这就是我选择 ORDER BY (SELECT NULL) 的原因。
我收到以下错误:
Error in SQL statement: AnalysisException: Non-time-based windows are not supported on streaming DataFrames/Datasets; line 1 pos 0;
有谁知道如何解决这个问题?
Thanks
我已经通过使用解决了这个问题为每个批次 https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch沉于.writeStream
。这允许您创建一个函数,其中流数据帧被视为静态/批处理数据帧(该函数应用于每个微批次)。
在 Scala 中,代码看起来像这样:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{row_number, lit}
val saveWithWindowFunction = (sourceDf: DataFrame, batchId: Long) => {
val windowSpec = Window
.partitionBy("somerows1")
.orderBy(lit(null))
sourceDf
.withColumn("versionid", row_number().over(windowSpec))
//... save the dataframe using: sourceDf.write.save()
}
随着.writeStream
调用你的函数:
.writeStream
.format("delta")
.foreachBatch(saveWithWindowFunction)
.start()
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)