在 Confluence 4.1 + Kafka 1.1 中为 Kafka Connect 打包自定义 Java `partitioner.class` 插件?

2024-05-02

我已经成功地将用 Java 编写的简单自定义 Partitioner 类用于 Confluence 3.2.x (Kafka 0.10.x) 上的 Kafka Connect 接收器。我想升级到 Confluence 4.1 (Kafka 1.1),但遇到错误。

Kafka Connect的插件加载机制似乎在CP 3.3.0中发生了变化。以前,只有 CLASSPATH 选项,但在 CP 3.3.0+ 中,有一个更新的推荐选项plugin.path机制。

如果我尝试继续使用旧的 CLASSPATH 插件机制,当我尝试使用我的插件时,我会得到:

java.lang.ClassNotFoundException: io.confluent.connect.storage.partitioner.DefaultPartitioner

那是一个CP内部类。对于较旧的 CP 3.2.x,它可以在类路径上使用,但是随着 CP >= 3.3.0 中新的类路径隔离工作,我认为必须与插件一起提供。

我认为切换到较新的推荐是明智的plugin.path机制。我删除了 CLASSPATH 条目。在默认情况下/etc/kafka/connect-distributed.properties, I see plugin.path=/usr/share/java,所以我安装我的插件 .jar 到/usr/share/java/my-custom-partitioner/my-custom-partitioner.jar。我也尝试过添加和不添加依赖项 .jar 文件。

当 Kafka Connect 服务启动时,我的插件似乎已加载:

INFO Loading plugin from: /usr/share/java/my-custom-partitioner (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:194)
INFO Registered loader: PluginClassLoader{pluginLocation=file:/usr/share/java/my-custom-partitioner/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:217)

当我做:

curl -X PUT -H "Content-Type: application/json" --data-binary "@sink_test_1.json" my-dev-test-vm:8083/connectors/sink-test-1/config

我得到:

{"error_code":500,"message":null}%             

我可以在kafka connect systemd日志中看到:

java.lang.NullPointerException
at io.confluent.connect.storage.partitioner.PartitionerConfig.classNameEquals(PartitionerConfig.java:270)
at io.confluent.connect.storage.partitioner.PartitionerConfig.access$000(PartitionerConfig.java:33)
at io.confluent.connect.storage.partitioner.PartitionerConfig$PartitionerClassDependentsRecommender.visible(PartitionerConfig.java:238)
at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:617)
at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:625)
at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:525)
at org.apache.kafka.common.config.ConfigDef.validateAll(ConfigDef.java:508)
at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:490)
at org.apache.kafka.connect.connector.Connector.validate(Connector.java:133)

目前尚不清楚出了什么问题或为什么我的分区器类未正确加载。

仅供参考,我已经使用 CP 4.1 + Kafka 1.1 依赖项重建了我的 Java 插件,并进行了一些小更新以匹配 API 更改,例如添加实现getSchemaGeneratorClass到我的分区器类。


自定义 Kafka Connect Partitioner 类将无法通过旧的 CLASSPATH 机制工作,并且它们将无法作为较新的 Kafka 0.11.0+ 独立插件机制的插件工作。

唯一可行的解​​决方案是将您的自定义 .jar 文件与自定义 Kafka Connect Partitioner 类复制到kafka-connect-storage-common插件目录位于/usr/share/java/kafka-connect-storage-common/。自定义 Kafka Connect Partitioner 插件类必须存在于同一目录中,以便它们位于同一个隔离的类加载器中。

仅供参考,您可以看到 Kafka 0.11.0+ 隔离插件机制将仅加载四个特定 Java 类的子类,这些类不涵盖此处的 Kafka Connect 分区器:

https://github.com/apache/kafka/blob/fdcf75ea326b8e07d8d7c4f5cc14f1e70451bcd4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java#L279 https://github.com/apache/kafka/blob/fdcf75ea326b8e07d8d7c4f5cc14f1e70451bcd4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java#L279

感谢 cricket_007 推荐这个确切的解决方案:将自定义 Kafka Connect 分区程序 .jar 文件放入/share/java/kafka-storage-common目录。我经历了惨痛的教训才明白为什么必须这样做以及为什么替代方案行不通。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在 Confluence 4.1 + Kafka 1.1 中为 Kafka Connect 打包自定义 Java `partitioner.class` 插件? 的相关文章

随机推荐