从代码中取消 Apache Flink 作业

2023-12-28

我现在的情况是想从代码中停止/取消 flink 作业。这是在我的集成测试中,我正在向我的 flink 作业提交任务并检查结果。当作业异步运行时,即使测试失败/通过,它也不会停止。我想在考试结束后在车站工作。

我尝试了一些事情,我在下面列出:

  1. 获取工作经理演员
  2. 获取正在运行的作业
  3. 对于每个正在运行的作业,向作业管理器发送取消请求

当然,这不会运行,但我不确定 jobmanager actorref 是否错误或缺少其他内容。

我得到的错误是: [flink-akka.actor.default-dispatcher-5] [akka://flink/user/jobmanager_1] Message [org.apache.flink.runtime.messages.JobManagerMessages$RequestRunningJobsStatus$] from Actor[ akka://flink/temp/$a] 到 Actor[akka://flink/user/jobmanager_1] 未交付。 [1]遇到的死信。可以使用配置设置“akka.log-dead-letters”和“akka.log-dead-letters-during-shutdown”关闭或调整此日志记录

这意味着作业管理器参与者引用错误或发送给它的消息不正确。

代码如下所示:

val system = ActorSystem("flink", ConfigFactory.load.getConfig("akka")) //I debugged to get this path
 val jobManager = system.actorSelection("/user/jobmanager_1") //also got this akka path by debugging and getting the jobmanager akka url
val responseRunningJobs = Patterns.ask(jobManager, JobManagerMessages.getRequestRunningJobsStatus, new FiniteDuration(10000, TimeUnit.MILLISECONDS))
    try {
      val result = Await.result(responseRunningJobs, new FiniteDuration(5000, TimeUnit.MILLISECONDS))
      if(result.isInstanceOf[RunningJobsStatus]){
        val runningJobs = result.asInstanceOf[RunningJobsStatus].getStatusMessages()
        val itr = runningJobs.iterator()
        while(itr.hasNext){
          val jobId = itr.next().getJobId
          val killResponse = Patterns.ask(jobManager, new CancelJob(jobId), new Timeout(new FiniteDuration(2000, TimeUnit.MILLISECONDS)));
          try {
            Await.result(killResponse, new FiniteDuration(2000, TimeUnit.MILLISECONDS))
          }
          catch {
            case e : Exception =>"Canceling the job with ID " + jobId + " failed." + e
          }

        }
      }
    }
    catch{
      case e : Exception => "Could not retrieve running jobs from the JobManager." + e
    }

  }

有人可以检查这是否是正确的方法?

编辑 : 要完全停止作业,需要先停止Task Manager,再停止JobManager,顺序是先Task Manager,再停止JobManager。


你正在创建一个新的ActorSystem然后尝试找到一个有这个名字的演员/user/jobmanager_1在同一个演员系统中。这是行不通的,因为实际的作业管理器将在不同的环境中运行ActorSystem.

如果您想获得ActorRef对于真正的工作经理,您要么必须使用相同的ActorSystem进行选择(然后您可以使用本地地址),或者您已经找到作业管理器参与者的远程地址。远程地址的格式为akka.tcp://flink@[address_of_actor_system]/user/jobmanager_[instance_number]。如果您有权访问FlinkMiniCluster那么你可以使用leaderGateway承诺获得现任领导的ActorGateway.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

从代码中取消 Apache Flink 作业 的相关文章

  • 远程创建 Akka Actor,无需新的 ActorSystem

    我已经仔细阅读了文档好几次了 http doc akka io docs akka 2 1 4 scala remoting html http doc akka io docs akka 2 1 4 scala remoting html
  • Actors 中 future 的执行上下文

    我有一个 Actor 并且在某些消息上我正在运行一些返回 Future 的方法 def receive Receive case SimpleMessge gt val futData Future Int futData map data
  • 在 Flink 流中使用静态 DataSet 丰富 DataStream

    我正在编写一个 Flink 流程序 其中我需要使用一些静态数据集 信息库 IB 来丰富用户事件的数据流 对于例如假设我们有一个买家的静态数据集 并且有一个传入的事件点击流 对于每个事件 我们希望添加一个布尔标志来指示事件的执行者是否是买家
  • Akka 2:如何暂停消息处理?

    在我使用 Akka 掌握 Actor 模型的过程中 出现了很多问题 这是另一张 假设我们有一个 Actor 由于某些业务逻辑或可用资源 它必须在给定时间内停止处理消息 可能发生这种情况的情况可能是 节流 可能有一个发送电子邮件的 Actor
  • 如何确定 akka 中生成的 actor 数量?

    我最近开始研究 Akka 2 0 框架 并且能够运行一些代码 生成执行简单 Oracle 数据库调用 执行简单计算等的 Actor 但是在生产中什么也没有 我想知道的是 是否有一般的经验法则或最佳实践来确定为某些类型的任务生成多少个参与者
  • Akka 流如何不断实现?

    我在用阿卡流 http doc akka io docs akka stream and http experimental 1 0 scala stream index html在 Scala 中进行轮询AWS SQS https aws
  • akka 远程处理中出现“最大允许大小 128000 字节,编码类 scala 的实际大小”错误

    我想使用 Akka Remoting 在参与者之间通过网络交换消息 但是对于大型字符串消息 我收到以下错误 akka remote OversizedPayloadException Discarding oversized payload
  • Java反应式框架的比较[关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 我看到许多框架 库声称它们可以帮助用 Java 构建响应式应用程序 例如 Akka Vert x RxJava Reactor QBit 等 他
  • 如何处理 Akka 子 actor 的长时间初始化?

    我有一个演员 它创建一个子演员来执行一些冗长的计算 问题是子 Actor 的初始化需要几秒钟 并且父 Actor 在子 Actor 创建和完全初始化之间发送给子 Actor 的所有消息都将被丢弃 这是我正在使用的代码的逻辑 class Ch
  • 创建具有通用返回类型的 FlinkSQL UDF

    我想定义函数MAX BY接受类型值T和类型的订购参数Number并根据排序从窗口返回最大元素 类型为T 我试过了 public class MaxBy
  • Akka、SQS 和 Camel 的消费者投票率

    我正在做的一个项目需要从SQS读取消息 我决定使用Akka来分布式处理这些消息 由于 Camel 支持 SQS 并且在 Consumer 类中内置了 Akka 中使用的功能 因此我认为最好以这种方式实现端点并读取消息 尽管我还没有看到很多人
  • 知道 akka actor 何时完成

    有几个人和我一起从事一个项目 一直在试图找出解决这个问题的最佳方法 看起来这应该是经常需要的标准东西 但由于某种原因我们似乎无法得到正确的答案 如果我有一些工作要做 并且我向路由器抛出一堆消息 我如何知道所有工作何时完成 例如 如果我们正在
  • 我可以将 flink RocksDB 状态后端与本地文件系统一起使用吗?

    我正在探索使用 FlinkrocksDb 状态后端 文档似乎暗示我可以使用常规文件系统 例如 file data flink checkpoints 但代码 javadoc 仅在此处提到 hdfs 或 s3 选项 我想知道是否可以将本地文件
  • 在 Akka 中配置嵌套 Router

    我有一些嵌套的路由器 应创建它FromConfig 我想要的是这样的 test akka actor deployment worker router round robin nr of instances 5 slave router b
  • Akka 与现有 java 项目集成的示例

    如果我已经有现有的javaWeb 应用程序使用spring and servlet容器 将 Akka 集成到其中的正确方法是什么 就像我将会有Actor1 and Actor2互相沟通的 开始使用这些演员的切入点是什么 例如 1 把它放在那
  • 在 EB 上的 Docker 中运行的应用程序拒绝连接到自身

    我有一个 Play 2 Web 应用程序 我使用 Docker 将其部署到 Elastic Beanstalk 在此 Web 应用程序中 我启动了一个 Akka 集群 启动过程涉及将自动伸缩组中的所有节点添加为种子节点 包括其自身 第一次部
  • Flink 窗口:聚合并输出到接收器

    我们有一个数据流 其中每个元素都是这种类型 id String type Type amount Integer 我们想要聚合这个流并输出总和amount每周一次 目前的解决方案 Flink 管道示例如下所示 stream keyBy ty
  • 使用 GlobalWindow 在 Beam 中进行状态垃圾收集

    Apache Beam 最近推出了状态细胞 https beam apache org blog 2017 02 13 stateful processing html 通过StateSpec和 StateId注释 在 Apache Fli
  • Java / Scala Future 由回调驱动

    简洁版本 我怎样才能创建一个Promise
  • 我想使用 Flink 的 Streaming File Sink 写入 ORC 文件,但它无法正确写入文件

    我正在从 Kafka 读取数据并尝试将其以 ORC 格式写入 HDFS 文件系统 我使用了他们官方网站上的以下链接参考 但我可以看到Flink为所有数据写入完全相同的内容并生成这么多文件并且所有文件都可以103KB https ci apa

随机推荐

  • 多对多 Spring Data JPA 关系中的额外列,变化最小

    我需要更改项目的模型 现在 我们有两个具有双向多对多关系的类 这意味着在关系表中 现在需要向关系添加额外的信息 我的问题是 唯一的方法是为关系创建一个类 例如 使用与已存在的关系表相同的名称创建一个类 我这么问是因为如果我们需要改变项目中的
  • 有没有办法在 Visual Studio 中自动更新已安装的 NuGet 包?

    正如标题所示 我想知道是否有一种方法可以在包源中出现新版本时自动更新已安装的 NuGet 包 该用例是一个将某些公司策略 代码分析 签名等 应用于我们的项目的包 一旦该包更新 我希望能够为此包配置自动更新 我确实知道 NuGet 有一个包恢
  • Python 列表是否保证其元素保持插入的顺序?

    如果我有以下Python代码 gt gt gt x gt gt gt x x 1 gt gt gt x x 2 gt gt gt x x 3 gt gt gt x 1 2 3 Will x保证永远是 1 2 3 或者临时元素的其他顺序是否可
  • Xpath选择多个标签

    我想要使 用 PHP DOMXPath 查询的多个标签 td 和 th 我该怎么做 您可以使用 联盟 运营商 这是一个例子 doc new DOMDocument doc gt loadHTML table tr th table head
  • 使用自动滚动向面板添加控件 (c#)

    我有一个带有属性的面板AutoScroll true 通过动态地将其他控件添加到面板而不滚动 一切正常 void addControl int top 13 this Controls Count cmdSet Height ucComma
  • 如何定义 R 函数的参数类型?

    我正在编写一个 R 函数 并且我想确保我的 R 函数的参数属于某个类 例如 矩阵 做这个的最好方式是什么 假设我有一个函数 foo 它计算矩阵的逆 foo lt function x I want to make sure x is of
  • 名称冲突的类的构造函数

    我正在使用 clang 使用 c 14 方言编译我的代码 举个例子 class x int i public x int i this gt i i void x void f class x my x Do something here
  • jboss 7.1 xalan 问题?

    我正在尝试在 JBoss7 上创建基于 Apache Jena 的应用程序 Apache Jena 使用 Xalan 2 11 0 JBoss 7 附带 2 7 1 当我尝试调用该应用程序时 出现异常 其根源是 org apache xer
  • 记录函数闭包

    例如 假设我的包中有一个函数闭包 f function x x x g function y x lt lt y h function x list g g h h l f 5 l g 10 l h 什么是正确的 在官方CRAN http
  • JFactory导入失败

    我正在尝试为 Android 应用程序制作一个登录系统 该系统可与我的 2 5 Joomla 网站一起使用 我试图通过制作一个 Joomla 插件来做到这一点 Android 应用程序将发布数据发送到 php 文件 然后该文件对用户进行身份
  • 减少 Swing 应用程序中耦合的设计模式

    大家好 我目前正在开发 Java Swing 应用程序 并且正在寻找一些指导 该应用程序相当小 但我注意到 随着代码库变得越来越大 我的对象图中存在大量耦合 我对 Swing 比较陌生 但我已经编程了足够长的时间 知道它的发展方向 我遇到的
  • Django 中间件并获取视图名称?

    我正在尝试用 Django 编写我的第一个中间件 class RefreshBalance def process view self request view func view args view kwargs pass 我想检测视图是
  • volatile int 比 AtomicInteger 快吗

    我目前正在做一个示例练习 我发现一个奇怪的观察结果 如果我用易失性程序替换 AutomicInteger 则运行速度会更快 注意 我只进行读操作 code import java util ArrayList import java uti
  • 如何访问 Backbone 视图中的父元素?

    在 Backbone 模型视图中 似乎 this el parent 不起作用 从视图中选择父元素的最佳方法是什么 我正在使用设置 eltagName li 为了景观 默认情况下 Backbone 分配一个空的div到你的视图中 你无法访问
  • 如何使用opencv python解决theta迷宫?

    I have to find shortest path from the center of the maze to the outermost circle I have to solve this problem using open
  • 检查 WHERE 子句中参数是否为 NULL

    我在执行一个存储过程时遇到了麻烦 该过程需要永远执行 它相当大 我可以理解我需要一些时间 但这个持续了将近 20 分钟 经过一些调试和研究后 我注意到替换这部分WHERE clause p DrumNo IS NULL OR T ORDER
  • 获取不同项目及其数量的列表

    我有一个对象 它有很多属性 但唯一需要担心的两个是 myobject ID这是一个int myobject Names这是一个HashSet 然后我有一个List这些对象看起来与此类似 List
  • 如何从 Jupyter Notebook 中的 .py 文件调用函数?

    我不想在每个 Jupyter Notebook 文件中编写相同的函数 如果我只需要编辑一次函数而不需要在每个 ipynb 文件中进行编辑 那就更容易了 问题是 如果我编辑 py 文件 我必须重新启动内核 这将重新启动一切 有什么方法可以简单
  • 在 XML 中保留原始换行符类型(\r 与 \r\n)

    我有一个应用程序 我想在其中使用 XML 文件来存储 1 文档的原始文本 以及 2 使用字符偏移量 指向 原始文本的多个实体 例如
  • 从代码中取消 Apache Flink 作业

    我现在的情况是想从代码中停止 取消 flink 作业 这是在我的集成测试中 我正在向我的 flink 作业提交任务并检查结果 当作业异步运行时 即使测试失败 通过 它也不会停止 我想在考试结束后在车站工作 我尝试了一些事情 我在下面列出 获