两个流上的左外连接不发出空输出。它只是等待记录添加到另一个流中。使用套接字流来测试这一点。在我们的例子中,我们想要发出具有 null 值的记录,这些记录与 id 不匹配或/且不属于时间范围条件
水印和间隔的详细信息如下:
val ds1Map = ds1
.selectExpr("Id AS ds1_Id", "ds1_timestamp")
.withWatermark("ds1_timestamp","10 seconds")
val ds2Map = ds2
.selectExpr("Id AS ds2_Id", "ds2_timestamp")
.withWatermark("ds2_timestamp", "20 seconds")
val output = ds1Map.join( ds2Map,
expr(
""" ds1_Id = ds2_Id AND ds2_timestamp >= ds1_timestamp AND ds2_timestamp <= ds1_timestamp + interval 1 minutes """),
"leftOuter")
val query = output.select("*")
.writeStream
.outputMode(OutputMode.Append)
.format("console")
.option("checkpointLocation", "./spark-checkpoints/")
.start()
query.awaitTermination()
谢谢。
这可能是由于微批量架构实现的警告之一所致,如开发人员指南中所述:https://spark.apache.org/docs/latest/structed-streaming-programming-guide.html#semantic-guarantees-of-stream-stream-inner-joins-with-watermarking https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#semantic-guarantees-of-stream-stream-inner-joins-with-watermarking
在微批次引擎的当前实现中,水印在微批次结束时提前,下一个微批次使用更新的水印来清理状态并输出外部结果。由于我们仅在有新数据需要处理时才触发微批次,因此如果流中没有接收到新数据,则外部结果的生成可能会延迟。简而言之,如果连接的两个输入流中的任何一个在一段时间内没有接收到数据,则外部(左或右两种情况)输出可能会延迟。
对我来说就是这种情况,直到稍后触发另一批数据后,空数据才会被刷新
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)