Context
我编写了几个小代码卡夫卡连接 https://docs.confluent.io/current/connect/index.html连接器。一个每秒生成随机数据,另一个将其记录在控制台中。它们集成了一个模式注册表 https://docs.confluent.io/current/schema-registry/docs/index.html所以数据被序列化为Avro https://avro.apache.org/.
我使用以下命令将它们部署到本地 Kafka 环境中Landoop 提供的 fast-data-dev Docker 镜像 https://github.com/Landoop/fast-data-dev
基本设置有效并每秒生成一条记录的消息
但是,我想改变主题名称策略 https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html#subject-name-strategy。默认生成两个主题:
${topic}-key
${topic}-value
根据我的用例,我需要生成具有不同模式的事件,这些事件最终将涉及同一主题。因此,我需要的主题名称是:
${topic}-${keyRecordName}
${topic}-${valueRecordName}
As per the docs https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html#subject-name-strategy,我的需求符合主题记录名称策略 https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/subject/TopicRecordNameStrategy.java
我尝试过什么
我创建了avroData
用于发送值进行连接的对象:
class SampleSourceConnectorTask : SourceTask() {
private lateinit var avroData: AvroData
override fun start(props: Map<String, String>) {
[...]
avroData = AvroData(AvroDataConfig(props))
}
然后用它来创建SourceRecord
响应对象
文档 https://docs.confluent.io/current/schema-registry/docs/connect.html声明为了在 Kafka Connect 中使用架构注册表,我必须在连接器配置中设置一些属性。因此,当我创建它时,我添加它们:
name=SampleSourceConnector
connector.class=[...]
tasks.max=1
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
Problem
连接器似乎忽略了这些属性并继续使用旧的${topic}-key
and ${topic}-value
科目。
Question
Kafka Connect 应该支持不同的主题策略。我设法通过编写自己的版本来解决这个问题AvroConverter https://github.com/confluentinc/schema-registry/blob/master/avro-converter/src/main/java/io/confluent/connect/avro/AvroConverter.java#L118并硬编码主题策略是我需要的。然而,这看起来并不是一个好的方法,并且在尝试使用 Sink Kafka 连接器使用数据时也会带来问题。我复制了主题,因此有一个带有旧名称的版本(${topic}-key
)并且它有效
为 Kafka Connect 指定主题策略的正确设置是什么?