知根知底:Flink-KafkaConsumer 详解

2023-05-16

Flink-Kafka  Connector 是连接kafka 的连接器,负责对接kafka 的读写, 本篇主要介绍kafka consumer 的执行流程与核心设计。

逻辑执行流程

e20a4db339d48da06ac5d9a5b19b0bb4.png

  1. 分配当前task消费的partition与起始的offset : 根据从状态中恢复的数据与客户端指定的消费模式, 采取的方式是状态中offset优先, 即从状态中能够找到对应的offset 就使用该offset , 否则就根据客户端指定的方式

  2. 从kafka 中不断拉取数据, 发送到下游,并且保存当前的offset

  3. 为了保证整个任务的全局一致性,需要将offset 提交到状态中

  4. 如果开启了分区发现模式,那么需要将检测到新的分区添加到消费线程中。

两个重要接口

Flink 保证全局数据一致性是通过全局状态快照checkpoint 完成的, 也就是周期性的执行checkpoint 将当前的任务状态保存起来, Flink 在整个checkpoint 的执行过程中提供了两个接口,方便用户去做一些自定义的操作, 例如操作状态、两阶段提交实现等等。

CheckpointedFunction接口

提供了initializeState方法与snapshotState方法,initializeState方法是在任务初始化时候执行,常见的就是获取的checkpoint 中的状态数据;snapshotState方法是在每次checkpoint触发都会执行,常见的就是将数据存放在状态对象中,以便能够被持久化。

CheckpointListener接口

提供了notifyCheckpointComplete方法与notifyCheckpointAborted方法,这两个方法都是在一次checkpoint 完成之后执行,那么有可能是通知成功回调(notifyCheckpointComplete)也有可能失败回调(notifyCheckpointAborted)。


具体实现

对于Flink 来说source端的标准对接接口是SourceFunction ,主要实现其run方法,在run 中执行数据的pull操作;另外为了保证整个状态的一致性,在checkpoint时需要记录checkpoint时的offset, 并且保证其失败重启时也能够从checkpoint 记录的offset开始消费, 因此同时实现了CheckpointedFunction接口与CheckpointListener接口,这两个接口提供了可操作状态的一些方法。

FlinkKafkaConsumerBase 实现SourceFunction、CheckpointedFunction、CheckpointListener接口的抽象类,包含了整个流程的核心方法,如下:

initializeState

从checkpoint 中 恢复最近一次或者是指定批次checkpoint 中offset,  并将其存放在TreeMap<KafkaTopicPartition,Long> 结的 restoredState 对象中

open

主要作用就是分配当前task消费的partitioin 的offset 位置

1.  partition 分配策略:姑且认为是当前task的下标与 partition%numTask 相等就分配给当前task

2.  offset 分配策略:有状态数据就使用状态数据的offset ; 没有就根据客户端指定的StartupMode作为消费起点

run

开始消费kafka 中数据, 通过 KafkaFetcher 完成 : 

1.  启动了一个消费线程 KafkaConsumerThread 从kakfa 中拉取数据,将其存储到 Handover 的next 对象中

2.  循环从Handover 的next 中获取数据

3.  记录下当前的offset, 更新到subscribedPartitionStates 中去 

createAndStartDiscoveryLoop

在run 方法中被调用, 开启了异步分区发现的线程discoveryLoopThread,会按照指定的时间间隔检查是否有新的分区(默认情况下不开启),  当发现有新的分区时会将其添加到unassignedPartitionsQueue中, 以便被KafkaConsumerThread 线程检测到

snapshotState

将记录的subscribedPartitionStates 中消费进度数据写入到 unionOffsetStates 状态中与临时对象pendingOffsetsToCommit中

notifyCheckpointComplete

提交offset 至kafka中:将pendingOffsetsToCommit 中记录当前批次checkpoint 的offset 数据提交到kafka 中

核心流程

b9a35248b0a56843549eedd8aa64680b.png

  1. KakfaConsumerThread 线程不断从Kafka 中消费数据

  2. 消费的数据存储handover 中

  3. kafkaFetch 不断从handover 获取数据进行处理

其他流程  

  1. initializeState、snapshotState 这两个方法是实现了CheckpointedFunction接口里面的对应方法,CheckpointedFunction 接口是Flink 提供的两个hook, 任务初始化执行initializeState,用于从状态中恢复数据, 优于open先执行, 用于其恢复offset数据;snapshotState 每次触发checkpoint 时执行,提供用户操作hook, 用于将offset 数据保存在状态中。

  2. notifyCheckpointComplete 是实现了CheckpointListener 接口中的方法, checkpoint 完成之后的回调方法, 提交状态中的offset数据至kafka中。

offset 提交

对于整个offset的提交至kafka中, 类似于两阶段的提交过程:

  • 第一阶段:执行checkpoint 时即调用snapshotState方法,  offset 保存到状态中

  • 第二阶段:checkpoint 执行完成时回调notifyCheckpointComplete方法,offset 提交到kafka中

对于第一阶段失败任务直接重启,从最近一次checkpoint记录的位点开始消费,对于第二阶段提交offset至kafka如果失败,并不会导致任务重启,只是做了日志记录,因为提交offset到kafka成功与否并不会影响任务的执行。

启动时offset指定

  • 如果是从checkpoint 恢复,那么就会忽略客户端所指定的startMode , 也就是checkpoint 状态数据优先

总结

本篇主要介绍了FlinkKafkaConsumer的核心设计流程与实现,同时介绍了与checkpoint流程结合完成offset的管理。

历史推荐

AliExpress 基于Flink的实时数仓建设

Flink 程序设计之道

数仓指标一致性

关于Event-Time 所带来的的问题

不得不掌握的三种BitMap

66ec26acfca6cc99a453cd8cc3b835e9.png

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

知根知底:Flink-KafkaConsumer 详解 的相关文章

随机推荐

  • 通过设置JDK解决存在多个Gradle后台进程的问题

    使用Android Studio经常会在Event Log窗口遇到如下报错 xff1a 简单解释下就是如果后台有一个常驻的gradle守护进程 xff0c 可以提高我们构建效率 因为这样不但可以避免每次都重新启动JVM xff0c 并且可以
  • intellij idea关闭vim模式

    tools gt vim emulator
  • 最大似然估计和最小二乘估计的区别与联系

    看似最小二乘估计与最大似然估计在推导得到的结果很相似 xff0c 但是其前提条件必须引起大家的注意 xff01 xff01 xff01 对于最小二乘估计 xff0c 最合理的参数估计量应该使得模型能最好地拟合样本数据 xff0c 也就是估计
  • 关于Web API 2.0中的Options请求返回405的问题

    关于Web API 2 0中的Options请求返回405的问题 前提 xff1a IIS寄宿的网站 当你向服务器发送非简单请求时 xff0c 客户端会先发送一条预检请求 xff0c 借以确认当前请求源和待请求方法是否被网站允许 xff08
  • 得到指针指向的数组的长度

    1 定义数组 xff0c 要给定其长度 xff0c 也可以用Type a 61 的方式 在对数组进行操作时 xff0c 可能需要计算数组长度 xff0c 方法是 xff1a sizeof 数组名 sizeof 元素类型 数组int a 4
  • Redis的应用--分布式锁

    Redis 文章目录 Redis应用分布式锁解决方案 应用 分布式锁 互斥死锁容错 解决方案 正常使用redis的nx数据 xff0c 下面的语句 xff0c key使用对应的前缀 43 主键 xff0c value使用一个随机值UUID
  • Golang调用FFmpeg转换视频流

    问题背景 问题背景是在 xff0c 由于视频采集端使用的是H264编码采集的裸流 xff0c 而网络流媒体大多是以FLV为主的直播方式进行的 xff0c 为了实现实时直播 xff0c 当前是打算直接使用FFmpeg将H264裸流实时转成FL
  • 【Deepin Debian 系统安装RPD远程桌面工具Remmina】

    Remmina 是一款在 Linux 和其他类 Unix 系统下的自由开源 功能丰富 强大的远程桌面客户端 xff0c 它用 GTK 43 3 编写而成 它适用于那些需要远程访问及使用许多计算机的系统管理员和在外出行人员 它以简单 统一 同
  • Linux部署yapi

    一 安装node 1 获取资源node资源 8 x版本 curl sL https rpm nodesource com setup 8 x bash 耐心等待 2 安装nodejs yum install y nodejs 3 查看nod
  • vim环境设定:~/.vimrc(语法高亮等一些的设置)

    Centos里的VI只默认安装了vim minimal xff0d 7 x 所以无论是输入vi或者vim查看文件 xff0c syntax功能都无法正常启用 因此需要用yum安装另外两个组件 xff1a vim common 7 x和vim
  • HTML中meta标签都有什么作用?

    一直以来 xff0c 对HTML中的meta标签一知半解 xff0c 这次抽时间好好总结一下 meta标签 定义 xff1a meta元素提供有关页面的元信息 meta information 比如针对 搜索引擎和更新频度的描述和关键词 搜
  • 自增运算符的用法

    a 43 43 和 43 43 a 都属于自增运算符 a 43 43 是先进行取值 xff0c 后进行自增 43 43 a是先进行自增 xff0c 后进行取值
  • 使用七牛云CDN加速并绑定阿里云域名详细教程

    昨天晚上在某个群里看到群友问 xff0c 七牛云能不能绑定自己的域名作为静态资源文件的前缀 xff0c 忽然想起来我已经有快两年时间没有登录过我的七牛云账号了 xff0c 不禁老脸一红 xff0c 这是有多久没有自己前后端都弄了 xff0c
  • 一些中间件的思维导图

    一些中间件的思维导图 文章目录 一些中间件的思维导图写在前面Redis博客连接Redis的应用 分布式锁Redis的基础Redis的生产问题 缓存雪崩 缓存穿透 双写一致性 并发竞争Redis的cluster集群Redis的replicat
  • VS2012 下配置gsl-1.8库

    GSL的安装配置如下 xff1a 1 下载安装 从http gnuwin32 sourceforge net packages gsl htm 下载安装gsl 1 8 exe和gsl 1 8 src exe两个exe文件 下载好后如下图 x
  • 一种初始化结构体数组的方法

    typedef struct int requestNumber void dispatchFunction Parcel amp p struct RequestInfo pRI int responseFunction Parcel a
  • OpenCV学习笔记——新版本的数据结构core

    2 0新版本对数据结构进行了大幅修改 xff1a 定义了DataType 类 定义了Point 模板类 xff0c 取代了之前版本的CvPoint CvPoint2D32f Point 类不用多言 xff0c 里面两个成员变量x xff0c
  • Java面向对象中类与对象的概念和使用(一)

    方法创建与重载 1 方法就是一段可以重复调用的代码段 2 定义格式 xff1a 访问修饰符 返回值类型 方法名 1 方法重载 xff1a 方法名称相同 xff0c 但是参数的类型和个数不同 xff0c 通过传递参数的个数和类型不同来完成不同
  • CCF-CSP考试介绍以及复习技巧指导

    CCF CSP考试时间及费用 时间一般是每年3 9 12月的中旬 xff0c 报名时间一般也是提前一个月 xff0c 不固定 非计算机协会会员300元 次 xff0c 会员180元 次 xff08 学生会员需缴纳50元 年的会费 xff09
  • 知根知底:Flink-KafkaConsumer 详解

    Flink Kafka Connector 是连接kafka 的连接器 xff0c 负责对接kafka 的读写 xff0c 本篇主要介绍kafka consumer 的执行流程与核心设计 逻辑执行流程 分配当前task消费的partitio