我有一个 Flink 会话集群(作业管理器 + 任务管理器),版本 1.11.1,配置了 log4j-console.properties 以包含 Kafka 附加程序。此外,在作业管理器和任务管理器中,我都启用了 flink-s3-fs-hadoop 内置插件。
我已将 kafka-clients jar 添加到 flink/lib 目录中,这是容器运行所必需的。但当 S3 插件被实例化(并初始化记录器)时,我仍然收到以下类加载错误。
引起原因:org.apache.kafka.common.config.ConfigException:配置 key.serializer 的值 org.apache.kafka.common.serialization.ByteArraySerializer 无效:找不到类 org.apache.kafka.common.serialization.ByteArraySerializer 。
(完整的堆栈跟踪位于底部)
据我了解,插件有一个专用的动态类加载,它与系统类加载是分开的。因此,我在flink-conf.yaml文件中添加了以下配置:
classloader.parent-first-patterns.additional: org.apache.kafka
classloader.resolve-order: parent-first
但错误仍然出现。
调试时,我没有看到附加模式添加到插件类加载器的“allowedFlinkPackages”中。
我在这里缺少什么?
java.util.ServiceConfigurationError: org.apache.flink.core.fs.FileSystemFactory: Provider org.apache.flink.fs.s3hadoop.S3FileSystemFactory could not be instantiated
at java.util.ServiceLoader.fail(Unknown Source) ~[?:?]
at java.util.ServiceLoader$ProviderImpl.newInstance(Unknown Source) ~[?:?]
at java.util.ServiceLoader$ProviderImpl.get(Unknown Source) ~[?:?]
at java.util.ServiceLoader$3.next(Unknown Source) ~[?:?]
at org.apache.flink.core.plugin.PluginLoader$ContextClassLoaderSettingIterator.next(PluginLoader.java:103) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.shaded.guava18.com.google.common.collect.Iterators$5.next(Iterators.java:558) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.shaded.guava18.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:48) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.core.fs.FileSystem.addAllFactoriesToList(FileSystem.java:1068) [flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.core.fs.FileSystem.loadFileSystemFactories(FileSystem.java:1050) [flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.core.fs.FileSystem.initialize(FileSystem.java:325) [flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerSecurely(TaskManagerRunner.java:315) [flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.main(TaskManagerRunner.java:297) [flink-dist_2.11-1.11.1.jar:1.11.1]
Caused by: java.lang.ExceptionInInitializerError
at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:?]
at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
at jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
at java.lang.reflect.Constructor.newInstance(Unknown Source) ~[?:?]
... 11 more
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.common.serialization.ByteArraySerializer for configuration key.serializer: Class org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:728) ~[kafka-clients-2.5.0.jar:?]
at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:474) ~[kafka-clients-2.5.0.jar:?]
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:467) ~[kafka-clients-2.5.0.jar:?]
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:108) ~[kafka-clients-2.5.0.jar:?]
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:129) ~[kafka-clients-2.5.0.jar:?]
at org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:481) ~[kafka-clients-2.5.0.jar:?]
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:326) ~[kafka-clients-2.5.0.jar:?]
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298) ~[kafka-clients-2.5.0.jar:?]
at org.apache.logging.log4j.core.appender.mom.kafka.DefaultKafkaProducerFactory.newKafkaProducer(DefaultKafkaProducerFactory.java:40) ~[log4j-core-2.12.1.jar:2.12.1]
at org.apache.logging.log4j.core.appender.mom.kafka.KafkaManager.startup(KafkaManager.java:136) ~[log4j-core-2.12.1.jar:2.12.1]
at org.apache.logging.log4j.core.appender.mom.kafka.KafkaAppender.start(KafkaAppender.java:164) ~[log4j-core-2.12.1.jar:2.12.1]
at org.apache.logging.log4j.core.config.AbstractConfiguration.start(AbstractConfiguration.java:304) ~[log4j-core-2.12.1.jar:2.12.1]
at org.apache.logging.log4j.core.LoggerContext.setConfiguration(LoggerContext.java:579) ~[log4j-core-2.12.1.jar:2.12.1]
at org.apache.logging.log4j.core.LoggerContext.reconfigure(LoggerContext.java:651) ~[log4j-core-2.12.1.jar:2.12.1]
at org.apache.logging.log4j.core.LoggerContext.reconfigure(LoggerContext.java:668) ~[log4j-core-2.12.1.jar:2.12.1]
at org.apache.logging.log4j.core.LoggerContext.start(LoggerContext.java:253) ~[log4j-core-2.12.1.jar:2.12.1]
at org.apache.logging.log4j.core.impl.Log4jContextFactory.getContext(Log4jContextFactory.java:153) ~[log4j-core-2.12.1.jar:2.12.1]
at org.apache.logging.log4j.core.impl.Log4jContextFactory.getContext(Log4jContextFactory.java:45) ~[log4j-core-2.12.1.jar:2.12.1]
at org.apache.logging.log4j.LogManager.getContext(LogManager.java:194) ~[log4j-api-2.12.1.jar:2.12.1]
at org.apache.logging.log4j.spi.AbstractLoggerAdapter.getContext(AbstractLoggerAdapter.java:138) ~[log4j-api-2.12.1.jar:2.12.1]
at org.apache.logging.slf4j.Log4jLoggerFactory.getContext(Log4jLoggerFactory.java:45) ~[log4j-slf4j-impl-2.12.1.jar:2.12.1]
at org.apache.logging.log4j.spi.AbstractLoggerAdapter.getLogger(AbstractLoggerAdapter.java:48) ~[log4j-api-2.12.1.jar:2.12.1]
at org.apache.logging.slf4j.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:30) ~[log4j-slf4j-impl-2.12.1.jar:2.12.1]
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:329) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:349) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.<clinit>(AbstractS3FileSystemFactory.java:88) ~[?:?]
at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:?]
at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
at jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
at java.lang.reflect.Constructor.newInstance(Unknown Source) ~[?:?]
... 11 more
[2020-12-06T09:15:45,892][Error] {} [o.a.f.c.f.FileSystem]: Failed to load a file system via services
java.util.ServiceConfigurationError: org.apache.flink.core.fs.FileSystemFactory: Provider org.apache.flink.fs.s3hadoop.S3AFileSystemFactory could not be instantiated
at java.util.ServiceLoader.fail(Unknown Source) ~[?:?]
at java.util.ServiceLoader$ProviderImpl.newInstance(Unknown Source) ~[?:?]
at java.util.ServiceLoader$ProviderImpl.get(Unknown Source) ~[?:?]
at java.util.ServiceLoader$3.next(Unknown Source) ~[?:?]
at org.apache.flink.core.plugin.PluginLoader$ContextClassLoaderSettingIterator.next(PluginLoader.java:103) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.shaded.guava18.com.google.common.collect.Iterators$5.next(Iterators.java:558) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.shaded.guava18.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:48) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.core.fs.FileSystem.addAllFactoriesToList(FileSystem.java:1068) [flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.core.fs.FileSystem.loadFileSystemFactories(FileSystem.java:1050) [flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.core.fs.FileSystem.initialize(FileSystem.java:325) [flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerSecurely(TaskManagerRunner.java:315) [flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.main(TaskManagerRunner.java:297) [flink-dist_2.11-1.11.1.jar:1.11.1]
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.fs.s3hadoop.S3FileSystemFactory
at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:?]
at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
at jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
at java.lang.reflect.Constructor.newInstance(Unknown Source) ~[?:?]
... 11 more
正如您所说,Flink 插件是通过自己的类加载器加载的,并且与任何其他插件完全隔离。
如果我们深入研究源代码,就会发现在集群启动时使用另一个密钥(不幸的是它没有记录在任何地方):
plugin.classloader.parent-first-patterns.additional
这允许您使用 PluginClassLoader 将外部 jar 添加到类路径中
声明+用法:https://github.com/apache/flink/blob/53a4b4407816c2780fed2f8995affbebc1f58c3c/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java#L156-L174 https://github.com/apache/flink/blob/53a4b4407816c2780fed2f8995affbebc1f58c3c/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java#L156-L174
将以下内容添加到 flink-conf.yaml 中。
plugin.classloader.parent-first-patterns.additional: org.apache.kafka
这应该够了吧
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)