Flink消费kafka出现空指针异常

2023-11-18


tombstone : Kafka中提供了一个墓碑消息(tombstone)的概念,如果一条消息的key不为null,但是其value为null,那么此消息就是墓碑消息.

出现场景:

双流join时,采用的是left join的方式,众所周知该方式会产生回撤流,下游kafka连接器使用的是upsert-kafka,在产生回撤流时,kafka会删除未join上的消息,填充join后的消息进去。

表现:

在这里插入图片描述

问题:

此时消费该topic的flink程序会出现,空指针异常

DataStream Api会出现,Table Api 未发现

解决:

自定义kafka反序列化器过滤Null值,flink1.14.4
代码:

public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("")
                .setTopics("test")
                .setGroupId("gid")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new MySimpleStringSchema())
                .setProperty("auto.offset.commit", "false")
                .build();
        DataStreamSource<String> kfkDs = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kfk");
        kfkDs.print();

        env.execute();
    }

    // 自定义反序列化器
    static class MySimpleStringSchema implements DeserializationSchema<String>, SerializationSchema<String>{

        @Override
        public String deserialize(byte[] message) {
            if (message != null) return new String(message, StandardCharsets.UTF_8);
            else{
                return deserialize(new byte[1]); // 返回空 不是Null
            }
        }

        @Override
        public boolean isEndOfStream(String nextElement) {
            return false;
        }

        @Override
        public byte[] serialize(String element) {
            return element.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    }
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flink消费kafka出现空指针异常 的相关文章

  • 2021春招已正式开启,阿里巴巴企业智能事业部内推,有意者看下文!

    前言 说一说已经拿到内推的两个朋友的面试经验 你们可以看一下准备一下 同事A阿里巴巴一面 55分钟 先介绍一下自己吧 说一下自己的优缺点 具体讲一下之前做过的项目 你觉得项目里给里最大的挑战是什么 Hashmap为什么不用平衡树 AQS知道
  • java脚本引擎Groovy实战

    前言 互联网时代随着业务的飞速发展 不仅产品迭代 更新的速度越来越快 个性化需求也是越来越多 如何快速的满足各种业务的个性化需求是我们要重点思考的问题 我们开发的系统如何才能做到热部署 不重启服务就能适应各种规则变化呢 实现业务和规则的解耦
  • ZooKeeper踩坑

    一 下载安装包时要下载文件名中带有bin的安装包 否则会报错 找不到或无法加载主类 org apache zookeeper server quorum QuorumPeerMain Error contacting service 这是由
  • 一文打通Sleuth+Zipkin 服务链路追踪

    1 为什么用 微服务架构是一个分布式架构 它按业务划分服务单元 一个分布式系统往往有很多个服务单元 由于服务单元数量众多 业务的复杂性 如果出现了错误和异常 很难去定位 主要体现在 一个请求可能需要调用很多个服务 而内部服务的调用复杂性 决
  • Hadoop 完全分布式运行实战

    Hadoop运行模式包括 本地模式 伪分布式模式以及完全分布式模式 Hadoop官方网站 Apache Hadoop 流程步骤 准备3台客户机 关闭防火墙 静态ip 主机名称 安装JDK 配置环境变量 安装Hadoop 配置环境变量 配置集
  • Springboot结合Redis实现分布式定时任务

    一 背景 之前分享过分布式定时任务的技术选型方案 分布式定时任务技术选型方案 个人青睐xxl job 分享了搭建接入流程 xxl job搭建方案 本次项目需求较为简单 同时时间紧张 下面介绍利用Redis锁实现分布式定时任务的方案 二 Sc
  • 2023测试工程师核心软技能「情绪管理」

    大家好呀 我是小码哥 我之前经常提到一句话 大多数时候所谓的 技术之玻璃天花板 其实只是缺乏软技能而已 所以粉丝朋友们 我们除了需要关注技术 更需要注重软技能的提高 关于软技能相关的文章 之前写过学习方法 职业规划 时间管理 项目管理 团队
  • 项目实战之RabbitMQ冗余双写架构

    作者名称 DaenCode gt https blog csdn net 2302 79094329 作者简介 啥技术都喜欢捣鼓捣鼓 喜欢分享技术 经验 生活 人生感悟 尝尽人生百味 方知世间冷暖 所属专栏 项目所感所想 gt https
  • 深入理解软件测试中的Web请求流程!

    在软件开发的过程中 软件测试是不可或缺的一环 它有助于确保软件系统的稳定性 可靠性和安全性 而在众多测试中 Web请求流程的测试显得尤为重要 因为几乎所有的现代应用都离不开网络交互 接下来我们将深入探讨软件测试中完整的Web请求流程 帮助大
  • 一文弄懂事件Event与Kafka的区别

    事件 Event 和 Apache Kafka 是两个概念层面上有所不同的东西 它们在应用程序中的作用和使用场景也有很大的差异 1 概念和定义 事件 Event 事件是 系统内发生 的特定事情或状态变化的表示 在编程和软件设计中 事件通常被
  • Apache Flink(十五):Flink任务提交模式

    个人主页 IT贫道 大数据OLAP体系技术栈 Apache Doris Clickhouse 技术 CSDN博客 私聊博主 加入大数据技术讨论群聊 获取更多大数据资料 博主个人B栈地址 豹哥教你大数据的个人空间 豹哥教你大数据个人主页 哔哩
  • Kafka基础—3、Kafka 消费者API

    一 Kafka消费者API 1 消息消费 当我们谈论 Kafka 消费者 API 中的消息消费时 我们指的是消费者如何从 Kafka 主题中拉取消息 并对这些消息进行处理的过程 消费者是 Kafka 中的消息接收端 它从指定的主题中获取消息
  • 【分布式算法】Gossip协议详解

    一 为什么需要 Gossip 协议 为了实现 BASE 理论中的 最终一致性原则 两阶段提交协议和 Raft 算法需要满足 大多数服务节点正常运行 原则 如果希望系统在少数服务节点正常运行的情况下 仍能对外提供稳定服务 这时就需要实现最终一
  • Zookeeper 和 Dubbo 的关系?

    Zookeeper的作用 zookeeper用来注册服务和进行负载均衡 哪一个服务由哪一个机器来提供必需让调用者知道 简单来说就是ip地址和服务名称的对应关系 当然也可以通过硬编码的方式把这种对应关系在调用方业务代码中实现 但是如果提供服务
  • 使用 Helm Chart 部署分布式 GreptimeDB

    GreptimeDB 作为云时代基础设施的时序数据库 从第一天开始就积极拥抱云原生技术 将数据库部署在 Kubernetes 上可以提供可伸缩性 自愈能力和简化的部署和管理 从而为应用程序提供了强大的弹性和可靠性 Helm 是一个用于管理
  • 阿里技术官亲笔力作:Kafka限量笔记,一本书助你掌握Kafka的精髓

    前言 分布式 堪称程序员江湖中的一把利器 无论面试还是职场 皆是不可或缺的技能 而Kafka 这款分布式发布订阅消息队列的璀璨明珠 其魅力之强大 无与伦比 对于Kafka的奥秘 我们仍需继续探索 要论对Kafka的熟悉程度 恐怕阿里的大佬们
  • spark相关

    提示 文章写完后 目录可以自动生成 如何生成可参考右边的帮助文档 文章目录 前言 一 pandas是什么 二 使用步骤 1 引入库 2 读入数据 总结 前言 提示 这里可以添加本文要记录的大概内容 例如 随着人工智能的不断发展 机器学习这门
  • 考虑极端天气线路脆弱性的配电网分布式电源配置优化模型【IEEE33节点】(Matlab代码实现)

    欢迎来到本博客 博主优势 博客内容尽量做到思维缜密 逻辑清晰 为了方便读者 座右铭 行百里者 半于九十 本文目录如下 目录 1 概述 2 运行结果 3 参考文献 4 Matlab代码实现
  • Kafka速度之谜:高性能的幕后秘密大揭秘

    提示 文章写完后 目录可以自动生成 如何生成可参考右边的帮助文档 文章目录 前言 一 kafka高性能的原因 Page Cache ZeroCopy 零拷贝 前言 Kafka的介绍 kafka是linkedIn开源的分布式消息系统 归给Ap
  • RabbitMQ环境配置

    文章目录 安装Erlang 安装RabbitMQ 安装Erlang 下载地址 http erlang org download otp win64 25 3 2 7 exe 安装RabbitMQ 下载地址 https www rabbitm

随机推荐

  • OC中的分类与类扩展

    在OC中 对于已有的类进行扩展 我们有两种方式 1 在原始类的定义中 进行代码扩展 2 通过继承的方式 扩展子类 3 使用分类的方式 第一 二种方式不用多说 第三种方式则是OC中比较有特色的功能 分类允许我们在不更改类的原始代码的情况下 实
  • 接口设计之幂等性设计

    幂等性设计 今天我们来聊聊接口的幂等性设计 所谓幂等 就是任意多次执行所产生的影响均与一次执行的影响相同 幂等性接口是指可以使用相同参数重复执行 并能获得相同结果的接口 这里就不展开数学中的定义了 有兴趣的可以自行google 为什么接口需
  • 关于mysql_free_result和mysql_close的解惑

    之前用mysql的时候一直是在用短链接 调用mysql store result获取一次数据之后就直接调用 以下是代码片段 mysql free result m result mysql close m Database 但是有两个问题
  • 找个好用的录屏软件,怎么这么难?

    真的要被录屏软件给搞疯了 本来公司说要给新人做个培训视频 想着把视频录屏一下 然后简单的剪辑一下就可以了 可谁知道录屏软件坑这么多 弄来弄去头都秃了 不过在头秃了几天之后 终于让我发现了一个值得 私藏 的录屏软件 咱就说这是什么神仙软件 把
  • 编码器测速,获取实际速度

    本例程中使用的电机为带霍尔编码器的减速电机 电机由三部分组成 减速器 电机以及霍尔编码器 霍尔编码器工作原理 霍尔编码器通过电磁转换 将机械的位移转化为脉冲信号 并且输出A B两相的方波信号 A B两相脉冲信号相位相差90 通过检测规定时间
  • Android Studio快捷键从Mac OS改为Win

    原理将Mac的Control映射为Command Command映射为Option Option映射为control 这样与win的快捷键按键习惯应该相同 未长时间测试
  • iOS App上传到苹果应用市场构建版本的图文教程

    使用hbuilderx的h5 或uniapp框架写的前端 进行云打包ios应用 会生成一个ipa后缀的应用文件 这个文件是没有办法像安卓应用那样直接安装在手机上面的 需要上架到苹果应用商店 用户才能下载安装使用 因此 我们这篇文章讲详细介绍
  • 5G基础信令

    一 4 5G高层协议规范框架对比 4G 5G 36 300 LTE整体 38 300 NR整体 36 401 E URTAN整体架构 38 401 NG RAN整体架构 36 321 LTE MAC 38 321 NR MAC 36 322
  • Dockerfile部署mysql并初始化

    文件目录结构 Dockerfile FROM centos 7 ADD jdk 8u261 linux x64 tar gz usr local ADD check mysql sh home datasong release bin CO
  • Gogs使用详解

    Gogs使用介绍 Gogs是一款类似Github 国内有码市 的开源文件 代码管理系统 基于Git 目前功能基本介绍 远程代码仓库管理 代码仓库权限分配 管理 团队管理 代码审查 1 注册 2 基本功能介绍 主面板说明 图中 表示自己个人账
  • 【测试入门】测试用例经典设计方法 —— 因果图法

    01 因果图设计测试用例的步骤 1 分析需求 阅读需求文档 如果User Case很复杂 尽量将它分解成若干个简单的部分 这样做的好处是 不必在一次处理过程中考虑所有的原因 没有固定的流程说明究竟分解到何种程度才算简单 需要测试人员根据自己
  • 直线检测方法—LSD论文翻译

    附原文链接 LSD a Line Segment Detector 摘 要 LSD是一个线段检测器 能够在线性时间内得到亚像素级精度的检测结果 它无需调试参数就可以适用于任何数字图像上 并且能够自我控制错误数量的检测 平均来说 一个图像中允
  • Bazel install Tips

    Bazel Fast Correct Choose two Build and test software of any size quickly and reliably Speed up your builds and tests Ba
  • Android接口一般定义格式,Android开发规范

    原标题 Android开发规范 一 书写规范 1 编码方式统一用UTF 8 2 花括号不要单独一行 和它前面的代码同一行 而且 花括号与前面的代码之间用一个空格隔开 3 空格的使用 if else for switch while等逻辑关键
  • 一个不错的关于CPU和GPU(CUDA)的性能比较讨论话题

    http topic csdn net u 20081027 23 67ff3857 3c71 4d5c acf6 095f3497c7a9 html这里是今天的一个论坛的一个帖子 大家可以讨论一下 1 那些程序适合用cpu来做 那些适合用
  • 【Transformer系列】深入浅出理解Tokenization分词技术

    一 参考资料 NLP技术中的Tokenization是什么 核心任务是什么 二 Tokenization相关介绍 1 Tokenization的概念 NLP技术中Tokenization被称作是 word segmentation 直译为分
  • 深入理解工具链-自己搭建STM32编程IDE

    目录 一 前言 二 编译器组成与编译流程 2 1 编译流程概述 2 2 Gcc For Arm编译器 2 3 预编译 2 4 编译 2 5 汇编 2 6 链接 2 7 生成HEX镜像 2 8 通过Makefile编译代码 三 调试流程 3
  • 解决在使用 Ant Design Vue组件库更改Modal对话框样式使用深度作用选择符不生效的问题

    项目场景 在使用 Ant Design Vue组件库更改Modal对话框样式使用深度作用选择符不生效 问题描述 Modal对话框官方样式如下图 现在要修改为下图所示样式 使用深度作用选择符样式没有生效 原因分析 因为modal是直接插入到b
  • 【SQL】10 SQL UPDATE 语句

    UPDATE 语句用于更新表中的记录 SQL UPDATE 语句 UPDATE 语句用于更新表中已存在的记录 SQL UPDATE 语法 UPDATE table name SET column1 value1 column2 value2
  • Flink消费kafka出现空指针异常

    文章目录 出现场景 表现 问题 解决 tombstone Kafka中提供了一个墓碑消息 tombstone 的概念 如果一条消息的key不为null 但是其value为null 那么此消息就是墓碑消息 出现场景 双流join时 采用的是l