在 Spark 结构化流 2.3.0 中连接两个流时,左外连接不发出空值

2024-05-16

两个流上的左外连接不发出空输出。它只是等待记录添加到另一个流中。使用套接字流来测试这一点。在我们的例子中,我们想要发出具有 null 值的记录,这些记录与 id 不匹配或/且不属于时间范围条件

水印和间隔的详细信息如下:

val ds1Map = ds1
.selectExpr("Id AS ds1_Id", "ds1_timestamp")
.withWatermark("ds1_timestamp","10 seconds")

val ds2Map = ds2
.selectExpr("Id AS ds2_Id", "ds2_timestamp")
.withWatermark("ds2_timestamp", "20 seconds")

val output = ds1Map.join( ds2Map,
expr(
""" ds1_Id = ds2_Id AND ds2_timestamp >= ds1_timestamp AND  ds2_timestamp <= ds1_timestamp + interval 1 minutes """),
"leftOuter")

val query = output.select("*")
.writeStream

.outputMode(OutputMode.Append)
.format("console")
.option("checkpointLocation", "./spark-checkpoints/")
.start()

query.awaitTermination()

谢谢。


这可能是由于微批量架构实现的警告之一所致,如开发人员指南中所述:https://spark.apache.org/docs/latest/structed-streaming-programming-guide.html#semantic-guarantees-of-stream-stream-inner-joins-with-watermarking https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#semantic-guarantees-of-stream-stream-inner-joins-with-watermarking

在微批次引擎的当前实现中,水印在微批次结束时提前,下一个微批次使用更新的水印来清理状态并输出外部结果。由于我们仅在有新数据需要处理时才触发微批次,因此如果流中没有接收到新数据,则外部结果的生成可能会延迟。简而言之,如果连接的两个输入流中的任何一个在一段时间内没有接收到数据,则外部(左或右两种情况)输出可能会延迟。

对我来说就是这种情况,直到稍后触发另一批数据后,空数据才会被刷新

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在 Spark 结构化流 2.3.0 中连接两个流时,左外连接不发出空值 的相关文章

随机推荐

  • PHP 和 DOM 文档

    我有一个关于 DOMDocument 的使用和创建 XML 的问题 我有一个 PHP 程序 加载到 XML 文件中 处理XML的每个节点 行 将其发送到另一个进程 然后该进程返回一个 XML 元素 我获取节点的字符串表示形式 以便可以创建
  • Php mysql 30秒后执行任务

    如何让 mysql 查询命令在访问 php 站点 30 秒后执行 您可以对执行 mysql 查询的 php 脚本发出 AJAX 请求 在 js 中使用计时器
  • C# 中两个日期之间的周差

    我正在尝试在 C 中创建一个函数 返回两个日期之间的周差 其目标是提供以下相同的结果 select datediff ww 2018 04 13 2018 04 16 as diff 在上面的示例中 这些日期之间只有 3 天 但它们位于不同
  • Angular2 与 ASP.NET 5 [关闭]

    Closed 此问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 我正在将 Angular2 与 ASP NET5 gulp 和 typescript 结合使用 当我解决
  • 在azure中使用terraform为应用程序服务创建自动缩放规则时出错

    resource azurerm monitor autoscale setting test name AutoscaleSetting resource group name azurerm resource group main na
  • 我的 WPF 应用程序中的 SaveFileDialog 异常

    我的一位客户在保存文件时遇到 WPF 应用程序崩溃的情况 我的保存文件代码是 var saveFileDialog new SaveFileDialog InitialDirectory string Concat Environment
  • MySQL 最佳实践:SELECT 子递归尽可能提高性能?

    我想选择一个根项目及其子项 使其性能尽可能高 我更喜欢使用嵌套集模型 但这次表结构遵循邻接模型 有关嵌套集和邻接模型的更多信息 http mikehillyer com articles managing hierarchical data
  • Android:autoLink 在我的 4.4 设备上无法正常工作

    我是 Android 编程新手 我遇到了这个奇怪的问题 我想让列表中的电话号码可点击 他们会向您发送拨号器 现在这在我的 Android 手机上有效 但它似乎只适用于数字10 个字符 但是当我在 4 1 2 的虚拟设备上测试它时 它在列表中
  • Ant设计文件上传中使用customRequest

    我正在使用 Axios 来处理文件上传 我遇到显示文件上传进度的问题 我使用的文件上传视图是 图片卡 HTML
  • 如何处理 PHP 中浮点数的奇怪舍入

    众所周知 浮点运算并不总是完全准确 但是如何处理它的不一致之处呢 As an example in PHP 5 2 9 this doesn t happen in 5 3 echo round 14 99225 4 14 9923 ech
  • Android 选项菜单中的本地图标拉伸以填充菜单项,标题模糊

    我是新人 所以链接和图片是here https sites google com site nolanramy Hi 我可能犯了一个愚蠢的错误 希望找出那是什么 我想在我的应用程序的选项菜单中使用一些标准菜单图标 ic menu refre
  • php 或 zend 中国际电话号码验证的正则表达式是什么?

    我有一个 zend 表单 其中有一个电话号码字段 并且必须检查验证器 我决定为此使用正则表达式 我搜索了谷歌 但我得到的结果不起作用 谁能给我提供正则表达式 这是我的代码 phone new Zend Form Element Text p
  • 如何根据条件删除结果以计算平均值

    我有下面的架构 对其的快速解释是 鲍勃评分为 5 5 詹姆斯评分 1 5 梅西百货评分高达 5 5 逻辑 如果我是 A 请查找我屏蔽的所有人 查阅所有电影评论 任何留下电影评论且 personA 已屏蔽的人 请将其从计算中删除 计算电影的平
  • 如何设置 Android AVD(SDK 2.1 和 2.2)的默认值(语言和键盘)

    我最近升级到 Android SDK 2 1 和 2 2 但 AVD 始终显示活动的中文和日文字符 我可以进入 设置 取消选中这些选项 但这变得很痛苦 如何设置语言和键盘默认值以避免这种麻烦 谢谢 CommonsWare 的评论就是答案 您
  • Java GSON:获取JSONObject下所有键的列表

    我已经将 GSON 作为 Java 中的 JSON 解析器 但密钥并不总是相同 例如 我有以下 JSON 我已经知道的对象 键1 值1 键2 值2 AnotherObject anotherKey1 anotherValue1 anothe
  • jQuery:如何仅根据表标题从表的列中选择值

    我有一个带有标题 ID 的表 我需要选择此标题下的所有字段 我无权访问源代码 并且该表中没有使用任何类 关于如何完成这件事有什么想法吗 要获取第一列 function var col td nth child 1
  • 使用 ng-options 在 AngularJS 中使用 JSON 填充 select

    编辑 我的代码实际上确实有效 我只是一个有不相关问题的白痴 感谢大家的意见 所以我有一个 JSON 对象数组 格式如下 id id1 text text1 id id2 text text2 我想使用这些填充 AngularJS 选择字段
  • curl 无法获取网页内容,为什么?

    我正在使用curl 脚本转到链接并获取其内容以进行进一步操作 以下是链接和curl脚本
  • 使用不同的 conda-build 根目录

    我正在创建我自己的 conda 食谱 我用 git 签出 存储库很少有演出 而不是在结帐 conda bld 我希望结账于 ssd 这会更快 我怎样才能指定它 另外 在进行克隆时如何指定 git 深度 我想结账 ssd这会更快 我怎样才能指
  • 在 Spark 结构化流 2.3.0 中连接两个流时,左外连接不发出空值

    两个流上的左外连接不发出空输出 它只是等待记录添加到另一个流中 使用套接字流来测试这一点 在我们的例子中 我们想要发出具有 null 值的记录 这些记录与 id 不匹配或 且不属于时间范围条件 水印和间隔的详细信息如下 val ds1Map