flink 中的 Kafka 消费者

2023-12-10

我正在使用 kafka 和 apache flink。我正在尝试使用 apache flink 中的 kafka 主题的记录(采用 avro 格式)。下面是我正在尝试使用的代码片段。

使用自定义反序列化器对主题中的 avro 记录进行反序列化。

我发送到主题“test-topic”的数据的 Avro 架构如下。

{
  "namespace": "com.example.flink.avro",
  "type": "record",
  "name": "UserInfo",
  "fields": [
    {"name": "name", "type": "string"}
  ]
}

我正在使用的自定义解串器如下。

public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {

    private static final long serialVersionUID = 1L;

    private final Class<T> avroType;

    private transient DatumReader<T> reader;
    private transient BinaryDecoder decoder;

    public AvroDeserializationSchema(Class<T> avroType) {
        this.avroType = avroType;
    }


    public T deserialize(byte[] message) {
        ensureInitialized();
        try {
            decoder = DecoderFactory.get().binaryDecoder(message, decoder);
            T t = reader.read(null, decoder);
            return t;
        } catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }

    private void ensureInitialized() {
        if (reader == null) {
            if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroType)) {
                reader = new SpecificDatumReader<T>(avroType);
            } else {
                reader = new ReflectDatumReader<T>(avroType);
            }
        }
    }


    public boolean isEndOfStream(T nextElement) {
        return false;
    }


    public TypeInformation<T> getProducedType() {
        return TypeExtractor.getForClass(avroType);
    }
}

这就是我的 flink 应用程序的编写方式。

public class FlinkKafkaApp {


    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties kafkaProperties = new Properties();
        kafkaProperties.put("bootstrap.servers", "localhost:9092");
        kafkaProperties.put("group.id", "test");

        AvroDeserializationSchema<UserInfo> schema = new AvroDeserializationSchema<UserInfo>(UserInfo.class);

        FlinkKafkaConsumer011<UserInfo> consumer = new FlinkKafkaConsumer011<UserInfo>("test-topic", schema, kafkaProperties);

        DataStreamSource<UserInfo> userStream = env.addSource(consumer);

        userStream.map(new MapFunction<UserInfo, UserInfo>() {

            @Override
            public UserInfo map(UserInfo userInfo) {
                return userInfo;
            }
        }).print();

        env.execute("Test Kafka");

    }

我正在尝试打印发送到主题的记录,如下所示。{"name" :"sumit"}

Output:

我得到的输出是{"name":""}

任何人都可以帮助找出这里的问题是什么以及为什么我没有得到{"name" : "sumit"}作为输出。


Flink 文档说: Flink 的 Kafka Consumer 称为 FlinkKafkaConsumer08(对于 Kafka 0.9.0.x 版本为 09 等,对于 Kafka >= 1.0.0 版本则称为 FlinkKafkaConsumer)。它提供对一个或多个 Kafka 主题的访问。

我们不必编写自定义反序列化器来使用来自 Kafka 的 Avro 消息。

-读取特定记录:

DataStreamSource<UserInfo> stream = streamExecutionEnvironment.addSource(new FlinkKafkaConsumer<>("test_topic", AvroDeserializationSchema.forSpecific(UserInfo.class), properties).setStartFromEarliest());

要读取 GenericRecords :

Schema schema = Schema.parse("{"namespace": "com.example.flink.avro","type": "record","name": "UserInfo","fields": [{"name": "name", "type": "string"}]}");
DataStreamSource<GenericRecord> stream = streamExecutionEnvironment.addSource(new FlinkKafkaConsumer<>("test_topic", AvroDeserializationSchema.forGeneric(schema), properties).setStartFromEarliest());

更多细节 :https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumer

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

flink 中的 Kafka 消费者 的相关文章

  • 如何让 BlazeDS 忽略属性?

    我有一个 java 类 它有一个带有 getter 和 setter 的字段 以及第二对 getter 和 setter 它们以另一种方式访问 该字段 public class NullAbleId private static final
  • Junit:如何测试从属性文件读取属性的方法

    嗨 我有课ReadProperty其中有一个方法ReadPropertyFile返回类型的Myclass从属性文件读取参数值并返回Myclass目的 我需要帮助来测试ReadPropertyFile方法与JUnit 如果可能的话使用模拟文件
  • 如何通过 javaconfig 使用 SchedulerFactoryBean.schedulerContextAsMap

    我使用 Spring 4 0 并将项目从 xml 移至 java config 除了访问 Service scheduleService 带注释的类来自QuartzJobBean executeInternal 我必须让它工作的 xml 位
  • 动态选择端口号?

    在 Java 中 我需要获取端口号以在同一程序的多个实例之间进行通信 现在 我可以简单地选择一些固定的数字并使用它 但我想知道是否有一种方法可以动态选择端口号 这样我就不必打扰我的用户设置端口号 这是我的一个想法 其工作原理如下 有一个固定
  • Spring AspectJ 在双代理接口时失败:无法生成类的 CGLIB 子类

    我正在使用Spring的
  • Pig Udf 显示结果

    我是 Pig 的新手 我用 Java 编写了一个 udf 并且包含了一个 System out println 其中的声明 我必须知道在 Pig 中运行时该语句在哪里打印 假设你的UDF 扩展了 EvalFunc 您可以使用从返回的 Log
  • 如何更改javaFX中按钮的图像?

    我正在使用javaFX 我制作了一个按钮并为此设置了图像 代码是 Image playI new Image file c Users Farhad Desktop icons play2 jpg ImageView iv1 new Ima
  • Java 公历日历更改时区

    我正在尝试设置 HOUR OF DAY 字段并更改 GregorianCalendar 日期对象的时区 GregorianCalendar date new GregorianCalendar TimeZone getTimeZone GM
  • 从最终实体获取根证书和中间证书

    作为密码学的菜鸟 我每天都会偶然发现一些简单的事情 今天只是那些日子之一 我想用 bouncy castle 库验证 java 中的 smime 消息 我想我几乎已经弄清楚了 但此时的问题是 PKIXparameters 对象的构建 假设我
  • 将 MOXy 设置为 JAXB 提供程序,而在同一包中没有属性文件

    我正在尝试使用 MOXy 作为我的 JAXB 提供程序 以便将内容编组 解组到 XML JSON 中 我创建了 jaxb properties 文件 内容如下 javax xml bind context factory org eclip
  • jdbc mysql loginTimeout 不起作用

    有人可以解释一下为什么下面的程序在 3 秒后超时 因为我将其设置为在 3 秒后超时 12秒 我特意关闭了mysql服务器来测试mysql服务器无法访问的这种场景 import java sql Connection import java
  • Hibernate 的 PersistentSet 不使用 hashCode/equals 的自定义实现

    所以我有一本实体书 public class Book private String id private String name private String description private Image coverImage pr
  • Spring Boot Data JPA 从存储过程接收多个输出参数

    我尝试通过 Spring Boot Data JPA v2 2 6 调用具有多个输出参数的存储过程 但收到错误 DEBUG http nio 8080 exec 1 org hibernate engine jdbc spi SqlStat
  • 如何在谷歌地图android上显示多个标记

    我想在谷歌地图android上显示带有多个标记的位置 问题是当我运行我的应用程序时 它只显示一个位置 标记 这是我的代码 public class koordinatTask extends AsyncTask
  • Eclipse 选项卡宽度不变

    我浏览了一些与此相关的帖子 但它们似乎并不能帮助我解决我的问题 我有一个项目 其中 java 文件以 2 个空格的宽度缩进 我想将所有内容更改为 4 空格宽度 我尝试了 正确的缩进 选项 但当我将几行修改为 4 空格缩进时 它只是将所有内容
  • Cucumber 0.4.3 (cuke4duke) 与 java + maven gem 问题

    我最近开始为 Cucumber 安装一个示例项目 并尝试使用 maven java 运行它 我遵循了这个指南 http www goodercode com wp using cucumber tests with maven and ja
  • 如何使用mockito模拟构建器

    我有一个建造者 class Builder private String name private String address public Builder setName String name this name name retur
  • 包 javax.el 不存在

    我正在使用 jre6 eclipse 并导入 javax el 错误 包 javax el 不存在 javac 导入 javax el 过来 这不应该是java的一部分吗 谁能告诉我为什么会这样 谢谢 米 EL 统一表达语言 是 Java
  • Spring Rest 和 Jsonp

    我正在尝试让我的 Spring Rest 控制器返回jsonp但我没有快乐 如果我想返回 json 但我有返回的要求 完全相同的代码可以正常工作jsonp我添加了一个转换器 我在网上找到了用于执行 jsonp 转换的源代码 我正在使用 Sp
  • Java中super关键字的范围和使用

    为什么无法使用 super 关键字访问父类变量 使用以下代码 输出为 feline cougar c c class Feline public String type f public Feline System out print fe

随机推荐

  • DuplicateHandle(),在第一个或第二个进程中使用?

    Windows API DuplicateHandle http msdn microsoft com en us library ms724251 VS 85 aspx需要复制对象句柄以及原始进程和要在其中使用复制句柄的其他进程的句柄 我
  • 如何使用flask创建进度条? [复制]

    这个问题在这里已经有答案了 只是想在我的 html 页面中插入一个进度条 它应该从我的 app py 中的 for 加载 这就是我到目前为止所做的 app py from flask import Flask render template
  • 创建具有不同样式的大量文本 - JavaFX FXML

    在我的 JavaFx 应用程序的 fxml 类中 我想使用最少的组件添加大量文本 而不是每行添加多个标签 我还想在同一组件中创建不同样式的文本 我应该使用什么组件 例如 TextArea 以及如何在其中创建多种样式 使用 css Use a
  • 如何使用标准输入在 Swift 3.0 中运行进程

    我在使用 Swift Process 运行 MySQL 恢复转储文件时遇到问题 let command usr local bin mysql h theHost P 3306 u root pTheInlinePassword examp
  • 使用对象映射器解析嵌套的字典数组

    我正在解析一个 Web api 响应 它是一个字典数组 每个字典又都有一个嵌套的字典数组 我该如何解析它 请提供一些代码示例 我的 API 响应是 FilingStatusId 0 FormName MISC OrderId 0 Recip
  • 如何将 HTTPS 与 Microsoft.AspNet.Server.WebListener 结合使用

    在本文的最后http www asp net vnext overview aspnet vnext create a web api with mvc 6它描述了如何使用 Microsoft AspNet Server WebListen
  • 如何实现在更改时自动更新的可变 PickleTypes

    SQLAlchemy 提供PickleType和优惠突变追踪对于任何可变的类型 如字典 SQLAlchemy 文档提到这是实现可变的方法PickleType但它没有具体说明如何进行 Note 我想在中存储一个字典PickleType 你如何
  • 何时使用 HtmlControls 与 WebControls

    我喜欢 HtmlControls 因为没有 HTML 魔法 asp 源代码看起来与客户端看到的类似 我无法否认 GridView Repeater CheckBoxLists 等的实用性 因此当我需要这些功能时我会使用它们 另外 混合和匹配
  • 使用 intptr_t 而不是 void*?

    使用是一个好主意吗intptr t作为通用存储 保存指针和整数值 而不是void 如下所示 http www crystalspace3d org docs online manual Api1 005f0 64 002dBit Porta
  • 如何使用 pyinstaller 将多个 python 文件编译为单个 .exe 文件

    我已经在 python 中创建了一个 GUI 使用 Tkinter 并且使用 os system python file py 从 GUI 单击按钮即可运行 python 文件 我想使用 pyinstaller 将所有这些 python 文
  • 在 KUbuntu 22.04 上的 Visual Studio Code 中点击快速修复键盘快捷键会生成“e”

    在我的 KUbuntu 22 04 中 当我按下键盘快捷键进行快速修复时 即ctrl 在应用程序中 它产生一个小 e 而不是做任何它期望做的事情 我在网上搜索了这个问题 只找到了这个link 但是 它没有给出解决此问题的任何指导 有人遇到过
  • 安全性:tcl 中的会话标识符未更新

    我正在开发开源应用程序 项目 开放 在扫描过程中我发现了以下漏洞 Medium Session Identifier Not Updated Issue 13800882 Severity Medium URL https
  • 如何在 mysql 查询的“IN”子句中使用 PHP 中的值数组?

    get all id s of ur friend that has installed your application friend pics facebook gt api array method gt fql query quer
  • Next.js getServerSideProps 始终未定义

    我已经开始使用新的 Next 应用程序 并尽可能使用功能组件而不是基于类的组件 继文档 我设置了以下内容但没有运气 import React from react import GetServerSideProps InferGetServ
  • ui grid 将更新的单元格数据保存到数据库

    我正在研究 ui 网格编辑单元格功能 我需要使用 REST API 将编辑后的单元格值更新到数据库 另外 我如何获取控制器中选择的行列表 我的工作代码 var app angular module app ngTouch ui grid u
  • 使用JNA加载多个依赖库

    JNA中有没有办法用Java加载多个依赖库 我通常使用Native loadLibrary 加载一个 DLL 但我猜它不会以这种方式工作 因为我将此函数调用分配给实例成员 假设我有图书馆foo和图书馆bar bar依赖于foo 它也依赖于b
  • 多数独人工智能方法

    我正在概念化一个求解器的变体sudoku called 多重数独 其中多个板重叠 如下所示 如果我正确理解游戏 那么您必须以这样的方式解决每个网格 即任何两个或多个网格之间的重叠都是每个网格解决方案的一部分 我不确定我应该如何思考这个问题
  • 为什么 IntelliJ 的 Java 编辑器中添加灰色的 var:colon

    我安装了IntelliJ 2016 3 2 构建 IC 163 10154 41 建于2016年12月21日 灰色的 var colon 会自动添加到 Java 编辑器中调用方方法的参数前面 如下所示 添加灰色的 a b 为什么会发生这种情
  • 通过 Cordova config.xml 将条目添加到 iOS .plist 文件

    我是 Cordova CLI 的新手 我需要通过 Cordova 以编程方式执行以下步骤 在项目 plist中添加一个新行 在新行中输入以下值 Key GD库模式Type 字符串 默认 Value GD企业模拟 我想我需要在项目根目录下的
  • flink 中的 Kafka 消费者

    我正在使用 kafka 和 apache flink 我正在尝试使用 apache flink 中的 kafka 主题的记录 采用 avro 格式 下面是我正在尝试使用的代码片段 使用自定义反序列化器对主题中的 avro 记录进行反序列化