带有 Spark 流的多个 writeStream

2023-12-06

我正在使用 Spark Streaming,在尝试实现多个写入流时遇到一些问题。 下面是我的代码

DataWriter.writeStreamer(firstTableData,"parquet",CheckPointConf.firstCheckPoint,OutputConf.firstDataOutput)
DataWriter.writeStreamer(secondTableData,"parquet",CheckPointConf.secondCheckPoint,OutputConf.secondDataOutput)
DataWriter.writeStreamer(thirdTableData,"parquet", CheckPointConf.thirdCheckPoint,OutputConf.thirdDataOutput)

其中 writeStreamer 定义如下:

def writeStreamer(input: DataFrame, checkPointFolder: String, output: String) = {

  val query = input
                .writeStream
                .format("orc")
                .option("checkpointLocation", checkPointFolder)
                .option("path", output)
                .outputMode(OutputMode.Append)
                .start()

  query.awaitTermination()
}

我面临的问题是只有第一个表是用 Spark writeStream 写入的,所有其他表都没有发生任何情况。 请问您对此有什么想法吗?


query.awaitTermination()需要被完成after最后一个流已创建。

writeStreamer可以修改函数以返回StreamingQuery而不是在那时等待终止(因为它是blocking):

def writeStreamer(input: DataFrame, checkPointFolder: String, output: String): StreamingQuery = {
  input
    .writeStream
    .format("orc")
    .option("checkpointLocation", checkPointFolder)
    .option("path", output)
    .outputMode(OutputMode.Append)
    .start()
}

那么你将拥有:

val query1 = DataWriter.writeStreamer(...)
val query2 = DataWriter.writeStreamer(...)
val query3 = DataWriter.writeStreamer(...)

query3.awaitTermination()
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

带有 Spark 流的多个 writeStream 的相关文章

随机推荐

  • 使用Java检测互联网连接[重复]

    这个问题在这里已经有答案了 可能的重复 如何检查java中是否存在互联网连接 我想看看是否有人有一种简单的方法来检测使用 Java 时是否有互联网连接 当前的应用程序在 Windows 中使用了 WinInit DLL 中的 Interne
  • 保存用户选择的语言,Android

    如何更改默认 string xml 我可以将我的应用程序更改为五种不同的语言 但每当我关闭应用程序并重新打开它时 它会恢复为默认英语 是否可以以某种方式更改默认文件夹 值 例如 从值 英语 默认为值 pt 葡萄牙语 任何帮助深表感谢 你可以
  • FCM接收消息问题

    我正在将 FCM 集成到我的项目中 但在收到通知时 我在日志中收到以下错误 错误文字颜色为蓝色 W FirebaseMessaging Received message with unknown type text 我的代码是这样的 pub
  • Postgresratio_to_report函数

    有人可以告诉我如何安装分析功能 特别是报告比率Postgres 数据库中的函数 我尝试在 postgres 提供的模块中搜索 但没有看到包含该函数的模块 报告比率 RATIO TO REPORT 是一个分析函数 它计算一个值与一组值之和的比
  • 如何通过pip安装MySQL-python?

    venv bin pip install MySQL python Collecting MySQL python Downloading MySQL python 1 2 5 zip 108kB 100 112kB 1 2MB s Com
  • 绘制误差线(百分位数)

    我对 python 很陌生 我需要一些帮助 我想在绘图上绘制相当于 1sigma 标准差的误差条 作为分布的第 16 个和第 84 个百分位值 我尝试过 使用 matplotlib err np std x 但它只是给了我标准差 谢谢 如果
  • 尝试用 PHP 解析 JSON

    我是 php 新手 这确实难倒了我 我正在尝试解析这个 json 以获取值match id result status 1 num results 1 total results 500 results remaining 499 matc
  • 为什么我不能对一个打开的文件调用 read() 两次?

    对于我正在做的练习 我尝试使用以下命令读取给定文件的内容两次read 方法 奇怪的是 当我第二次调用它时 它似乎没有以字符串形式返回文件内容 这是代码 f f open get the year match re search r Popu
  • Ubuntu 16.04 LTS 上的 PCL(点云库)1.7 构建错误

    我将 Ubuntu 版本从 14 04 lts 更新到 16 04 lts 在构建利用点云库的项目时遇到问题 它曾经在 Ubuntu 14 04 上运行良好 我使用 qtcreator 构建我的项目 我收到的警告消息是 警告 usr lib
  • 从向量中提取时使用 NA 作为索引

    在下面的代码中 x lt 1 8 x NA 我期待着一个TRUE or FALSE回答但我有八个NA反而 我发现了is na提供了TRUE FALSE我正在寻找的 但是 我仍然不确定为什么用向量子集NA结果是NA 有什么解释吗 来自索引中的
  • 在列表中查找具有给定属性值的对象,然后查找字典值

    我有一个对象列表 这些对象中的每一个都有一个Name财产 以及ObservablePairCollection这只是一个自定义字典 其工作方式与字典完全相同 具有键 值对 给定两个字符串 一个用于名称 一个用于键 我想找到第一个与给定名称匹
  • 使用 selenium 运行的无头 Chrome

    System setProperty webdriver chrome driver usr bin google chrome final ChromeOptions chromeOptions new ChromeOptions chr
  • 使 tkinter 窗口出现在所有其他窗口之上

    usr bin env python Display window with toDisplayText and timeOut of the window from Tkinter import def showNotification
  • 使用服务帐户凭据的 GDrive 导出失败并显示 404

    我有一个使用 OAuth 客户端从 GDrive 文件导出文本的脚本 效果非常好 import googleapiclient discovery as google from apiclient http import MediaIoBa
  • 在 Python 中复制嵌套列表

    我想复制一个二维列表 这样如果我修改一个列表 另一个列表就不会被修改 对于一维列表 我只是这样做 a 1 2 b a 现在如果我修改b a没有被修改 但这不适用于二维列表 a 1 2 3 4 b a 如果我修改b a也会被修改 我该如何解决
  • 如何从基本控制器中的 OnActionExecuting 重定向?

    我尝试了两种方法 Response Redirect 不执行任何操作 以及调用基本控制器内部的新方法返回 ActionResult 并让它返回 RedirectToAction 这些都不起作用 如何从 OnActionExecuting 方
  • 如何修复“无法加载 TClassName 的单元 UnitName 符号信息。您想尝试自己查找此文件吗?”

    在delphi 中 创建ActiveX 控件曾经比现在更流行 然而 仍然可以使用 Delphi 创建 ActiveX 控件 这个问题假设Delphi 2007 但无论你使用什么Delphi版本 它都应该是相同的 当您创建 ActiveX 控
  • 移动文本模式光标不起作用

    我一直致力于在我目前正在开发的操作系统中移动文本模式光标 我根本无法让它显示出来 这是我用来更新光标的代码 void update cursor unsigned char cursor loc y pos Cols x pos curso
  • 如何确定给定 wxWidgets 中当前字体的字符串的大小

    有没有办法根据 C wxWidgets中当前选择的字体确定给定字符串的显示长度 以像素为单位 例如 如果我打印出字符串 Speed 并希望在 和后面的值之间放置 10 个像素 我需要知道 Speed 字符串有多长 有没有办法确定这一点 我似
  • 带有 Spark 流的多个 writeStream

    我正在使用 Spark Streaming 在尝试实现多个写入流时遇到一些问题 下面是我的代码 DataWriter writeStreamer firstTableData parquet CheckPointConf firstChec