我现在的情况是想从代码中停止/取消 flink 作业。这是在我的集成测试中,我正在向我的 flink 作业提交任务并检查结果。当作业异步运行时,即使测试失败/通过,它也不会停止。我想在考试结束后在车站工作。
我尝试了一些事情,我在下面列出:
- 获取工作经理演员
- 获取正在运行的作业
- 对于每个正在运行的作业,向作业管理器发送取消请求
当然,这不会运行,但我不确定 jobmanager actorref 是否错误或缺少其他内容。
我得到的错误是: [flink-akka.actor.default-dispatcher-5] [akka://flink/user/jobmanager_1] Message [org.apache.flink.runtime.messages.JobManagerMessages$RequestRunningJobsStatus$] from Actor[ akka://flink/temp/$a] 到 Actor[akka://flink/user/jobmanager_1] 未交付。 [1]遇到的死信。可以使用配置设置“akka.log-dead-letters”和“akka.log-dead-letters-during-shutdown”关闭或调整此日志记录
这意味着作业管理器参与者引用错误或发送给它的消息不正确。
代码如下所示:
val system = ActorSystem("flink", ConfigFactory.load.getConfig("akka")) //I debugged to get this path
val jobManager = system.actorSelection("/user/jobmanager_1") //also got this akka path by debugging and getting the jobmanager akka url
val responseRunningJobs = Patterns.ask(jobManager, JobManagerMessages.getRequestRunningJobsStatus, new FiniteDuration(10000, TimeUnit.MILLISECONDS))
try {
val result = Await.result(responseRunningJobs, new FiniteDuration(5000, TimeUnit.MILLISECONDS))
if(result.isInstanceOf[RunningJobsStatus]){
val runningJobs = result.asInstanceOf[RunningJobsStatus].getStatusMessages()
val itr = runningJobs.iterator()
while(itr.hasNext){
val jobId = itr.next().getJobId
val killResponse = Patterns.ask(jobManager, new CancelJob(jobId), new Timeout(new FiniteDuration(2000, TimeUnit.MILLISECONDS)));
try {
Await.result(killResponse, new FiniteDuration(2000, TimeUnit.MILLISECONDS))
}
catch {
case e : Exception =>"Canceling the job with ID " + jobId + " failed." + e
}
}
}
}
catch{
case e : Exception => "Could not retrieve running jobs from the JobManager." + e
}
}
有人可以检查这是否是正确的方法?
编辑 :
要完全停止作业,需要先停止Task Manager,再停止JobManager,顺序是先Task Manager,再停止JobManager。