Kafka Connect 不支持主题策略

2024-01-11

Context

我编写了几个小代码卡夫卡连接 https://docs.confluent.io/current/connect/index.html连接器。一个每秒生成随机数据,另一个将其记录在控制台中。它们集成了一个模式注册表 https://docs.confluent.io/current/schema-registry/docs/index.html所以数据被序列化为Avro https://avro.apache.org/.

我使用以下命令将它们部署到本地 Kafka 环境中Landoop 提供的 fast-data-dev Docker 镜像 https://github.com/Landoop/fast-data-dev

基本设置有效并每秒生成一条记录的消息

但是,我想改变主题名称策略 https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html#subject-name-strategy。默认生成两个主题:

  • ${topic}-key
  • ${topic}-value

根据我的用例,我需要生成具有不同模式的事件,这些事件最终将涉及同一主题。因此,我需要的主题名称是:

  • ${topic}-${keyRecordName}
  • ${topic}-${valueRecordName}

As per the docs https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html#subject-name-strategy,我的需求符合主题记录名称策略 https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/subject/TopicRecordNameStrategy.java

我尝试过什么

我创建了avroData用于发送值进行连接的对象:

class SampleSourceConnectorTask : SourceTask() {

    private lateinit var avroData: AvroData 

    override fun start(props: Map<String, String>) {
        [...]
        avroData = AvroData(AvroDataConfig(props))
    }

然后用它来创建SourceRecord响应对象

文档 https://docs.confluent.io/current/schema-registry/docs/connect.html声明为了在 Kafka Connect 中使用架构注册表,我必须在连接器配置中设置一些属性。因此,当我创建它时,我添加它们:

name=SampleSourceConnector
connector.class=[...]
tasks.max=1
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy

Problem

连接器似乎忽略了这些属性并继续使用旧的${topic}-key and ${topic}-value科目。

Question

Kafka Connect 应该支持不同的主题策略。我设法通过编写自己的版本来解决这个问题AvroConverter https://github.com/confluentinc/schema-registry/blob/master/avro-converter/src/main/java/io/confluent/connect/avro/AvroConverter.java#L118并硬编码主题策略是我需要的。然而,这看起来并不是一个好的方法,并且在尝试使用 Sink Kafka 连接器使用数据时也会带来问题。我复制了主题,因此有一个带有旧名称的版本(${topic}-key)并且它有效

为 Kafka Connect 指定主题策略的正确设置是什么?


你错过了key.converter and value.converter前缀,用于将配置传递到转换器。所以而不是:

key.subject.name.strategy
value.subject.name.strategy

你要:

key.converter.key.subject.name.strategy
value.converter.value.subject.name.strategy

Source https://docs.confluence.io/current/connect/managing/configuring.html https://docs.confluent.io/current/connect/managing/configuring.html:

要将配置参数传递给键和值转换器,请在它们前面加上前缀key.converter. or value.converter.就像在工作配置中定义默认转换器时一样。请注意,只有在相应的转换器配置中指定时才使用这些key.converter or value.converter特性。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka Connect 不支持主题策略 的相关文章

随机推荐

  • 将 javascript 添加到扩展 django 模板以进行 google 分析

    我有一个漂亮的小index html 文件 它是一个扩展模板 它的父文件是一个base html 文件 在我的例子中是base2 html 我试图将谷歌分析代码片段添加到我网站上的某些文件中 事实证明 如果我在扩展模板的标签中添加任何内容
  • Rails - 从我的观点中移出计算?

    目前我正在按照我的观点进行一些计算 这当然是一件坏事 我正在研究可以帮助我重构上述问题的方法 一件事是将计算移至我的控制器 category sum transaction sum amount cents 这可能是更好的解决方案 但你知道
  • 在 Matlab 中制作球的动画

    我有一个包含这两个方程的球 x t v0 cos t 并且 y t h v0 sin t 1 2gt 2 其中 t 0 t Final 是时间变量 h 是高度 v0 是初始速度 是 v0 与水平面形成的角度 g 9 8 m s 2 地板位于
  • 一些Python代码的解释

    这是屏障的基本示例 说明一些线程如何等待进入一个入口屏障和另一个出口屏障 虽然代码很好 但我并没有真正深入了解它是如何工作的 例如 我不明白为什么进入 Barrier 函数的线程在执行 n n 1 后 可以立即使 n n 1 从而影响全局
  • 正则表达式非贪婪(惰性)

    我正在尝试非贪婪地解析 TD 标签 我从这样的事情开始 td stuff td align right More stuff td align left 返回的记录如下 stuff td align right More stuff td
  • 将 À 等特殊字符与常规 A 进行比较

    在某些语言中 有类似的字母 我看到对于表视图部分 本机 iOS 将 在同一部分下A 我想做同样的事情 我通过比较第一个字母来构建我的部分 所以我需要那个 将等于 A 我尝试使用localizedCompare但我仍然不知道这两者是相等的 有
  • 使用 CustomAttributes 与 GetCustomAttributes() 的优点

    今天我注意到我的智能感知中出现了一些新属性System Type我的 NET 4 5 项目的对象 其中有一个叫做CustomAttributes 我对此很感兴趣 因为我之前就明白GetCustomAttributes是最昂贵的反射调用之一
  • C 编程自动八进制解释

    Code 1 int a 0987654321 printf d a Code 2 int a scanf d a printf d a 在这里 如果我们输入 0987654321 那么它会打印相同的内容 但在第一个代码片段中 它会给出一个
  • 基本PHP MySQL数组分组问题

    快速问题 我认为对于像我一样拥有最基本的 PHP MySQL 知识的人来说 这是一个非常简单的解决方案 我有一个存储在数据库中的各个州的城市列表 其中包含城市 州和一些其他变量 现在 它们被提取为按城市名称排序的列表 阿拉斯加安克雷奇 马里
  • DataGridView显示行标题单元格

    我正在尝试显示链接到 DataTable 的简单 DataGridView 并且最终我希望 DataTable 中的第一列成为 DataGridView 的行标题单元格 此时 我将满足于在行标题单元格中包含任何值 我可以显示带有所有行和列以
  • 标识符未定义

    我使用 VS2012 Express 用 C 编写了以下代码 void ac search uint num patterns uint pattern length const char patterns uint num records
  • 卷曲远程图像并调整其大小

    我使用此脚本来下载远程图像并调整其大小 在调整大小部分出现问题 它是什么
  • Android 使用自签名证书连接到服务器

    编辑 下面的代码工作正常 没有错误 没有异常 我知道关于这个主题的大量问题 以及谷歌想到的许多博客 我已通读它们并设法想出我将要解释的内容 我的疑问在于 我的方法正确吗 它有副作用吗 以及在我解释我的方法时最好提出的另一个问题 我基于此方法
  • NIO getParentFile().mkdir() [重复]

    这个问题在这里已经有答案了 有没有一种方法可以一次性创建文件和目录 如下所示 使用 Java 7 和 NIO 路径和文件静态方法 在哪里您不必键入路径 然后将文件分成单独的行 代码 File file new File Library te
  • 当调用clock_gettime()时返回的tv_nsec字段实际上可能超过一秒吗?

    当你调用clock gettime 它返回一个 timespec 结构 struct timespec time t tv sec seconds long tv nsec nanoseconds 我在手册页中没有找到 tv nsec 不会
  • 从连续的字序列中提取任意范围的位的最有效方法是什么?

    假设我们有一个std vector 或任何其他序列容器 有时它是一个双端队列 它存储uint64 t元素 现在 让我们将该向量视为一个序列size 64连续的位 我需要找到由给定的位组成的单词 begin end 范围 鉴于end begi
  • UItableVIew 中的效果或动画

    当我单击 tableView 时 它会显示类似这样的内容以显示详细信息 我怎样才能做到这一点 我认为你需要的是一个类似于手风琴的实现 以下是一些示例参考 您可以从这里开始 如何为 iPhone SDK 应用程序实现手风琴视图 https s
  • 一个由两个弹性项目组成的弹性盒网格,其中一个弹性项目旁边有一个[重复]

    这个问题在这里已经有答案了 我想在左侧放置一个 div 在右侧放置两个 div 这bottomright应始终低于topRight分区这topRight是唯一一个高度可变的 div 我目前正在尝试使用flexbox你可以在我下面的代码中看到
  • OpenCV 上的 Libpng 冲突?

    我正在尝试使用以下代码在 XCode 4 4 Mountain Lion 上打开 png 文件 适用于 jpg 文件 Mat image imread Users user name Desktop result png imshow im
  • Kafka Connect 不支持主题策略

    Context 我编写了几个小代码卡夫卡连接 https docs confluent io current connect index html连接器 一个每秒生成随机数据 另一个将其记录在控制台中 它们集成了一个模式注册表 https