带有 kafka-avro-console-consumer 的未知魔法字节

2024-04-26

我一直在尝试将 Confluence 中的 kafka-avro-console-consumer 连接到我们的旧版 Kafka 集群,该集群是在没有 Confluence Schema Registry 的情况下部署的。 我使用以下属性显式提供了架构:

kafka-console-consumer --bootstrap-server kafka02.internal:9092 \
    --topic test \
    --from-beginning \
    --property key.schema='{"type":"long"}' \
    --property value.schema='{"type":"long"}'

但我收到“未知的魔法字节!”错误与org.apache.kafka.common.errors.SerializationException

是否可以使用 Confluence kafka-avro-console-consumer 使用来自 Kafka 的 Avro 消息,这些消息未使用 Confluence 中的 AvroSerializer 和 Schema 注册表进行序列化?


Confluence Schema Registry 序列化器/反序列化器使用有线格式 https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html#wire-format其中在消息的初始字节中包含有关架构 ID 等的信息。

如果您的消息尚未使用架构注册表序列化程序进行序列化,那么您将无法使用它反序列化它,并且将得到Unknown magic byte! error.

因此,您需要编写一个消费者来提取消息,使用 Avro avsc 架构进行反序列化,然后假设您想要保留数据,使用架构注册表序列化器 https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html#sr-serializer-and-formatter

Edit:我最近写了一篇文章,更深入地解释了整个事情:https://www.confluence.io/blog/kafka-connect-deep-dive-converters-serialization-explained https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

带有 kafka-avro-console-consumer 的未知魔法字节 的相关文章

  • 为什么卡夫卡这么快[关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 如果我有相同的硬件 请使用 Kafka 或我们当前的解决方案 ServiceMix Camel 有什么区别吗 Kafka 能处理比它
  • Spark shell (spark 3.0.0) 添加包 confluence kafka 5.5.1 javax.ws.rs-api 问题

    我本地的win10 WSL回到ubuntu 在ubuntu上 我安装了spark3 0 0 confluence平台5 5 1 手动下载 当我尝试运行spark shell或spark submit时 下面是shell示例 spark sh
  • 为每个键使用主题中的最新值

    我有一个 Kafka 生产者 它正在以高速率生成消息 消息键是用户名 值是他在游戏中的当前分数 Kafka消费者处理消费消息的速度相对较慢 在这里 我的要求是显示最新的分数并避免显示陈旧的数据 但代价是某些分数可能永远不会显示 本质上 对于
  • 使用 Spring Boot 进行 Kafka 流

    我想在我的 Spring Boot 项目中使用 Kafka Streams 实时处理 所以我需要 Kafka Streams 配置或者我想使用 KStreams 或 KTable 但我在互联网上找不到示例 我做了生产者和消费者 现在我想实时
  • 使用 Spring Embedded Kafka 测试 @KafkaListener

    我正在尝试为我正在使用 Spring Boot 2 x 开发的 Kafka 侦听器编写单元测试 作为一个单元测试 我不想启动一个完整的 Kafka 服务器作为 Zookeeper 的实例 所以 我决定使用 Spring Embedded K
  • 频繁出现“offset out of range”消息,分区被消费者抛弃

    我们正在运行 3 节点 Kafka 0 10 0 1 集群 我们有一个消费者应用程序 它有一个连接到多个主题的消费者组 我们在消费者日志中看到奇怪的行为 有了这些线 Fetch offset 1109143 is out of range
  • 卡夫卡流:RocksDB TTL

    据我了解 默认 TTL 设置为无穷大 非正数 但是 如果我们需要在存储中保留数据最多 2 天 我们可以使用 RocksDBConfigSetter 接口实现 即 options setWalTtlSeconds 172800 进行覆盖吗 或
  • 有没有办法使用 .NET 中的 Kafka Ksql Push 查询

    我目前正在 NET 中使用 Kafka 消费者处理大量 Kafka 消息 我的处理过程的第一步是解析 JSON 并根据 JSON 中特定字段的值丢弃许多消息 我不想首先处理 特别是不下载 那些不需要的消息 看起来 kSql 查询 写为推送查
  • Spring Boot 和 Kafka,Producer 抛出 key='null' 异常

    我正在尝试使用Spring Boot with Kafka and ZooKeeper with Docker docker compose yml version 2 services zookeeper image wurstmeist
  • 嵌入式 Kafka 测试随机失败

    我使用 EmbededKafka 实现了一系列集成测试 以测试使用 spring kafka 框架运行的一个 Kafka 流应用程序 流应用程序正在从 Kafka 主题读取消息 将其存储到内部状态存储中 进行一些转换并将其发送到另一个微服务
  • 如何使用 Python 在 Kafka 中生成 Tombstone Avro 记录?

    我的水槽属性 name jdbc oracle config connector class io confluent connect jdbc JdbcSinkConnector tasks max 1 topics orders con
  • 带有 spring-kafka 的 Kafka 死信队列 (DLQ)

    最好的实施方式是什么死信队列 DLQ Spring Boot 2 0 应用程序中的概念 使用 spring kafka 2 1 x 来处理无法处理的所有消息 KafkaListener某些bean发送到某些预定义的Kafka DLQ主题的方
  • Kafka 是否保证具有任何配置参数值的单个分区内的消息排序?

    如果我在 Producer 中将 Kafka 配置参数设置为 1 retries 3 2 max in flight requests per connection 5 那么一个分区内的消息很可能不按 send order 排列 Kafka
  • 从kafka获取特定时间段的结果

    这是我的代码 它使用kafka python now datetime now month ago now relativedelta month 1 topic some topic name consumer KafkaConsumer
  • 将 Kafka 输入流动态连接到多个输出流

    Kafka Streams 中是否内置了允许将单个输入流动态连接到多个输出流的功能 KStream branch允许基于真 假谓词进行分支 但这并不是我想要的 我希望每个传入的日志都确定它将在运行时流式传输到的主题 例如日志 date 20
  • 如何使用PySpark结构流+Kafka

    我尝试将 Spark 结构流与 kafka 一起使用 并且在使用 Spark 提交时遇到问题 消费者仍然从生产中接收数据 但 Spark 结构出错 请帮我找到我的代码的问题 这是我在 test py 中的代码 from kafka impo
  • 如何在kafka中定义多个序列化器?

    比如说 我发布和使用不同类型的 java 对象 对于每个对象 我必须定义自己的序列化器实现 我们如何在 serializer class 属性下提供kafka消费者 生产者属性文件中的所有实现 我们有一个类似的设置 不同主题中的不同对象 但
  • Windows 上的 Apache Kafka 错误 - 无法找到或加载主类 QuorumPeerMain

    我刚刚从 Apache 网站下载了 Kafka 2 8 0 我正在尝试使用网站上给出的说明进行设置 但是当我尝试启动 Zookeper 服务器时 出现以下错误 错误 无法找到或加载主类 org apache zookeeper server
  • 如何获取 Kafka 偏移量以进行结构化查询以进行手动且可靠的偏移量管理?

    Spark 2 2引入了Kafka的结构化流源 据我了解 它依赖 HDFS 检查点目录来存储偏移量并保证 恰好一次 消息传递 但是旧码头 比如https blog cloudera com blog 2017 06 offset manag
  • kafka中的Bootstrap服务器与zookeeper?

    为什么在 kafka consumer 中不推荐使用 Zookeeper 以及为什么建议使用 bootstrap 服务器 bootstrap server 有什么优点 Kafka消费者需要将偏移量提交给kafka并从kafka获取偏移量 由

随机推荐