Kafka 连接器和架构注册表 - 检索 Avro 架构时出错 - 未找到主题

2023-12-09

我有一个主题,最终会有很多不同的模式。目前它只有一个。 我已经通过 REST 创建了一个连接作业,如下所示:

{
 "name":"com.mycompany.sinks.GcsSinkConnector-auth2",
 "config": {
    "connector.class": "com.mycompany.sinks.GcsSinkConnector",
    "topics": "auth.events",
    "flush.size": 3,
    "my.setting":"bar",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "key.deserializer":"org.apache.kafka.common.serialization.StringDerserializer",
    "value.converter":"io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url":"http://schema-registry-service:8081",
    "value.subject.name.strategy":"io.confluent.kafka.serializers.subject.RecordNameStrategy",
    "group.id":"account-archiver"

 }
}

然后,我使用字符串键和 avro 序列化负载向该主题推送一条消息。如果我在控制中心检查该主题,我会看到正确的反序列化数据。 查看连接实例的输出,尽管我在日志中看到了这一点

RROR WorkerSinkTask{id=com.mycompany.sinks.GcsSinkConnector-auth2-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic auth.events to Avro:
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 7
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:226)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:319)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:307)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersionFromRegistry(CachedSchemaRegistryClient.java:158)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersion(CachedSchemaRegistryClient.java:271)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.schemaVersion(AbstractKafkaAvroDeserializer.java:184)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:153)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:215)
    at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:145)
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:90)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

从这里你可以看到有两个相关的问题:

  • Error retrieving Avro schema for id 7
  • Subject not found.; error code: 40401

让我烦恼的是,我已将策略指定为 RecordNameStrategy,我认为应该使用魔术字节来获取模式而不是主题名称,但它在未找到主题上出错。我不确定它实际上是在寻找主题名称还是通过 ID 获取架构。 无论哪种方式,都可以通过 ssh-ing 到连接实例并执行卷曲来http://schema-registry-service:8081/schemas/ids/7我确实得到了返回的架构。 此堆栈跟踪上方有一些额外的日志记录,令人失望的是它看起来仍然使用错误的名称策略:

INFO AvroConverterConfig values:
    schema.registry.url = [http://schema-registry-service:8081]
    basic.auth.user.info = [hidden]
    auto.register.schemas = false
    max.schemas.per.subject = 1000
    basic.auth.credentials.source = URL
    schema.registry.basic.auth.user.info = [hidden]
    value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
    key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy

有谁知道如何解决这个问题?我正在使用以下图像:

  • confluenceinc/cp-kafka-connect:5.2.0
  • confluenceinc/cp-kafka:5.1.0

Thanks


在踪迹中,lookUpSubjectVersion意味着它试图在下面进行查找/subjects/:name/versions对于那里列出的每个 ID,然后找不到schemaId=7 (Note: notversion=7),虽然从日志中不太清楚什么:name它试图在这里使用,但如果没有找到,那么你会得到你的Subject not found错误。如果我的PR被接受了,主题名称会更清楚

我相信这可能是由于使用RecordNameStrategy. 查看该房产的 PR,我认为它实际上只是针对生产者/消费者代码进行了测试,而不是在 Connect API 中进行了彻底的测试。与默认行为相比TopicNameStrategy

其中,你可以看到它尝试使用

value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy

但仔细一看,我认为你可能配置错误。

类似于你的方式value.converter.schema.registry.url,你实际上需要设置value.converter.value.subject.name.strategy反而。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka 连接器和架构注册表 - 检索 Avro 架构时出错 - 未找到主题 的相关文章

  • 调试自定义 Kafka 连接器的简单有效的方法是什么?

    我正在使用几个 Kafka 连接器 在控制台输出中没有看到它们的创建 部署有任何错误 但是我没有得到我正在寻找的结果 没有任何结果 无论是期望的还是否则 我基于 Kafka 的示例 FileStream 连接器制作了这些连接器 因此我的调试
  • 如何将我的 json 字符串 avro 二进制编码为字节数组?

    我有一个实际的 JSON 字符串 我需要将其 avro 二进制编码为字节数组 在经历了Apache Avro 规范 http avro apache org docs 1 7 7 spec html 我想出了下面的代码 我不确定这是否是正确
  • 为什么卡夫卡这么快[关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 如果我有相同的硬件 请使用 Kafka 或我们当前的解决方案 ServiceMix Camel 有什么区别吗 Kafka 能处理比它
  • 生产者程序中的 kafka 网络处理器错误(ArrayIndexOutOfBoundsException:18)

    我有下面的 kafka Producer Api 程序 我对 kafka 本身是新手 下面的代码从 API 之一获取数据并将消息发送到 kafka 主题 package kafka Demo import java util Propert
  • 使用 kafka java api 的 Avro 序列化器和反序列化器

    Kafka Avro 序列化器和反序列化器无法工作 我尝试使用 kafka 控制台消费者消费消息 我可以看到发布的消息 public class AvroProducer
  • 卡夫卡主题查看器? [关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 我想调试一些 Kafka 主题 这样我就知道消费者或生产者是否有问题 Kafka 是否有一个 UI 我
  • 使用 Spring Boot 进行 Kafka 流

    我想在我的 Spring Boot 项目中使用 Kafka Streams 实时处理 所以我需要 Kafka Streams 配置或者我想使用 KStreams 或 KTable 但我在互联网上找不到示例 我做了生产者和消费者 现在我想实时
  • Kafka:如何获取主题的最后修改时间,即添加到主题的任何分区的最后一条消息

    我们的用例是从 kafka 中删除陈旧 未使用的主题 即如果某个主题 在所有分区上 在过去 7 天内没有任何新消息 那么我们会将其视为陈旧 未使用并删除它 许多谷歌结果建议向消息添加时间戳 然后解析它 对于新主题和消息 灵魂可以工作 但我们
  • 使用 Spring Embedded Kafka 测试 @KafkaListener

    我正在尝试为我正在使用 Spring Boot 2 x 开发的 Kafka 侦听器编写单元测试 作为一个单元测试 我不想启动一个完整的 Kafka 服务器作为 Zookeeper 的实例 所以 我决定使用 Spring Embedded K
  • 如何检测 KTable 连接的哪一侧触发了更新?

    当您在 Kafka 中连接两个表时 每次更新两个 KTable 之一时 您的输出 Ktable 也会更新 想象一下你正在加入Customers与一个列表Orders你已经适当减少了 再次想象一下 您使用此连接的结果来为最终客户提供特别优惠和
  • Kafka不启动空白输出

    我正在努力安装 Kafka 和 Zookeeper 我已经运行了 Zookeeper 并且它当前正在运行 我将所有内容设置为 https dzone com articles running apache kafka on windows
  • kafka新版本2.1.0 Broker无故挂起

    起初 集群中的所有代理都可以启动并正常工作 但有时其中一个代理会遇到问题 并且会出现一些现象 整个集群挂了 生产者和消费者也不工作 因此从监视器来看网络流量降至零 使用kafka topic sh描述主题消息 每个副本都很好 即使是异常的b
  • 未能在kafka-storm中将偏移量数据写入zookeeper

    我正在设置一个风暴集群来计算实时趋势和其他统计数据 但是我在将 恢复 功能引入到这个项目中时遇到了一些问题 方法是允许上次读取的偏移量kafka spout 源代码为kafka spout来自https github com apache
  • Spring Kafka Acknowledgement.acknowledge 线程安全吗?

    我正在实现一个基于卡夫卡的应用程序 我想在其中手动确认传入消息 架构迫使我在单独的线程中完成它 问题是 在与消费者不同的线程中执行 Acknowledgement acknowledge 是否可能且安全 是的 只要你使用MANUAL并不是M
  • Kafka中如何同时实现分布式处理和高可用?

    我有一个由 n 个分区组成的主题 为了进行分布式处理 我创建了两个在不同机器上运行的进程 他们使用相同的 groupd id 订阅主题并分配 n 2 个线程 每个线程处理单个流 每个进程 n 2 个分区 这样我就可以实现负载分配 但现在如果
  • 获取:导入 Spark 模块时出错:没有名为“pyspark.streaming.kafka”的模块

    我需要将从 pyspark 脚本创建的日志推送到 kafka 我正在做 POC 所以在 Windows 机器上使用 Kafka 二进制文件 我的版本是 kafka 2 4 0 spark 3 0 和 python 3 8 1 我正在使用 p
  • 卡夫卡监听器中的钩子

    kafka 监听消息之前 之后是否有任何类型的钩子可用 使用案例 必须设置MDC关联id才能进行日志溯源 我在寻找什么 之前 之后回调方法 以便可以在进入时设置 MDC 关联 ID 并最终在退出时清除 MDC 编辑后的场景 我将关联 id
  • 无法使用 jolokia 从 Kafka 提取 JMX 数据

    我已经在 centos 7 机器上安装了 Jolokia 并尝试使用 Jolokia 代理提取 Kafka 指标 并使用 Nagios 插件 check jmx4perl 与 Icinga 监控工具集成 以下是我遵循的配置步骤 步骤1 下载
  • KafkaConsumer Java API subscribe() 与 allocate()

    我是 Kafka Java API 的新手 我正在研究使用来自特定 Kafka 主题的记录 我明白我可以使用方法subscribe 开始从主题轮询记录 Kafka还提供了方法assign 如果我想开始从主题的选定分区轮询记录 我想了解这是否
  • Avro ENUM 字段

    我正在尝试在 Avro 模式中创建 Union 字段并用它发送相应的 JSON 消息 但要拥有其中一个字段 null https avro apache org docs 1 8 2 spec html Unions https avro

随机推荐

  • 在 PHP 中调整图像大小的智能方法[关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心以获得指导 我想知道是否有人可以帮助我
  • 有这样的RTSP Ping吗?

    我目前正在开发一个 WinForm 应用程序 使用 C 中的 RTSP 协议从 IP 摄像机流式传输视频 一切都很好 该应用程序的部分要求包括检查网络摄像机是否在线的功能 因此 我使用 System Net NetworkInformati
  • 《Head First Design Patterns》一书中的接口与接口关联

    这本书首先设计模式将以下 UML 作为观察者模式的示例 这张图中让我印象深刻的是之间的关联关系Subject and Observer接口 据我了解Java接口 它们不能以这种方式实现 Has a 关系 当我查看几页后提供的实现示例时 我发
  • 如何在Python中对嵌套列表的外部和内部子列表进行排序?

    首先 如果这太天真 我深表歉意 我是初学者 我有以下类型的列表列表 我想首先按内部列表的最后一个成员按升序排序 data 1 45 0 2 49 2 3 98 0 4 82 1 5 77 1 6 98 2 我通过使用以下方法来实现此目的 s
  • 具有不同内容的跨路由的通用组件

    我有一个名为Header它存在于所有路线中 而应用程序的其余部分则发生变化 为了实现这一点 我的主要渲染代码如下所示 使用 ES6 render return div div
  • 如何检查用户是否登录

    我创建了一个登录页面 用户必须提供用户名和密码才能访问某些特定资源 他们可以在其中上传图像 或者只是编辑一些有关自己的描述 我的 web config 文件如下所示
  • php:获取ip地址

    我想获取访客的IP地址 你能告诉我什么元素吗 SERVER 我应该使用 SERVER HTTP CLIENT IP SERVER HTTP X FORWARDED FOR or SERVER REMOTE ADDR UPDATE 如果您的客
  • Xcode 4 中的这些图标代表什么?

    我以前从未见过这些 但是文件浏览器中文件名旁边的小 A 和 M 是做什么用的 让我根据SVN的知识猜测一下 A gt 新添加的文件 M gt 修改现有文件
  • 如何在 Titanium JS 中创建带有按钮的标题栏?

    我在用着钛合金构建一个应用程序 我尝试创建一个带有按钮的标题栏 类似于联系人应用程序 如下图所示 该标题的标题位于中间 按钮位于任一站点 我一直在到处寻找一种在钛中做到这一点的方法 但我还没有找到任何东西 文档中似乎没有这个内容 我需要创建
  • 如何在配置单元中保留驼峰式大小写的列名

    选择 12345 作为 EmpId 输出是 empid 值为 12345 有任何线索可以保持与 EmpId 相同的列名吗 不可能 这是 HIVE 元存储的限制 它以全小写形式存储表的模式 Hive 使用此方法来标准化列名称 请参阅表 jav
  • 内部访问修饰符与私有访问修饰符

    两者有什么区别internal and privateC 中的访问修饰符 internal适用于程序集范围 即只能从同一 exe 或 dll 中的代码访问 private适用于类范围 即只能从同一类中的代码访问
  • 为什么char数据的地址不显示?

    class Address int i char b string c public void showMap void void Address showMap void cout lt lt address of int lt lt i
  • 没有 Web 服务器的 Spring Boot

    我有一个简单的 Spring Boot 应用程序 它从 JMS 队列获取消息并将一些数据保存到日志文件中 但不需要 Web 服务器 有没有办法在没有Web服务器的情况下启动Spring Boot 春季启动 2 x 3 x 应用程序属性 sp
  • Laravel 4 不刷新

    我在 laravel 4 中遇到一个奇怪的问题 因为每次我尝试刷新页面时都不会出现更改 肯定不是浏览器的缓存 任何帮助表示赞赏 我遇到了同样的问题并找到了答案 尝试在 php ini 中禁用 OPcache 如果您使用MAMP 可以在 Ap
  • 隐藏已编译应用程序可执行代码的实践

    反编译和逆向工程 net 程序集是一种标准做法 我想发布一些将添加到现有应用程序的插件程序集 但我不希望它们被其他人使用 有哪些方法可以隐藏这些程序集的来源 除非控制目标硬件 否则理论上不可能实现 100 的保护 如果 CPU 能够执行它
  • 咖啡 | solver.prototxt值设置策略

    在 Caffe 上 我正在尝试实现一个用于语义分割的全卷积网络 我想知道是否有一个具体的策略来设置你的 solver prototxt 以下超参数的值 测试迭代器 测试间隔 迭代大小 max iter 这是否取决于您的训练集的图像数量 如果
  • c语言中绝对值的写法

    我知道该解决方案很丑陋并且在技术上不正确 但我不明白为什么代码不起作用 include
  • 包括 ACL 条件下的功能

    我有一个名为 MedicalFile 的资产 其中包含对组织的引用 参与者 HealthCareProfessional 也属于一个组织 现在我想定义一个 ACL 规则 限制医疗保健专业人员只能查看 MedicalFile 与其组织连接的医
  • Java 中 JESS 的输出

    我想将 事实 发送到java中的JESS文件并获取结果 我基本上对 JESS 文件进行批处理 然后通过 add 将我的数据 此处的结构 发送到引擎中 我试图将 JESS 结果 应该是一个字符串 转换为 值 Rete engine new R
  • Kafka 连接器和架构注册表 - 检索 Avro 架构时出错 - 未找到主题

    我有一个主题 最终会有很多不同的模式 目前它只有一个 我已经通过 REST 创建了一个连接作业 如下所示 name com mycompany sinks GcsSinkConnector auth2 config connector cl