为 Flink 集群中的插件添加自定义依赖项

2024-01-11

我有一个 Flink 会话集群(作业管理器 + 任务管理器),版本 1.11.1,配置了 log4j-console.properties 以包含 Kafka 附加程序。此外,在作业管理器和任务管理器中,我都启用了 flink-s3-fs-hadoop 内置插件。

我已将 kafka-clients jar 添加到 flink/lib 目录中,这是容器运行所必需的。但当 S3 插件被实例化(并初始化记录器)时,我仍然收到以下类加载错误。

引起原因:org.apache.kafka.common.config.ConfigException:配置 key.serializer 的值 org.apache.kafka.common.serialization.ByteArraySerializer 无效:找不到类 org.apache.kafka.common.serialization.ByteArraySerializer 。

(完整的堆栈跟踪位于底部)

据我了解,插件有一个专用的动态类加载,它与系统类加载是分开的。因此,我在flink-conf.yaml文件中添加了以下配置:

classloader.parent-first-patterns.additional: org.apache.kafka
classloader.resolve-order: parent-first

但错误仍然出现。

调试时,我没有看到附加模式添加到插件类加载器的“allowedFlinkPackages”中。

我在这里缺少什么?

java.util.ServiceConfigurationError: org.apache.flink.core.fs.FileSystemFactory: Provider org.apache.flink.fs.s3hadoop.S3FileSystemFactory could not be instantiated
        at java.util.ServiceLoader.fail(Unknown Source) ~[?:?]
        at java.util.ServiceLoader$ProviderImpl.newInstance(Unknown Source) ~[?:?]
        at java.util.ServiceLoader$ProviderImpl.get(Unknown Source) ~[?:?]
        at java.util.ServiceLoader$3.next(Unknown Source) ~[?:?]
        at org.apache.flink.core.plugin.PluginLoader$ContextClassLoaderSettingIterator.next(PluginLoader.java:103) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.shaded.guava18.com.google.common.collect.Iterators$5.next(Iterators.java:558) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.shaded.guava18.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:48) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.core.fs.FileSystem.addAllFactoriesToList(FileSystem.java:1068) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.core.fs.FileSystem.loadFileSystemFactories(FileSystem.java:1050) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.core.fs.FileSystem.initialize(FileSystem.java:325) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerSecurely(TaskManagerRunner.java:315) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.main(TaskManagerRunner.java:297) [flink-dist_2.11-1.11.1.jar:1.11.1]
Caused by: java.lang.ExceptionInInitializerError
        at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:?]
        at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
        at jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
        at java.lang.reflect.Constructor.newInstance(Unknown Source) ~[?:?]
        ... 11 more
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.common.serialization.ByteArraySerializer for configuration key.serializer: Class org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
        at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:728) ~[kafka-clients-2.5.0.jar:?]
        at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:474) ~[kafka-clients-2.5.0.jar:?]
        at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:467) ~[kafka-clients-2.5.0.jar:?]
        at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:108) ~[kafka-clients-2.5.0.jar:?]
        at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:129) ~[kafka-clients-2.5.0.jar:?]
        at org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:481) ~[kafka-clients-2.5.0.jar:?]
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:326) ~[kafka-clients-2.5.0.jar:?]
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298) ~[kafka-clients-2.5.0.jar:?]
        at org.apache.logging.log4j.core.appender.mom.kafka.DefaultKafkaProducerFactory.newKafkaProducer(DefaultKafkaProducerFactory.java:40) ~[log4j-core-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.core.appender.mom.kafka.KafkaManager.startup(KafkaManager.java:136) ~[log4j-core-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.core.appender.mom.kafka.KafkaAppender.start(KafkaAppender.java:164) ~[log4j-core-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.core.config.AbstractConfiguration.start(AbstractConfiguration.java:304) ~[log4j-core-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.core.LoggerContext.setConfiguration(LoggerContext.java:579) ~[log4j-core-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.core.LoggerContext.reconfigure(LoggerContext.java:651) ~[log4j-core-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.core.LoggerContext.reconfigure(LoggerContext.java:668) ~[log4j-core-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.core.LoggerContext.start(LoggerContext.java:253) ~[log4j-core-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.core.impl.Log4jContextFactory.getContext(Log4jContextFactory.java:153) ~[log4j-core-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.core.impl.Log4jContextFactory.getContext(Log4jContextFactory.java:45) ~[log4j-core-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.LogManager.getContext(LogManager.java:194) ~[log4j-api-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.spi.AbstractLoggerAdapter.getContext(AbstractLoggerAdapter.java:138) ~[log4j-api-2.12.1.jar:2.12.1]
        at org.apache.logging.slf4j.Log4jLoggerFactory.getContext(Log4jLoggerFactory.java:45) ~[log4j-slf4j-impl-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.spi.AbstractLoggerAdapter.getLogger(AbstractLoggerAdapter.java:48) ~[log4j-api-2.12.1.jar:2.12.1]
        at org.apache.logging.slf4j.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:30) ~[log4j-slf4j-impl-2.12.1.jar:2.12.1]
        at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:329) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:349) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.<clinit>(AbstractS3FileSystemFactory.java:88) ~[?:?]
        at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:?]
        at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
        at jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
        at java.lang.reflect.Constructor.newInstance(Unknown Source) ~[?:?]
        ... 11 more
[2020-12-06T09:15:45,892][Error] {} [o.a.f.c.f.FileSystem]: Failed to load a file system via services
java.util.ServiceConfigurationError: org.apache.flink.core.fs.FileSystemFactory: Provider org.apache.flink.fs.s3hadoop.S3AFileSystemFactory could not be instantiated
        at java.util.ServiceLoader.fail(Unknown Source) ~[?:?]
        at java.util.ServiceLoader$ProviderImpl.newInstance(Unknown Source) ~[?:?]
        at java.util.ServiceLoader$ProviderImpl.get(Unknown Source) ~[?:?]
        at java.util.ServiceLoader$3.next(Unknown Source) ~[?:?]
        at org.apache.flink.core.plugin.PluginLoader$ContextClassLoaderSettingIterator.next(PluginLoader.java:103) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.shaded.guava18.com.google.common.collect.Iterators$5.next(Iterators.java:558) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.shaded.guava18.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:48) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.core.fs.FileSystem.addAllFactoriesToList(FileSystem.java:1068) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.core.fs.FileSystem.loadFileSystemFactories(FileSystem.java:1050) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.core.fs.FileSystem.initialize(FileSystem.java:325) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerSecurely(TaskManagerRunner.java:315) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.main(TaskManagerRunner.java:297) [flink-dist_2.11-1.11.1.jar:1.11.1]
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.fs.s3hadoop.S3FileSystemFactory
        at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:?]
        at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
        at jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
        at java.lang.reflect.Constructor.newInstance(Unknown Source) ~[?:?]
        ... 11 more

正如您所说,Flink 插件是通过自己的类加载器加载的,并且与任何其他插件完全隔离。

如果我们深入研究源代码,就会发现在集群启动时使用另一个密钥(不幸的是它没有记录在任何地方):

plugin.classloader.parent-first-patterns.additional

这允许您使用 PluginClassLoader 将外部 jar 添加到类路径中

声明+用法:https://github.com/apache/flink/blob/53a4b4407816c2780fed2f8995affbebc1f58c3c/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java#L156-L174 https://github.com/apache/flink/blob/53a4b4407816c2780fed2f8995affbebc1f58c3c/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java#L156-L174

将以下内容添加到 flink-conf.yaml 中。

plugin.classloader.parent-first-patterns.additional: org.apache.kafka

这应该够了吧

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

为 Flink 集群中的插件添加自定义依赖项 的相关文章

随机推荐

  • Android 模拟器以不同的分辨率启动

    有时我的 AVD 启动时的分辨率比预期的要高 我在运行它之前不会修改任何设置 此外 重新启动时 它会以正确的分辨率启动 虽然并不重要 但有时确实会浪费时间 我不确定为什么会这样 但我只是在启动之前编辑了详细信息并启用了从快照选项启动 这确实
  • 用于值类型 int 的 ASP.NET MVC 2 编辑器模板

    我想为值类型 即 int 创建一个 MVC 2 编辑器模板 有人用预览 1 位完成此操作吗 非常感谢 当您在回发时提交值时 尼克 克拉克的答案会起作用吗 在 MVC2 预览版 2 中 调用 Html Textbox abc Model To
  • Rails 控制器中的 Process.fork

    我们正在对一个新应用程序进行一些原型设计 并注意到其中一个操作需要很长时间才能加载 80 120 秒 由于很多处理不需要在页面加载时进行 我们可以稍后通过 Ajax 请求数据 我想到使用Process fork允许页面立即返回 而处理仍在
  • 如何更改 iPhone 中文本字段的背景颜色?

    我想将文本字段的背景颜色更改为透明的颜色 我努力了 option5btn backgroundColor UIColor grayColor 但我想要一些其他颜色 谁能告诉我 iPhone 文本字段的背景中可以使用哪些所有颜色 谁能帮我这个
  • 数组上的 MongoDB 地理空间索引(多键 + 地理空间)

    这是我的数据的简化版本 gt db foo insert name jim locations 10 10 3 6 1 2 gt db foo insert name john locations 1 5 2 4 我希望能够做类似的事情 g
  • 不需要公司名称 WooCommerce(第 3 方插件)

    我想让 WooCommerce 中不需要计费公司和运输公司 由于某种原因 我使用的代码适用于除公司部分之外的所有内容 事实证明 第三方插件需要公司名称 以下是该插件的完整代码
  • MongoDB 中的 UpdateMany 使用 $inc 运行两次

    感谢我在上一个问题中得到的帮助 使用文档中的值更新许多 mongodb 文档 https stackoverflow com questions 63530102 updatemany mongodb documents with valu
  • 聚合和聚合根的混淆

    我被分配了一个非常简单的项目作为考试 我有想法使用领域驱动设计来开发它 你们中的许多人可能会说该应用程序非常简单 使用存储库和 UoW 只是浪费时间 您可能是对的 但我认为这是一个学习更多东西的机会 该应用程序是一个 机票 系统 从下图中您
  • 视差部分初始背景位置与页面滚动时不一致

    我花了一整天的时间尝试创建一个视差部分 无论它放在页面上的哪个位置 它都能正常工作 但我对代码所做的每一次更改都会解决一个问题并产生另一个问题 到目前为止 这是我的代码 function use strict parallax sectio
  • 如何在 antd select 上添加分页?因为从接口获取数据是巨大的。所以我想实现分页

    如何在 Antd 的 Select 上添加分页 因为从接口获取数据是巨大的 所以我想实现分页 但文档api不支持 import Select from antd const Option Select Option let province
  • 使用地理定位和 Google 地图 API [帮助]

    我是移动开发领域的新手 现在我正在构建一个使用 jQuery mobile 和 PhoneGap 的应用程序 这是我的逻辑 我有一个包含用户及其地址的表 我获取用户地址并通过地图 API 传递它以捕获位置 但我对每条记录都这样做 有时 AP
  • Firestore Web 代码示例给出了无效的参数类型

    我正在尝试 Firebase 的新 Firestore 当我运行代码示例时https firebase google com docs firestore manage data add data authuser 0 https fire
  • css在表单中浮动2个输入字段

    我在理解 css 时遇到以下问题 我有一张登记表 在这种形式中 我使用字段集 现在我想在每行旁边放置两个输入字段 上面的每个字段还应该有一个标签 所以我想要实现的是 label 1 label 2 label 3 label 4
  • Intellij 自动构建 - 不存在

    如果我删除一个方法 则在我手动构建项目之前 不会显示错误 由于不存在方法而在代码中的其他位置 有没有办法让它在保存 更改时自动构建 我在某个地方缺少一个选项吗 你可以尝试理念 12 EAP http confluence jetbrains
  • TFLite 的硬刷操作

    我有一个用 Tensorflow Keras 编写的自定义神经网络 并应用 Hard swish 函数作为激活 如 MobileNetV3 论文中使用的那样 执行 def swish x return x tf nn relu6 x 3 6
  • 嵌套 jquery 选择器触发父级和子级特定事件

    我有一个具有以下结构的表 表 主 tbody tr Row td 输入 EditRow 我的 jquery 看起来像这样 table Main gt tbody gt tr Row live click function e RowClic
  • PhantomJS/CasperJS AssertExists() 失败

    我试图检查我的网页中是否存在选择器 但 casperjs 从未找到它 我尝试过两种方法 1 无需等待 casper then function search for casperjs from google form this test a
  • 出站 ChannelHandler 的捕获所有异常处理

    在 Netty 中 您有入站和出站处理程序的概念 只需在管道的末尾 尾部 添加一个通道处理程序并实现一个捕获所有入站异常处理程序即可实现exceptionCaught覆盖 如果未沿途处理 沿入站管道发生的异常将沿着处理程序传播 直到遇到最后
  • Visual Studio 2012 的 Intellisense 不再接受按空格键的建议

    我一直在多台 PC 上使用 Visual Studio 2005 2008 2010 由其他人或我安装 始终对它的智能感知建议是这样工作的 我开始打字 弹出带有建议的窗口 突出显示当前建议 然后我可以按空格键接受建议 然而 由于我在当前的
  • 为 Flink 集群中的插件添加自定义依赖项

    我有一个 Flink 会话集群 作业管理器 任务管理器 版本 1 11 1 配置了 log4j console properties 以包含 Kafka 附加程序 此外 在作业管理器和任务管理器中 我都启用了 flink s3 fs had