是否可以反序列化 Avro 消息(使用来自 Kafka 的消息)而不在 ConfluenceRegistryAvroDeserializationSchema 中提供 Reader 模式

2023-12-04

我在 Apache Flink 中使用 Kafka Connector 来访问由汇流卡夫卡.

除了 schema 注册表 url 之外ConfluentRegistryAvroDeserializationSchema.forGeneric(...)期待“读者”模式。 我不想提供读取模式,而是想使用同一作者的模式(在注册表中查找)来读取消息,因为消费者不会有最新的模式。

FlinkKafkaConsumer010<GenericRecord> myConsumer =
        new FlinkKafkaConsumer010<>("topic-name", ConfluentRegistryAvroDeserializationSchema.forGeneric(<reader schema goes here>, "http://host:port"), properties);
myConsumer.setStartFromLatest();

https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html“使用这些反序列化模式记录将与从模式注册表检索并转换为静态提供的模式一起读取”

由于我不想在消费者端保留架构定义,如何使用编写者的架构反序列化来自 Kafka 的 Avro 消息?

感谢你的帮助!


我认为不能直接使用ConfluentRegistryAvroDeserializationSchema.forGeneric。它旨在与读者模式一起使用,并且他们有对此进行检查的先决条件。

你必须实施你自己的。两个重要的东西:

  • Set specific.avro.reader为 false(否则您将获得特定记录)
  • The KafkaAvroDeserializer必须延迟初始化(因为它本身不可序列化,因为它保存对模式注册表客户端的引用)
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

public class KafkaGenericAvroDeserializationSchema
    implements KeyedDeserializationSchema<GenericRecord> {

  private final String registryUrl;
  private transient KafkaAvroDeserializer inner;

  public KafkaGenericAvroDeserializationSchema(String registryUrl) {
    this.registryUrl = registryUrl;
  }

  @Override
  public GenericRecord deserialize(
      byte[] messageKey, byte[] message, String topic, int partition, long offset) {
    checkInitialized();
    return (GenericRecord) inner.deserialize(topic, message);
  }

  @Override
  public boolean isEndOfStream(GenericRecord nextElement) {
    return false;
  }

  @Override
  public TypeInformation<GenericRecord> getProducedType() {
    return TypeExtractor.getForClass(GenericRecord.class);
  }

  private void checkInitialized() {
    if (inner == null) {
      Map<String, Object> props = new HashMap<>();
      props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
      props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
      SchemaRegistryClient client =
          new CachedSchemaRegistryClient(
              registryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
      inner = new KafkaAvroDeserializer(client, props);
    }
  }
}

env.addSource(
  new FlinkKafkaConsumer<>(
    topic, 
    new KafkaGenericAvroDeserializationSchema(schemaReigstryUrl), 
    kafkaProperties));
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

是否可以反序列化 Avro 消息(使用来自 Kafka 的消息)而不在 ConfluenceRegistryAvroDeserializationSchema 中提供 Reader 模式 的相关文章

  • 连接到 Apache Kafka 多节点集群中的 Zookeeper

    我按照以下说明设置了多节点 kafka 集群 现在 如何连接到zookeeper 是否可以从 JAVA 中的生产者 消费者端仅连接到一个 ZooKeeper 或者是否有一种方法可以连接所有 ZooKeeper 节点 设置多节点 Apache
  • Apache Kafka 是否提供异步订阅回调 API?

    我的项目正在将 Apache Kafka 视为老化的基于 JMS 的消息传递方法的潜在替代品 为了让这个过渡尽可能的顺利 如果替代的排队系统 Kafka 有一个异步订阅机制那就更理想了 类似于我们当前项目使用的JMS机制MessageLis
  • 从 Apache Kafka 中的主题删除消息

    所以我是 Apache Kafka 的新手 我正在尝试创建一个简单的应用程序 以便我可以更好地理解 API 我知道这个问题在这里被问了很多 但是如何清除存储在主题上的消息 记录 我看到的大多数答案都说要更改消息保留时间或删除并重新创建主题
  • 使用 offsets_for_times 从时间戳消费

    尝试使用 confluence kafka AvroConsumer 来消费给定时间戳的消息 if flag creating a list topic partitons to search list map lambda p Topic
  • 使用 Spring Boot 进行 Kafka 流

    我想在我的 Spring Boot 项目中使用 Kafka Streams 实时处理 所以我需要 Kafka Streams 配置或者我想使用 KStreams 或 KTable 但我在互联网上找不到示例 我做了生产者和消费者 现在我想实时
  • 如何使用 Kafka 发送大消息(超过 15MB)?

    我发送字符串消息到Kafka V 0 8使用 Java Producer API 如果消息大小约为 15 MB 我会得到MessageSizeTooLargeException 我尝试过设置message max bytes到 40 MB
  • 我的 Kafka 流应用程序刚刚退出,代码为 0,什么也不做

    为了尝试 Kafka 流 我这样做了 public static void main String args final StreamsBuilder builder new StreamsBuilder final Properties
  • Kafka:如何获取主题的最后修改时间,即添加到主题的任何分区的最后一条消息

    我们的用例是从 kafka 中删除陈旧 未使用的主题 即如果某个主题 在所有分区上 在过去 7 天内没有任何新消息 那么我们会将其视为陈旧 未使用并删除它 许多谷歌结果建议向消息添加时间戳 然后解析它 对于新主题和消息 灵魂可以工作 但我们
  • Flink 窗口:聚合并输出到接收器

    我们有一个数据流 其中每个元素都是这种类型 id String type Type amount Integer 我们想要聚合这个流并输出总和amount每周一次 目前的解决方案 Flink 管道示例如下所示 stream keyBy ty
  • 当我们在 Apache Spark 中使用时,无法找到 Set([TOPIC NAME,0])) 的领导者

    我们使用 Apache Spark 1 5 1 和 kafka 2 10 0 8 2 1 以及 Kafka DirectStream API 通过 Spark 从 Kafka 获取数据 我们使用以下设置在 Kafka 中创建了主题 复制因子
  • 频繁出现“offset out of range”消息,分区被消费者抛弃

    我们正在运行 3 节点 Kafka 0 10 0 1 集群 我们有一个消费者应用程序 它有一个连接到多个主题的消费者组 我们在消费者日志中看到奇怪的行为 有了这些线 Fetch offset 1109143 is out of range
  • kafka新版本2.1.0 Broker无故挂起

    起初 集群中的所有代理都可以启动并正常工作 但有时其中一个代理会遇到问题 并且会出现一些现象 整个集群挂了 生产者和消费者也不工作 因此从监视器来看网络流量降至零 使用kafka topic sh描述主题消息 每个副本都很好 即使是异常的b
  • Apache Flink 中的并行度

    我可以为 Flink 程序中任务的不同部分设置不同的并行度吗 例如 Flink 如何解释以下示例代码 两个自定义实践者MyPartitioner1 MyPartitioner2 将输入数据划分为两个4和2个分区 partitionedDat
  • 如何避免连续“重置偏移量”和“寻找最新偏移量”?

    我正在尝试遵循本指南 https spark apache org docs latest structed streaming kafka integration html https spark apache org docs late
  • Flink - 无法从检查点恢复

    我使用一个作业管理器和两个任务管理器在 kubernetes 上运行集群 我通过在作业运行时杀死一个任务管理器 Pod 来测试检查点机制 我在作业管理器和重新启动的任务管理器上遇到以下异常 工作经理例外 java lang Exceptio
  • 《使用 Apache Flink 进行流处理》如何从 IntelliJ 运行书籍代码?

    如中所述这个帖子 https stackoverflow com questions 61043860 how to run first example of apache flink我无法成功运行 使用 Apache Flink 进行流处
  • 卡夫卡监听器中的钩子

    kafka 监听消息之前 之后是否有任何类型的钩子可用 使用案例 必须设置MDC关联id才能进行日志溯源 我在寻找什么 之前 之后回调方法 以便可以在进入时设置 MDC 关联 ID 并最终在退出时清除 MDC 编辑后的场景 我将关联 id
  • 如何在kafka中定义多个序列化器?

    比如说 我发布和使用不同类型的 java 对象 对于每个对象 我必须定义自己的序列化器实现 我们如何在 serializer class 属性下提供kafka消费者 生产者属性文件中的所有实现 我们有一个类似的设置 不同主题中的不同对象 但
  • 使用 scala 在 Flink 中进行实时流预测

    弗林克版本 1 2 0斯卡拉版本 2 11 8 我想使用 DataStream 来使用 scala 中的 flink 模型进行预测 我在使用 scala 的 flink 中有一个 DataStream String 其中包含来自 kafka
  • KafkaConsumer Java API subscribe() 与 allocate()

    我是 Kafka Java API 的新手 我正在研究使用来自特定 Kafka 主题的记录 我明白我可以使用方法subscribe 开始从主题轮询记录 Kafka还提供了方法assign 如果我想开始从主题的选定分区轮询记录 我想了解这是否

随机推荐

  • Perl 更改调用者的工作目录

    我想编写一个 perl 脚本 将其工作目录更改为其他位置 执行某些操作 然后在从 shell 调用它后将我留在该目录中 chdir只做第一部分 如何更改调用者的工作目录 This is可能的 但是 您必须打开其中之一 dev mem设备处于
  • 使用 Facebook Graph Api 搜索帖子

    我想使用图形 API 搜索过去 30 天的数据的帖子 新闻源 这样做的最佳做法是什么 Facebook Graph API 是否有 API 限制来限制 HTTP 请求的请求 注意 以下内容都不再有效 从 Facebook Graph API
  • NSDate isEqualToDate:不起作用 - 它是否查看秒和分秒?

    我不明白为什么这个方法不起作用 使用 isEqualToDate 时是否查看秒数和分秒数 Test that the NSDate category s DatePlusDays method works void testNSDateCa
  • 在 JavaScript 中通过 ID 引用 ASP.NET 控件?

    当 ASP NET 控件呈现时 它们的 id 有时会发生变化 就像它们位于命名容器中一样 Button1实际上可能有一个 idctl00 ContentMain Button1例如 当它被渲染时 我知道您可以将 JavaScript 作为字
  • jQuery Mobile 默认选项卡

    我想在 jQuery Mobile 中设置默认选项卡 我的源代码 div div ul li a href one one a li li a href two two a li li a href ajax content html th
  • 将 bash 脚本添加到路径

    我想向 linux PATH 添加一个小脚本 这样我就不必在磁盘上物理放置的位置实际运行它 该脚本非常简单 就是通过代理提供 apt get 访问权限 我这样做是这样的 bin bash array len array args array
  • JQuery Datepicker 返回 Date 对象类型

    Datepicker返回的对象类型是什么 假设我有以下内容 txtbox datepicker onClose function date something What is date 我有兴趣从另一个日期选择器读取日期对象进行比较 例如
  • Wix 安装、服务器、客户端或两者

    我想创建 Wix 安装程序来安装 Client Server或两者都基于用户选择 我的问题是当用户选择服务器时如何限制安装客户端 它被安装 因为它的级别 1 我已经定制了WixUI Mondo 我尝试使用组件条件但没有成功
  • 任何 iPhone 应用程序使用的内存

    关于iOS内存管理 我有一些不明白的地方 我想知道 iPhone 应用程序在设备上运行时通常需要多少内存 是否有像 10MB 这样的固定数字 如果应用程序包含大量大图像 对内存有何影响 它们仅在加载时影响内存吗 当有多个应用程序运行时 iO
  • WebBrowser 控件报告什么 UserAgent?

    只是想知道 VB NET 在访问网页时读取的浏览器类型是什么 例如 在我的网站上 它显示了访问我的网站的所有不同浏览器的详细信息 您没有为您的问题提供太多背景信息 但我认为您正在谈论用户代理字符串当您使用时发送的WebBrowser con
  • 使用路径变量在 golang 中调用 GET REST API

    我是第一次尝试Golang 我正在尝试调用具有路径变量的 GET REST API 我正在使用 net http 我正在尝试如下所示 但到目前为止还没有运气 我需要知道如何使用路径变量并从代码中传递该变量 任何帮助或代码示例将不胜感激 这似
  • SQL Server - 重叠数据的累积总和 - 获取总和达到给定值的日期

    在我们公司 我们的客户执行我们在不同表中记录的各种活动 面试出勤 课程出勤和其他一般活动 我有一个数据库视图 它将所有这些表中的数据结合在一起 为我们提供了如下所示的 ActivityView 正如您所看到的 一些活动是重叠的 例如 在参加
  • 生成 1D 张量作为 2D 张量的行的唯一索引

    假设我们通过为每个不同的行提供不同的索引来将 2D 张量转换为 1D 张量 从0 to the number of rows 1 1 2 1 3 1 4 gt 0 1 2 但如果有相同的行 那么我们重复索引 如下所示 1 2 1 2 1 4
  • 将自定义小部件添加到 QTableWidget 单元格

    我有使用 qt 设计器制作的自定义小部件 我想将其添加到 QTableWidget 单元格中 但这不起作用 这是代码 int nRows 10 for int row 0 row lt nRows row QTableWidgetItem
  • 使用sscanf多次读取字符串

    我正在尝试读取多维数组内字符串的内容 问题是 当我这样做时 sscanf 继续仅读取第一个字符 我的绳子上有这个 A1 A2 A3 A4 我想读取 c d 如果只是 A1 我可以读取这个 但是当这种情况发生时 它只读取 A1 我这样做是为了
  • global-variable-exists 在 Sass 中触发错误

    我正在使用类似三元的语句来初始化 Sass 中的变量 这允许我将一些默认变量设置为 Zurb Foundation 正在使用的相同变量 但如果我决定不包含 Foundation 模块 那么事情就不应该落在他们头上 nav link icon
  • 如何用 Python 解释 JavaScript

    可以用Python运行JavaScript吗 有任何库可以实现这一点吗 我需要执行一些 JavaScript 我知道这对于某些 Java 库是可能的 但我更喜欢 Python 有人可以给我一个线索吗 此致 你可以检查蜘蛛猴
  • 有没有一种方法可以在不使用 IIS 的情况下使用 WCF 自定义友好 URL?

    有没有一种方法可以在不使用 IIS 的情况下使用 WCF 自定义友好 URL 特别是 我希望通过我自己的 Windows 服务中托管的 app config 执行类似的操作 WebGet UriTemplate foo id public
  • JSF Eclipse 设计器

    是否有任何 Eclipse 插件可以让我像 Visual Studio 一样在网页上拖放 JSF 组件 这样我就不用写了
  • 是否可以反序列化 Avro 消息(使用来自 Kafka 的消息)而不在 ConfluenceRegistryAvroDeserializationSchema 中提供 Reader 模式

    我在 Apache Flink 中使用 Kafka Connector 来访问由汇流卡夫卡 除了 schema 注册表 url 之外ConfluentRegistryAvroDeserializationSchema forGeneric