【Flink】Flink 消费kafka报错 AMRMClientAsyncImpl Interrupted while waiting for queue InterruptedException

2023-11-15

在这里插入图片描述

1.背景

一个flink etl程序,读取一个kafka集群的数据,到两外一个集群,然后报错

2020-06-06 15:56:00 PM [Thread: flink-akka.actor.default-dispatcher-20][ Class:org.apache.flink.runtime.executiongraph.ExecutionGraph] INFO:Source: com.dbapp.ailpha.topic.securityalarm.copy -> filter_2 -> Sink: com.dbapp.ailpha.topic.securityalarm.copy (1/1) (e6a0562faebd649e008ca6b5f0e29804) switched from CANCELING to CANCELED.
2020-06-06 15:56:00 PM [Thread: flink-akka.actor.default-dispatcher-20][ Class:org.apache.flink.runtime.executiongraph.ExecutionGraph] INFO:Source: com.dbapp.ailpha.topic.securityevent.copy -> filter_4 -> Sink: com.dbapp.ailpha.topic.securityevent.copy (1/1) (aec115a27429dc50b9539b8ccbac3626) switched from CANCELING to CANCELED.
2020-06-06 15:56:00 PM [Thread: flink-akka.actor.default-dispatcher-20][ Class:org.apache.flink.runtime.executiongraph.ExecutionGraph] INFO:Try to restart or fail the job execute w11 (66eaf554a91fea36beb582a0392be44b) if no longer possible.
2020-06-06 15:56:00 PM [Thread: flink-akka.actor.default-dispatcher-20][ Class:org.apache.flink.runtime.executiongraph.ExecutionGraph] INFO:Job execute w11 (66eaf554a91fea36beb582a0392be44b) switched from state FAILING to FAILED.
java.lang.Exception: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:217)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.processInput(SourceStreamTask.java:133)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:91)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:156)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:203)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
	at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
	... 13 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.


后面还有一个错误

2020-06-06 15:56:00 PM [Thread: flink-akka.actor.default-dispatcher-2][ Class:org.apache.flink.runtime.dispatcher.MiniDispatcher] INFO:Stopping all currently running jobs of dispatcher akka.tcp://flink@1.datanode2:37818/user/dispatcher.
2020-06-06 15:56:00 PM [Thread: flink-akka.actor.default-dispatcher-20][ Class:org.apache.hadoop.yarn.client.api.impl.NMClientImpl] INFO:Clean up running containers on stop.
2020-06-06 15:56:00 PM [Thread: AMRM Callback Handler Thread][ Class:org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl] INFO:Interrupted while waiting for queue
java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
	at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:287)
2020-06-06 15:56:00 PM [Thread: flink-akka.actor.default-dispatcher-20][ Class:org.apache.hadoop.yarn.client.api.impl.NMClientImpl] INFO:Stopping container_1587901917276_0063_01_000002

因为这个是info日志,网上说是没有问题的,请忽略,暂时没找到原因,先记录一下。

参考:https://github.com/DTStack/flinkx/issues/142

2. 第一个错误

根据您提供的日志,可以看到 Flink ETL 程序在从一个 Kafka 集群读取数据并写入另一个集群时发生了错误。错误消息显示了以下几点:

作业中的两个任务(Source、Filter、Sink)都从 CANCELING 状态转换为 CANCELED 状态,表示任务被取消执行。

作业尝试重新启动或失败(Try to restart or fail the job),但可能不再可能进行重新启动。

作业最终从 FAILING 状态转换为 FAILED 状态,表示作业执行失败。

错误堆栈跟踪中显示了 Could not forward element to next operator 的异常,指示在元素传递到下一个操作符时发生了问题。

最后,显示了 org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms 的异常,表示更新元数据超时。

根据这些信息,可能有以下原因导致了错误:

Kafka 集群问题:由于元数据更新超时,可能是源 Kafka 集群无法正确响应 Flink ETL 程序的请求。这可能是由于网络问题、Kafka 集群负载过高或其他原因导致的。

Flink 配置问题:可能是 Flink 的相关配置有问题,导致无法正确连接和操作 Kafka 集群。您可以检查 Flink 和 Kafka 的连接配置,确保其准确性和一致性。

数据处理问题:作业中的操作符可能无法正确处理从源 Kafka 主题读取的数据,并将其传递给下一个操作符。这可能是由于数据格式不匹配、数据处理逻辑错误或其他问题导致的。

为了进一步诊断和解决问题,您可以考虑以下步骤:

检查 Kafka 集群的健康状态,确保 Kafka 集群正常运行,并且能够响应 Flink ETL 程序的请求。

检查 Flink 和 Kafka 的连接配置,包括 Kafka 主题、ZooKeeper 地址、消费者组等信息,确保其准确性和一致性。

检查 Flink ETL 程序的数据处理逻辑,确保它能够正确地处理从源 Kafka 主题读取的数据,并将其传递给下一个操作符。

增加 Kafka 客户端的超时设置,如果连接和操作 Kafka 的超时值太短,可以尝试增加超时时间以适应网络延迟或 Kafka 集群的响应时间。

查看 Flink 和 Kafka 的日志文件,以获取更详细的错误和警告信息,有助于进一步定位问题所在。

根据具体情况,您可能需要进行更详细的排查和调试,可能需要查看更多的日志、和监控。

2. 第2个错误

根据提供的日志,可以看到以下信息:

MiniDispatcher 正在停止当前正在运行的所有作业,停止正在执行的任务。
NMClientImpl 正在清理正在运行的容器。
AMRMClientAsyncImpl 的回调处理线程被中断,等待队列时发生了中断异常。
Container container_1587901917276_0063_01_000002 被停止。
根据这些信息,可以得出以下结论:

Flink 的 MiniDispatcher 正在停止所有当前正在运行的作业。这可能是由于某种原因触发了作业的停止或终止操作。

NMClientImpl 是与 YARN(Apache Hadoop 的资源管理器)相关的组件,它负责与容器进行通信和管理。在停止过程中,正在进行容器的清理工作。

AMRMClientAsyncImpl 是 YARN 中用于与资源管理器进行异步通信的组件。回调处理线程等待队列时发生了中断异常,这可能是由于程序终止或其他中断原因导致的。

Container container_1587901917276_0063_01_000002 被停止。这可能是与 Flink 作业相关的容器,它正在被停止执行。

根据提供的日志,无法确定问题的具体原因。要进一步诊断和解决问题,可以尝试以下步骤:

检查 Flink 作业的配置和代码,确保没有异常或错误逻辑导致作业停止。

检查 YARN 的配置和状态,确保 YARN 正常运行,并且与 Flink 集成正常。

检查作业运行时的日志,查看是否有其他异常或错误信息。

确保系统资源(内存、CPU 等)足够支持作业的执行。

如果问题仍然存在,可能需要进一步调查和分析作业的配置、Flink 和 YARN 的日志以及系统状态,以便确定问题的根本原因。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

【Flink】Flink 消费kafka报错 AMRMClientAsyncImpl Interrupted while waiting for queue InterruptedException 的相关文章

  • Java 中等效的并行扩展

    我在 Net 开发中使用并行扩展有一些经验 但我正在考虑在 Java 中做一些工作 这些工作将受益于易于使用的并行库 JVM 是否提供任何与并行扩展类似的工具 您应该熟悉java util concurrent http java sun
  • java.lang.NoClassDefFoundError:org.apache.batik.dom.svg.SVGDOMImplementation

    我在链接到我的 Android LibGDX 项目的 Apache Batik 库时遇到了奇怪的问题 但让我们从头开始 在 IntelliJ Idea 中我有一个项目 其中包含三个模块 Main Android 和 Desktop 我强调的
  • Java new Date() 打印

    刚刚学习 Java 我知道这可能听起来很愚蠢 但我不得不问 System out print new Date 我知道参数中的任何内容都会转换为字符串 最终值是 new Date 返回对 Date 对象的引用 那么它是如何打印这个的呢 Mo
  • Spring Batch 多线程 - 如何使每个线程读取唯一的记录?

    这个问题在很多论坛上都被问过很多次了 但我没有看到适合我的答案 我正在尝试在我的 Spring Batch 实现中实现多线程步骤 有一个包含 100k 条记录的临时表 想要在 10 个线程中处理它 每个线程的提交间隔为 300 因此在任何时
  • Play框架运行应用程序问题

    每当我尝试运行使用以下命令创建的新 Web 应用程序时 我都会收到以下错误Play http www playframework org Error occurred during initialization of VM Could no
  • Java JDBC:更改表

    我希望对此表进行以下修改 添加 状态列 varchar 20 日期列 时间戳 我不确定该怎么做 String createTable Create table aircraft aircraftNumber int airLineCompa
  • 制作一个交互式Windows服务

    我希望我的 Java 应用程序成为交互式 Windows 服务 用户登录时具有 GUI 的 Windows 服务 我搜索了这个 我发现这样做的方法是有两个程序 第一个是服务 第二个是 GUI 程序并使它们进行通信 服务将从 GUI 程序获取
  • JAXb、Hibernate 和 beans

    目前我正在开发一个使用 Spring Web 服务 hibernate 和 JAXb 的项目 1 我已经使用IDE hibernate代码生成 生成了hibernate bean 2 另外 我已经使用maven编译器生成了jaxb bean
  • 多个 Maven 配置文件激活多个 Spring 配置文件

    我想在 Maven 中构建一个环境 在其中我想根据哪些 Maven 配置文件处于活动状态来累积激活多个 spring 配置文件 目前我的 pom xml 的相关部分如下所示
  • 加速代码 - 3D 数组

    我正在尝试提高我编写的一些代码的速度 我想知道从 3d 整数数组访问数据的效率如何 我有一个数组 int cube new int 10 10 10 我用价值观填充其中 然后我访问这些值数千次 我想知道 由于理论上所有 3d 数组都存储在内
  • 列出jshell中所有活动的方法

    是否有任何命令可以打印当前 jshell 会话中所有新创建的方法 类似的东西 list但仅适用于方法 您正在寻找命令 methods all 它会打印所有方法 包括启动 JShell 时添加的方法 以及失败 被覆盖或删除的方法 对于您声明的
  • Java TestNG 与跨多个测试的数据驱动测试

    我正在电子商务平台中测试一系列商店 每个商店都有一系列属性 我正在考虑对其进行自动化测试 是否有可能有一个数据提供者在整个测试套件中提供数据 而不仅仅是 TestNG 中的测试 我尝试不使用 testNG xml 文件作为机制 因为这些属性
  • 如何将 pfx 文件转换为 jks,然后通过使用 wsdl 生成的类来使用它来签署传出的肥皂请求

    我正在寻找一个代码示例 该示例演示如何使用 PFX 证书通过 SSL 访问安全 Web 服务 我有证书及其密码 我首先使用下面提到的命令创建一个 KeyStore 实例 keytool importkeystore destkeystore
  • 仅将 char[] 的一部分复制到 String 中

    我有一个数组 char ch 我的问题如下 如何将 ch 2 到 ch 7 的值合并到字符串中 我想在不循环 char 数组的情况下实现这一点 有什么建议么 感谢您花时间回答我的问题 Use new String value offset
  • 无法捆绑适用于 Mac 的 Java 应用程序 1.8

    我正在尝试将我的 Java 应用程序导出到 Mac 该应用程序基于编译器合规级别 1 7 我尝试了不同的方法来捆绑应用程序 1 日食 我可以用来在 Eclipse 上导出的最新 JVM 版本是 1 6 2 马文 看来Maven上也存在同样的
  • 如何从泛型类调用静态方法?

    我有一个包含静态创建方法的类 public class TestClass public static
  • 编译器抱怨“缺少返回语句”,即使不可能达到缺少返回语句的条件

    在下面的方法中 编译器抱怨缺少退货声明即使该方法只有一条路径 并且它包含一个return陈述 抑制错误需要另一个return陈述 public int foo if true return 5 鉴于Java编译器可以识别无限循环 https
  • 在 Maven 依赖项中指定 jar 和 test-jar 类型

    我有一个名为 commons 的项目 其中包含运行时和测试的常见内容 在主项目中 我添加了公共资源的依赖项
  • 捕获的图像分辨率太大

    我在做什么 我允许用户捕获图像 将其存储到 SD 卡中并上传到服务器 但捕获图像的分辨率为宽度 4608 像素和高度 2592 像素 现在我想要什么 如何在不影响质量的情况下获得小分辨率图像 例如我可以获取或设置捕获的图像分辨率为原始图像分
  • 节拍匹配算法

    我最近开始尝试创建一个移动应用程序 iOS Android 它将自动击败比赛 http en wikipedia org wiki Beatmatching http en wikipedia org wiki Beatmatching 两

随机推荐

  • 系统分析与设计——UML图总结

    一 前言 期末考试之前 我复习系统分析与设计的时候对UML图做了一些知识汇总 现在放到博客上 如果有不对或不恰当的地方 欢迎各位指正 本文仅仅起记录作用 可随意转载 荣幸之至 截图来自网络或是老师的PPT 二 概述 系统模型的三个主要部分
  • flutter 键盘挡住输入框问题

    bool isShowKeyboard false double keyboardSize 260 软键盘高度 类添加with WidgetsBindingObserver 生命周期监听器 class SendRedEveDialogSta
  • 浏览器上实现CNN可视化——清楚看到每一层卷积后的图

    目录 本文作用 CNN神经网络可视化工具1 解释器学习笔记 CNN神经网络可视化工具2 本文作用 学习卷积神经网络时 我们只知道输入一张图片后 通过一顿操作 便可以提取图片中的特征 我们对于其内部的操作 只有理论了解 并没有做到眼见为实 这
  • 使用cloudflare tunnel免费内网穿透,实现网站的外网访问和远程桌面

    前言 Cloudflare Tunnel是Cloudflare Zero Trust中的一个产品 它能够帮助用户将位于内网中的服务暴露到公网上 从而使得外部用户可以通过互联网访问这些服务 相比较于frp ngrok等内网穿透工具 使用Clo
  • 人工智能数学基础--概率与统计9:概率运算、加法公理、事件的独立性、概率乘法定理、条件概率、全概率公式以及贝叶斯公式

    一 概述 这大半年都很忙 学习时间太少 导致概率论的学习停滞不前 期间AI大佬herosunly推荐了陈希孺老先生的概率论教材 与最开始学习的美版M R 斯皮格尔等著作的 概率与统计 表示差异比较大 具体请见 人工智能数学基础 概率与统计7
  • ESP32-IDF环境搭建以及使用

    1默认已经安装了esp32 idf和vscode配置 离线版的esp32idf安装 windows eap32安装这里参考博客ESP32c3开发环境搭建 IDF V4 4离线版安装使用 esp idf v4 4 2 可能会遇到的问题 问题篇
  • 修改elementUI样式未生效问题(挂载到了body标签上)

    修改挂载到body标签上elementUI样式问题 目录 修改挂载到body标签上elementUI样式问题 前言 一 适用范围 二 示例 1 目标 2 实现思路 修改自带样式方法 最后看效果 总结 前言 在使用element ui库的时候
  • Aspose.Slides for Java Crack

    Aspose Slides for Java Crack Added support for changing the color of leader lines in pie charts Added new AfterAnimation
  • 2012年终总结 - I T征途

    2012年终总结 I T征途 在2012年年初的时候 自己曾写了一个规划 2012 这一年我该做些啥 里面简单的介绍了一下2012年 我应该做的事儿 如今到了为2012结账的时候 我想借助那篇文档来总结这一年我的所作所为 2012年 我该给
  • 用Sipp 对Asterisk 进行性能测试的工作笔记-1

    公司需要 对Asterisk 进行一定的性能测试 测试目标 1 IVR 支持多少路2 一对一通话 支持多少路3 不同编解码的性能影响 4 通话中 录音 支持多少路 测试工具 sipp http sipp sourceforge net 辅助
  • createrepo:创建本地源

    4月20日 createrepo 创建本地源 repodata作为软件的仓库 其目录下有四个必要文件 filelists xml gz other xml gz primary xml gz 和repomd xml md 意思是 metad
  • IDEA 中 .properties文件的中文显示乱码问题的解决办法

    今天使用IDEA 搭建Spring Boot 项目 配置application properties 配置文件 录入中文 在右下角出现如下截图提示语 重新打开application properties 文件出现汉字乱码 依据提示信息修改源
  • “你爱我,我爱你,蜜雪冰城甜蜜蜜“秋天的第一杯奶茶!Python安排!!

    立秋了 大家秋天的第一杯奶茶都安排上了么 前一段时间我相信很多人都被 你爱我 我爱你 蜜雪冰城甜蜜蜜 这首歌洗脑了 所以今天就爬取了某度地图上蜜雪冰城门店分布 看看全国有多少家蜜雪冰城 能不能满足大家的需求啦 哈哈哈 数据采集 首先 我们打
  • Linux部署宝塔

    1 linux服务器安装宝塔 宝塔地址 https www bt cn new download html 点击上方地址 进入下方页面 点击安装版本 复制第一个命令 得确认你服务器是centos 远程连接服务器 复制此命令运行 运行成功后
  • [CISCN2019 华东南赛区]Web11 SSTI

    这道SSTI 差点给我渗透的感觉了 全是API 我还想去访问API看看 发现这里读取了我们的ip 我们抓包看看是如何做到的 没有东西 我们看看还有什么提示 欸 那我们可不可以直接修改参数呢 我们传递看看 发现成功了 是受控的 这里我就开始没
  • mysql某批量更新导致死锁

    查询当前数据库全部线程 show full processlist 查询当前运行的全部事务 select from information schema innodb trx 查询锁情况 select from information sc
  • 碰撞改变材质颜色_bp

    感谢来自程序员的暴击 学习资料来于 https www bilibili com video BV125411h7c4 p 22 最大的收获是 材质编辑器上 1维向量到4维向量的生成 会者不难 难者不会 方法很简单 鼠标左键 数字1就会生成
  • 2023年电赛E题完整设计暨电赛全记录

    目录 一 2023年E题完整设计 lt 1 gt 选择方案 任务一 实现按键按下复位 基础部分 任务二 实现激光点绕边框一周 基础部分 任务三 实现激光点绕A4纸边缘一周 基础部分 任务四 实现绿色激光追踪红色激光 发挥部分 lt 2 gt
  • 【信号与系统】傅里叶变换

    傅里叶变换 文章目录 傅里叶变换 傅里叶级数 基本公式 常用公式 基本性质 其他公式 卷积公式 周期信号的傅里叶变换 抽样信号的傅里叶变换 提供延时的理想滤波器 无失真传输 傅里叶级数 https blog csdn net lafea a
  • 【Flink】Flink 消费kafka报错 AMRMClientAsyncImpl Interrupted while waiting for queue InterruptedException

    1 背景 一个flink etl程序 读取一个kafka集群的数据 到两外一个集群 然后报错 2020 06 06 15 56 00 PM Thread flink akka actor default dispatcher 20 Clas