如何从 Dataproc 上的检查点重新启动 Spark Streaming 作业?

2023-12-03

这是后续dataproc 上的 Spark 流抛出 FileNotFoundException

在过去的几周里(不确定从什么时候开始),重新启动 Spark 流作业,即使使用“kill dataproc.agent”技巧也会抛出此异常:

17/05/16 17:39:02 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at stream-event-processor-m/10.138.0.3:8032
17/05/16 17:39:03 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl: Submitted application application_1494955637459_0006
17/05/16 17:39:04 ERROR org.apache.spark.SparkContext: Error initializing SparkContext.
org.apache.spark.SparkException: Yarn application has already ended! It might have been killed or unable to launch application master.
    at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:85)
    at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:62)
    at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:149)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:497)
    at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2258)
    at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:140)
    at org.apache.spark.streaming.StreamingContext$$anonfun$getOrCreate$1.apply(StreamingContext.scala:826)
    at org.apache.spark.streaming.StreamingContext$$anonfun$getOrCreate$1.apply(StreamingContext.scala:826)
    at scala.Option.map(Option.scala:146)
    at org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:826)
    at com.thumbtack.common.model.SparkStream$class.main(SparkStream.scala:73)
    at com.thumbtack.skyfall.StreamEventProcessor$.main(StreamEventProcessor.scala:19)
    at com.thumbtack.skyfall.StreamEventProcessor.main(StreamEventProcessor.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
17/05/16 17:39:04 INFO org.spark_project.jetty.server.ServerConnector: Stopped ServerConnector@5555ffcf{HTTP/1.1}{0.0.0.0:4479}
17/05/16 17:39:04 WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!
17/05/16 17:39:04 ERROR org.apache.spark.util.Utils: Uncaught exception in thread main
java.lang.NullPointerException
    at org.apache.spark.network.shuffle.ExternalShuffleClient.close(ExternalShuffleClient.java:152)
    at org.apache.spark.storage.BlockManager.stop(BlockManager.scala:1360)
    at org.apache.spark.SparkEnv.stop(SparkEnv.scala:87)
    at org.apache.spark.SparkContext$$anonfun$stop$11.apply$mcV$sp(SparkContext.scala:1797)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1290)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:1796)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:565)
    at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2258)
    at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:140)
    at org.apache.spark.streaming.StreamingContext$$anonfun$getOrCreate$1.apply(StreamingContext.scala:826)
    at org.apache.spark.streaming.StreamingContext$$anonfun$getOrCreate$1.apply(StreamingContext.scala:826)
    at scala.Option.map(Option.scala:146)
    at org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:826)
    at com.thumbtack.common.model.SparkStream$class.main(SparkStream.scala:73)
    at com.thumbtack.skyfall.StreamEventProcessor$.main(StreamEventProcessor.scala:19)
    at com.thumbtack.skyfall.StreamEventProcessor.main(StreamEventProcessor.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Exception in thread "main" org.apache.spark.SparkException: Yarn application has already ended! It might have been killed or unable to launch application master.
    at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:85)
    at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:62)
    at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:149)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:497)
    at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2258)
    at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:140)
    at org.apache.spark.streaming.StreamingContext$$anonfun$getOrCreate$1.apply(StreamingContext.scala:826)
    at org.apache.spark.streaming.StreamingContext$$anonfun$getOrCreate$1.apply(StreamingContext.scala:826)
    at scala.Option.map(Option.scala:146)
    at org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:826)
    at com.thumbtack.common.model.SparkStream$class.main(SparkStream.scala:73)
    at com.thumbtack.skyfall.StreamEventProcessor$.main(StreamEventProcessor.scala:19)
    at com.thumbtack.skyfall.StreamEventProcessor.main(StreamEventProcessor.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Job output is complete

如何从 Dataproc 集群上的检查点重新启动 Spark 流作业?


我们最近为 dataproc 作业添加了自动重启功能(可在gcloud beta跟踪并进入v1 API).

要利用自动重启的优势,作业必须能够恢复/清理,这样它就无法工作most无需修改的作业。然而,它does开箱即用带有检查点文件的 Spark 流.

不再需要 restart-dataproc-agent 技巧。自动重启可以抵御作业崩溃、Dataproc 代理故障和迁移时虚拟机重启事件。

例子:gcloud beta dataproc jobs submit spark ... --max-failures-per-hour 1

See: https://cloud.google.com/dataproc/docs/concepts/restartable-jobs

如果您想测试恢复,可以通过重新启动主虚拟机来模拟虚拟机迁移[1]。之后您应该能够描述该工作 [2] 并了解ATTEMPT_FAILUREstatusHistory 中的条目。

[1] gcloud compute instances reset <cluster-name>-m

[2] gcloud dataproc jobs describe

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何从 Dataproc 上的检查点重新启动 Spark Streaming 作业? 的相关文章

随机推荐

  • 在 Xcode 8.2 中编译大型数组文字

    使用 Swift 存储大型多维数组的最佳方法是什么 我有一个 4D 整数数组 它似乎在 Xcode 8 2 中减慢了编译速度 因为它大约有 200 组 9 个整数 总共 1800 个 第一个数组由 12 个数组组成 每个数组有 8 个数组
  • 如何按日期对数据框进行排序

    我需要在 R 中按日期对数据框进行排序 日期均采用 dd mm yyyy 的形式 日期位于第三列 列标题是 V3 我已经了解了如何按列对数据框进行排序 并且了解了如何将字符串转换为日期值 我无法将两者结合起来以便按日期对数据框进行排序 假设
  • 在 Python 3 中从同一包内和包外导入模块

    好吧 场景很简单 我有这个文件结构 interface py pkg init py mod1 py mod2 py 现在 这些是我的条件 mod2需要导入mod1 interface py 和 mod2 都需要作为主脚本独立运行 如果您愿
  • 如何捕获 PHP 类型提示中的“可捕获的致命错误”?

    我正在尝试在我的一堂课上实现 PHP5 的类型提示 class ClassA public function method a ClassB b class ClassB class ClassWrong 正确用法 a new ClassA
  • 使用 VS2010 Async CTP 的主要风险与好处是什么?

    我想用Visual Studio 异步 CTP 版本 3 在 Windows XP SP3 上的 VS2010 SP1 中进行开发和测试主要是因为我的客户 以及我 在 Windows XP SP3 上 Ghere在MSDN论坛上有相关讨论
  • 在 Excel 中使用 VBA 将文本文件通过 FTP 传输到服务器

    我有一个 Excel 工作表 用户在其中输入某些数据 我想将其存储在文本文件中并使用 FTP 上传到服务器 一个站点建议添加对 Microsoft Internet Transfer Control 的引用 然后定义一个 Inet 对象来执
  • 即时应用程序在 adview.loadAd 上崩溃,SecurityException:无法找到提供程序 com.google.android.gsf.gservices

    我使用的是 Play Services ads 10 2 6 它与已安装的应用程序配合得很好 这是堆栈跟踪 java lang SecurityException Failed to find provider com google and
  • 编写 Rx“RetryAfter”扩展方法

    在书里介绍ToRx作者建议为 I O 编写一个 智能 重试 在一段时间后重试 I O 请求 例如网络请求 这是确切的段落 添加到您自己的库中的一个有用的扩展方法可能是 返回 关闭并重试 方法 我合作过的团队发现了这样一个方法 该功能在执行
  • 如何确定 Python 类的每个属性和方法的定义位置?

    给定 Python 中某个类的实例 能够确定源代码的哪一行将很有用defined每个方法和属性 例如实现1 例如 给定一个模块 ab py class A object z 1 q 2 def y self pass def x self
  • 将文件夹的文件夹中的文件重命名为其父文件夹?

    我有一批文件夹 其名称基于日期 每个文件夹都有一个文件夹 其中的文件名全部相同 有没有办法重命名文件 使它们根据它们所在的目录结构 显示的是基于日期的父文件夹 第一个文件夹 而变得唯一 user date 1 2 2019 ABC 0001
  • 在 MS Access 2010 中使用正则表达式替换列

    ms access 2010中有一个名为sample的表 仅包含一列body 类型 文本
  • 完美转发与 const 引用

    我有一个简单的问题我不明白 int solution int a int b int main int a b std cin gt gt a gt gt b std cout lt lt solution std forward
  • Git:删除超过 1 年的提交

    我有一个 Web 应用程序 使用 git 不仅可以管理源代码控制 还可以部署更改 我将更改推送到 github 上的远程存储库 并且我的网络服务器有一个 webhook 然后根据这些更改进行更新 现在我注意到我的本地 git 存储库大约有
  • 我使用字典是否错误,看起来太慢了

    我使用了 VS 分析器并注意到程序大约 40 的时间花费在下面的行中 我在用着title1 and color1因为 Visual Studio 或 Resharper 建议这样做 下面的代码是否存在性能问题 Dictionary
  • codeigniter 调整图像大小并创建缩略图

    您好 根据 ci 文档 您可以使用 image lib 调整图像大小 并且有一些选项建议我们可以从该图像创建其他缩略图 create thumb FALSE TRUE FALSE boolean Tells the image proces
  • 链接时可以混合静态库和共享库吗?

    我有一个 C 项目 它生成十个可执行文件 我希望将所有这些文件静态链接 我面临的问题是这些可执行文件之一使用第三方库 其中只有共享对象版本可用 如果我通过了 static标记为 gcc ld 会错误说它找不到有问题的库 我认为它正在寻找 a
  • Spring MVC:如何重定向到有错误的页面?

    我试图让我的控制器重定向到带有自定义错误消息的页面 RequestMapping method RequestMethod POST public String processSubmit Valid Voter voter Binding
  • 如何制作深度常量指针

    假设我想用 C 表示二叉树 通常 我想要一个Node像这样的结构 struct Node Node left Node right 这里我使用结构体和原始指针只是为了简单起见 我知道我应该使用智能指针进行内存管理 这种表述有一个问题 我永远
  • Ruby 查看 csv 数据

    我从 csv 文件中获取一些数据 还有如何选择 csv 中的前 20 个数据 例如 A B C D E F 还有方法 def common uploader require csv arr CSV read Rails public pat
  • 如何从 Dataproc 上的检查点重新启动 Spark Streaming 作业?

    这是后续dataproc 上的 Spark 流抛出 FileNotFoundException 在过去的几周里 不确定从什么时候开始 重新启动 Spark 流作业 即使使用 kill dataproc agent 技巧也会抛出此异常 17