I have a Python program running on a Spark cluster with four workers. It processes a huge Oracle table containing about 15 million records. After checking the results, I found that about 6 million records were not inserted. My write function is as follows:
df.write.format('jdbc').options(
    url=spark_write_url,
    driver='oracle.jdbc.driver.OracleDriver',
    dbtable=dest_table_name,
    user=username,
    password=password).mode('append') \
    .save()
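For what it's worth, my understanding from the Spark JDBC documentation is that the writer opens one connection per partition and inserts rows in JDBC batches controlled by the batchsize option (Spark's default is 1000). The same call with those knobs made explicit would look roughly like this; the batchsize and numPartitions values below are only illustrative assumptions, not settings I actually run with:

df.write.format('jdbc').options(
    url=spark_write_url,
    driver='oracle.jdbc.driver.OracleDriver',
    dbtable=dest_table_name,
    user=username,
    password=password,
    batchsize=1000,      # rows sent per JDBC batch round trip (Spark's default)
    numPartitions=8).mode('append') \
    .save()              # numPartitions caps concurrent connections; 8 is illustrative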
I checked the master logs:
14:20:22 INFO Master: Telling app of lost executor: 1
14:20:22 INFO Master: Telling app of lost worker: worker-20230212103757-1.1.1.1-44269
14:20:22 WARN Master: Removing worker-20230212103834-1.1.1.3-36115 because we got no heartbeat in 60 seconds
14:20:22 INFO Master: Removing worker worker-20230212103834-1.1.1.3-36115 on 1.1.1.3:36115
14:20:22 INFO Master: Telling app of lost executor: 3
14:20:22 INFO Master: Telling app of lost worker: worker-20230212103834-1.1.1.3-36115
I also checked the worker logs:
14:19:52 WARN Executor: Issue communicating with driver in heartbeater org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10000 milliseconds].
This timeout is controlled by spark.executor.heartbeatInterval
at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:103)
at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:996)
at org.apache.spark.executor.Executor.$anonfun$heartbeater$1(Executor.scala:212)
at org.apache.spark.executor.Executor$$Lambda$356/1672492577.apply$mcV$sp(Unknown Source)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1996)
at org.apache.spark.Heartbeater$$anon$1.run(Heartbeater.scala:46)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:259)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:263)
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:293)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
... 14 more
My output table has a primary key, ID, which is definitely unique. But when I checked the logs, I saw the following error:
14:20:53,547 - INFO - functions - Dest Table : trx, Exception in Write Spark : An error occurred while calling o2192.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 48 in stage 274.0 failed 4 times, most recent failure: Lost task 48.3 in stage 274.0 (TID 10159) (1.1.1.4 executor 0): java.sql.BatchUpdateException: ORA-00001: unique constraint (ID_PK) violated
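Reading the stage failure more closely: task 48 was attempted four times (48.0 through 48.3), and my working theory is that an earlier attempt had already committed its rows before the executor was lost, so the retry collided with them. To make that concrete, here is a simplified Python stand-in (using python-oracledb; the trx columns id and col_a are hypothetical) for the per-partition batch-and-commit pattern I believe the JDBC writer follows. This is a sketch of my mental model, not Spark's actual code:

import oracledb

def save_partition(rows, dsn, user, password, batch_size=1000):
    # Stand-in for what I believe one Spark write task does with its
    # partition: buffer rows, flush them in batches, commit once at the end.
    with oracledb.connect(user=user, password=password, dsn=dsn) as conn:
        cur = conn.cursor()
        buf = []
        for row in rows:  # rows: iterable of (id, col_a) tuples; col_a is hypothetical
            buf.append(row)
            if len(buf) == batch_size:
                cur.executemany("INSERT INTO trx (id, col_a) VALUES (:1, :2)", buf)
                buf = []
        if buf:
            cur.executemany("INSERT INTO trx (id, col_a) VALUES (:1, :2)", buf)
        conn.commit()  # one commit per partition
        # If the executor dies after commit() but before the driver records
        # success, Spark reruns the task, the same ids are inserted again,
        # and Oracle raises ORA-00001 on the primary key.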
I have three questions; could you please advise?
First, how does Spark Standalone handle the data when a worker node is lost?
Second, how does Spark insert the data? Does it insert in batches? For example, when Spark inserts a batch and that batch hits a "unique constraint violated" error, does Spark discard the whole batch and insert nothing from it?
Third, how can I make sure the Spark program did its work correctly without manually checking the results? (A sketch of the kind of check I have in mind is below.)
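For the third question, the best I can come up with so far is a post-write reconciliation pass: read the destination table back over the same JDBC connection and anti-join on the primary key. This assumes the source DataFrame df is still available and the key column is named ID; the failure handling at the end is just illustrative:

# Read the destination table back with the same connection options as the write.
dest_df = spark.read.format('jdbc').options(
    url=spark_write_url,
    driver='oracle.jdbc.driver.OracleDriver',
    dbtable=dest_table_name,
    user=username,
    password=password).load()

# Source IDs that never landed in the destination table.
missing = df.select('ID').join(dest_df.select('ID'), on='ID', how='left_anti')
missing_count = missing.count()
if missing_count > 0:
    raise RuntimeError(
        f'{missing_count} source rows never made it into {dest_table_name}')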
Any help is greatly appreciated.