Spark:出现心跳错误后丢失数据

2024-05-11

我有一个在 Spark 集群上运行的 Python 程序,有四个工作线程。它处理一个包含大约 1500 万条记录的巨大 Oracle 表。检查结果后发现大约有600万条记录没有插入。我的写入功能如下:

df.write.format('jdbc').options(
        url=spark_write_url,
        driver='oracle.jdbc.driver.OracleDriver',
        dbtable=dest_table_name,
        user=username,
        password=password).mode('append') \
        .save()

我查了一下大师:

14:20:22 INFO Master: Telling app of lost executor: 1
14:20:22 INFO Master: Telling app of lost worker: worker-20230212103757-1.1.1.1-44269
14:20:22 WARN Master: Removing worker-20230212103834-1.1.1.3-36115 because we got no heartbeat in 60 seconds
14:20:22 INFO Master: Removing worker worker-20230212103834-1.1.1.3-36115 on 1.1.1.3:36115
14:20:22 INFO Master: Telling app of lost executor: 3
14:20:22 INFO Master: Telling app of lost worker: worker-20230212103834-1.1.1.3-36115

另外,我检查工人日志:

 14:19:52 WARN Executor: Issue communicating with driver in heartbeater org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10000 milliseconds]. 
 This timeout is controlled by spark.executor.heartbeatInterval
at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:103)
at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:996)
at org.apache.spark.executor.Executor.$anonfun$heartbeater$1(Executor.scala:212)
at org.apache.spark.executor.Executor$$Lambda$356/1672492577.apply$mcV$sp(Unknown Source)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1996)
at org.apache.spark.Heartbeater$$anon$1.run(Heartbeater.scala:46)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:259)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:263)
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:293)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
... 14 more

我的输出表有主键ID这肯定是独一无二的。但是,当我检查日志时,看到以下错误:

14:20:53,547 - INFO - functions - Dest Table : trx, Exception in Write Spark :An error occurred while calling o2192.save.

:org.apache.spark.SparkException:由于阶段失败而中止作业:阶段274.0中的任务48失败4次,最近一次失败:阶段274.0中丢失任务48.3(TID 10159)(1.1.1.4执行程序0):java.sql .BatchUpdateException:ORA-00001:违反唯一约束(ID_PK)。

我有三个问题,请您指导一下?

First,Spark Standalone中worker节点丢失时如何管理数据?

Second、Spark如何插入数据?是否批量插入数据?例如,当 Spark 插入一个 Bulk 并且该 Bulk 出现如下错误时,unique constraint violated,Spark忽略Bulk而不插入?

Three,如何在不手动检查结果的情况下确保Spark程序正确完成工作?

非常感谢任何帮助。


None

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark:出现心跳错误后丢失数据 的相关文章

随机推荐

  • 如何从 WinRT StreamSocket 读取所有可用数据并清空 inputStream?

    我想在向套接字写入新数据之前读取当前正在等待套接字的所有数据 WinRT中的读取方法都是异步的 所以我不能简单地while直到套接字为空 由于我确实想丢弃套接字上的数据 因此我不想使用读取器 而是直接从套接字读取数据IInputStream
  • 改变换行行为

    我可以在 TextView 中使用 Spannable 创建具有不同外观 下划线 删除线等的跨度 我怎样才能做同样的事情来改变换行行为 特别是 我不希望电子邮件地址在中间换行 我希望它像一个单词一样 I tried 包裹在一起跨度 http
  • 如何在Python中获取套接字的外部IP?

    当我打电话时socket getsockname 在套接字对象上 它返回我的机器的内部 IP 和端口的元组 但是 我想找回我的外部IP 最便宜 最有效的方式是什么 如果没有外部服务器的配合 这是不可能的 因为您和另一台计算机之间可能存在任意
  • CSS 3.0 用户选择属性替换

    我正在使用 CSS 3 0 它抱怨 用户选择 属性不存在 有谁知道合适的替代品或替代品是什么 user select is 回到规范 https drafts csswg org css ui 4 propdef user selectCS
  • 预编译头和 Visual Studio

    有没有办法设置 Visual Studio 解决方案参数 以便它只创建预编译头而不构建整个解决方案 具体来说 它是一个巨大的 C 解决方案 本身有许多项目 谢谢 仅选择 pch 创建者源文件 通常是 stdafx cpp 然后编译该文件 C
  • [karma-server]:类型错误:无法读取未定义的属性“范围” - CI 环境中的 Angular 单元测试

    我们的 CI CD 管道停止处理 ng test 作业并失败并显示以下错误消息 karma server TypeError Cannot read property range of undefined at handleRangeHea
  • 如何从不同的线程访问控件?

    如何从创建控件的线程以外的线程访问控件 避免跨线程错误 这是我的示例代码 private void Form1 Load object sender EventArgs e Thread t new Thread foo t Start p
  • 使用 sapply 的列表和矩阵

    我有一个也许是基本的问题 我在网上搜索过 我在读取文件时遇到问题 尽管如此 我还是按照 Konrad的建议设法读取了我的文件 我很欣赏这一点 How to get R to read in files from multiple subdi
  • Git 显示更改后的相同文件

    当我似乎无法弄清楚更改时 Git 向我显示整个文件已更改 这是 cygwin git 但它也发生在 msysgit 中 git version git version 2 1 1 diff lt git show HEAD File cs
  • 无法将参数从 `const char *` 转换为 `char *`

    鉴于此代码 void group build int size std string ips Build the LL after receiving the member list from bootstrap head new memb
  • 使用XMLHttpRequest自动网页刷新内存泄漏

    问候 我一直在为一些使用 8 位微控制器的硬件开发网络界面 该网页使用 HTML javascript JSON 和 XHR XMLHttpRequest 进行通信 我想做的是创建一个页面 使用 setInterval 使用控制器中的新值每
  • 选择initializer_list迭代器定义

    Why std initializer list
  • 从内存流播放视频文件

    只是好奇看看这是否可能 我有一个 Windows 应用程序 它从我的电脑上的 avi 文件读取所有字节 然后将其存储在 byte 中 现在我的内存中有 avi 文件 我想直接从内存将其加载到某种视频播放器控件中 我尝试过使用 wmplaye
  • Javascript 警报/消息框中的欧元符号或其他实体

    有谁知道我如何在 javascript 警报窗口中显示欧元或其他 html 实体 alert u20AC HTML 实体字符查找 http leftlogic com lounge articles entity lookup
  • 从delphi应用程序调用.net4.0 com服务器后出现错误异常

    我们正在将代码库从 BDS2006 迁移到 Rad Studio XE 我们发现了一些非常奇怪的行为 如果我们在从 Net4 0 中实现的 COM 服务器创建一些对象后进行无效的浮点运算 即除以零 我们不会没有得到正常异常 即 EDivis
  • 为什么像 BindingList 或 ObservableCollection 这样的类不是线程安全的?

    我一次又一次发现自己必须编写 BindingList 和 ObservableCollection 的线程安全版本 因为当绑定到 UI 时 这些控件无法从多个线程更改 我想理解的是why情况就是这样 这是设计错误还是故意的 问题是设计一个线
  • IE11 元元素破坏 SVG

    我已将 SVG 文件数据直接嵌入到我的 html 中 它在 Chrome 和 Firefox 中显示 但在 IE11 中根本不显示 SVG 的 Pastebin 链接是http pastebin com eZpLXFfD http past
  • Webpack中watch编译时加速scss的方法

    太长了 滚动到底部寻找答案 或者将 Webpack 和 Dart Sass VM 结合起来 https github com sass dart sass releases https github com sass dart sass r
  • 在 .NET Core 中从 HttpResponseMessage 转换为 IActionResult

    我正在将之前在 NET Framework 中编写的一些代码移植到 NET Core 我有这样的事情 HttpResponseMessage result await client SendAync request if result St
  • Spark:出现心跳错误后丢失数据

    我有一个在 Spark 集群上运行的 Python 程序 有四个工作线程 它处理一个包含大约 1500 万条记录的巨大 Oracle 表 检查结果后发现大约有600万条记录没有插入 我的写入功能如下 df write format jdbc