kafka-offset手动提交和自动提交

2023-11-01

目录

首先回顾之前的知识点

 自动提交offset

 手动提交

消费者poll消息的细节

 完整代码:

 按照新方法进行消费消息

1.指定时间进行消息的消费

 2.指定分区开始从头消费+指定分区的偏移量开始消费

新消费组的消费offset规则


 

首先回顾之前的知识点

消费者消费消息,每消费offset+1,然后提交offset给到我们kafka中topic中的cousumer_offsets,该消费者宕机后,另外的消费者就会读取consumer_offsets读取我们的offset消费后面的消息

我们kafka消费者是自动拉取消息的,mq是队列push给消费者

自动提交:消息poll下来后(还没有消费)直接提交offset,速度很快,可能出现消费失败

手动提交:在消息消费时/消费后再提交offset

 自动提交offset

缺点:可能会丢消息,比如消费者poll了topic中partition的消息后,然后提交offset,可能消费者没有消费成功

提交的内容offset——>消费组+topic+offset

自动提交的配置

   /**
         * 1.1设置是否自动提交offset并设置offset的间隔时间
         */
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
        

 

一poll就提交offset了 

 手动提交

分为手动同步提交+手动异步提交

手动同步提交:消息消费完后调用同步提交的方法,当集群返回ack前一直阻塞,返回ack后表示成功

  consumer.commitAsync();

手动异步提交:不需要等集群返回ack,直接执行后序的逻辑即可,我们可以设置一个回调方法

消费者poll消息的细节

定义:消费者会根据设置的消费时间来决定消费多少消息

properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,500)//拉取0.5s消息

 默认消费者一次性poll500条信息(长轮询时间为1s),如果时间内poll了500条就结束for循环

 //长轮询拉取时间,1s:消费者拉取1s时间不管拉了多少条消息(除非时间内拉取完了zk维护的topic分区中所有消息)
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

 完整代码:

 while(true){
            //长轮询拉取时间,1s:消费者拉取1s时间不管拉了多少条消息(除非时间内拉取完了zk维护的topic分区中所有消息)
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("收到的消息:partition= %d,offset= %d,key= %s,value=%s %n",record.partition(),
                        record.offset(),record.key(),record.value());
            }

            /**
             * 4.1手动提交:所有消息消费完再提交offset给broker中_consumer_offsets
             */
            if(records.count()>0){
                //同步:阻塞,提交成功,等待broker的返回ack
               consumer.commitAsync();

                //异步:提交完后不需要等待broker返回ack,直接往下走

            }
        }

 如果两次poll的间隔>30s,集群会认为该消费者消费能力弱将其踢出,触发rebalance机制,消息交给消费组中的其他消费者

  properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);

 按照新方法进行消费消息

1.指定时间进行消息的消费

1.根据时间将topic中partition分区信息全部放入map中——>2.然后指定时间,封装topic和分区与时间到map中——>3.最后再将map添加到更高级的map,key为分区,如果有两个分区就是2个map——>4.最后遍历,然后提取出value并得到offset打印

 2.指定分区开始从头消费+指定分区的偏移量开始消费

        //指定分区消费
        consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME,0)));

        /**
         * 4.回溯消费消息(指定某分区从头开始消费)
         */
        consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME,0)));
        consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME,0)));

        /**
         * 4.1指定offset开始消费
         */
        consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME,0)));
        consumer.seek(new TopicPartition(TOPIC_NAME,0),10);

新消费组的消费offset规则

新消费组在启动后,默认是从当前分区最后一条消息的offset+1开始消费,可以通过配置进行重新消费

  /**
         * 2.13设置下次换了消费组还是按照offset记录继续消费
         */
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"earliest");

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

kafka-offset手动提交和自动提交 的相关文章

  • 如何为最终用户方便地启动Java GUI程序

    用户想要从以下位置启动 Java GUI 应用程序Windows 以及一些额外的 JVM 参数 例如 javaw Djava util logging config file logging properties jar MyGUI jar
  • 如何在 Play java 中创建数据库线程池并使用该池进行数据库查询

    我目前正在使用 play java 并使用默认线程池进行数据库查询 但了解使用数据库线程池进行数据库查询可以使我的系统更加高效 目前我的代码是 import play libs Akka import scala concurrent Ex
  • 在画布上绘图

    我正在编写一个 Android 应用程序 它可以在视图的 onDraw 事件上直接绘制到画布上 我正在绘制一些涉及单独绘制每个像素的东西 为此我使用类似的东西 for int x 0 x lt xMax x for int y 0 y lt
  • 给定两个 SSH2 密钥,我如何检查它们是否属于 Java 中的同一密钥对?

    我正在尝试找到一种方法来验证两个 SSH2 密钥 一个私有密钥和一个公共密钥 是否属于同一密钥对 我用过JSch http www jcraft com jsch 用于加载和解析私钥 更新 可以显示如何从私钥 SSH2 RSA 重新生成公钥
  • 使用 Android 发送 HTTP Post 请求

    我一直在尝试从 SO 和其他网站上的大量示例中学习 但我无法弄清楚为什么我编写的示例不起作用 我正在构建一个小型概念验证应用程序 它可以识别语音并将其 文本 作为 POST 请求发送到 node js 服务器 我已确认语音识别有效 并且服务
  • 无法展开 RemoteViews - 错误通知

    最近 我收到越来越多的用户收到 RemoteServiceException 错误的报告 我每次给出的堆栈跟踪如下 android app RemoteServiceException Bad notification posted fro
  • Android MediaExtractor seek() 对 MP3 音频文件的准确性

    我在使用 Android 时无法在eek 上获得合理的准确度MediaExtractor 对于某些文件 例如this one http www archive org download emma solo librivox emma 01
  • 加速代码 - 3D 数组

    我正在尝试提高我编写的一些代码的速度 我想知道从 3d 整数数组访问数据的效率如何 我有一个数组 int cube new int 10 10 10 我用价值观填充其中 然后我访问这些值数千次 我想知道 由于理论上所有 3d 数组都存储在内
  • Spring Data JPA 应用排序、分页以及 where 子句

    我目前正在使用 Spring JPA 并利用此处所述的排序和分页 如何通过Spring data JPA通过排序和可分页查询数据 https stackoverflow com questions 10527124 how to query
  • 如何在PreferenceActivity中添加工具栏

    我已经使用首选项创建了应用程序设置 但我注意到 我的 PreferenceActivity 中没有工具栏 如何将工具栏添加到我的 PreferenceActivity 中 My code 我的 pref xml
  • 从 127.0.0.1 到 2130706433,然后再返回

    使用标准 Java 库 从 IPV4 地址的点分字符串表示形式获取的最快方法是什么 127 0 0 1 到等效的整数表示 2130706433 相应地 反转所述操作的最快方法是什么 从整数开始2130706433到字符串表示形式 127 0
  • getResourceAsStream() 可以找到 jar 文件之外的文件吗?

    我正在开发一个应用程序 该应用程序使用一个加载配置文件的库 InputStream in getClass getResourceAsStream resource 然后我的应用程序打包在一个 jar文件 如果resource是在里面 ja
  • 总是使用 Final?

    我读过 将某些东西做成最终的 然后在循环中使用它会带来更好的性能 但这对一切都有好处吗 我有很多地方没有循环 但我将 Final 添加到局部变量中 它会使速度变慢还是仍然很好 还有一些地方我有一个全局变量final 例如android Pa
  • 如何在控制器、服务和存储库模式中使用 DTO

    我正在遵循控制器 服务和存储库模式 我只是想知道 DTO 在哪里出现 控制器应该只接收 DTO 吗 我的理解是您不希望外界了解底层域模型 从领域模型到 DTO 的转换应该发生在控制器层还是服务层 在今天使用 Spring MVC 和交互式
  • 如何从终端运行处理应用程序

    我目前正在使用加工 http processing org对于一个小项目 但是我不喜欢它附带的文本编辑器 我使用 vim 编写所有代码 我找到了 pde 文件的位置 并且我一直在从 vim 中编辑它们 然后重新打开它们并运行它们 重新加载脚
  • 声明的包“”与预期的包不匹配

    我可以编译并运行我的代码 但 VSCode 中始终显示错误 早些时候有一个弹出窗口 我不记得是什么了 我点击了 全局应用 从那以后一直是这样 Output is there but so is the error The declared
  • Firebase 添加新节点

    如何将这些节点放入用户节点中 并创建另一个节点来存储帖子 我的数据库参考 databaseReference child user getUid setValue userInformations 您需要使用以下代码 databaseRef
  • 当我从 Netbeans 创建 Derby 数据库时,它存储在哪里?

    当我从 netbeans 创建 Derby 数据库时 它存储在哪里 如何将它与项目的其余部分合并到一个文件夹中 右键单击Databases gt JavaDB in the Service查看并选择Properties This will
  • java.lang.IllegalStateException:驱动程序可执行文件的路径必须由 webdriver.chrome.driver 系统属性设置 - Similiar 不回答

    尝试学习 Selenium 我打开了类似的问题 但似乎没有任何帮助 我的代码 package seleniumPractice import org openqa selenium WebDriver import org openqa s
  • 节拍匹配算法

    我最近开始尝试创建一个移动应用程序 iOS Android 它将自动击败比赛 http en wikipedia org wiki Beatmatching http en wikipedia org wiki Beatmatching 两

随机推荐

  • 使用纯C语言定义通用型数据结构的方法和示例

    文章目录 前言 以实现优先队列来描述实现思想 基本类型的包装类型 比较函数 演示 总结 前言 最近一段时间在复习数据结构和算法 用的C语言 不得不说 不学个高级语言再回头看C语言根本不知道C语言的强大和完美 不过相比之下也有许多不便利的地方
  • 历时30个小时 更新到了25905.1000 版本 23H2

  • 【Vue3】之vuex的安装与配置

    安装 yarn add vuex 4 或 npm install save vuex 4 创建 新建store js store js import createStore from vuex export default createSt
  • Pyinstaller 使用说明

    安装 cmd pip install pyinstaller 也可以自己下载安装包 解压后通过执行python setup py install 使用 pyinstaller F myPython py 或者用python pyinstal
  • 用IDEA创建第一个SpringBoot程序,并开发一个JSON接口

    1 打开idea主界面选择 Create New Project 2 在弹出的页面中我们选择左侧的 Spring Initializr jdk版本选择自己安装的版本 PS jdk版本要1 8以上哦 3 下一个页面 在Group栏输入组织名
  • IDEA代码覆盖率测试

    代码覆盖率测试 1 使用idea自带的代码覆盖率工具 1 创建test文档 右击将 test 目录设置为测试文档 2 选中需要测试的类 按Ctrl shift T 创建测试类 并选中要测试的方法 在测试案例中 编写测试代码 点击Edit C
  • 小程序分包实现

    目录 一 使用场景 二 操作方式 1 建立分包文件夹 2 文件构建 3 文件配置 三 总结 一 使用场景 微小程序分包常用于代码量较大的小程序 发布时会受到大小限制 二 操作方式 1 建立分包文件夹 在项目根目录下创建分包文件夹 此处我创建
  • L1-8 乘法口诀数列

    本题要求你从任意给定的两个 1 位数字 a1 和 a2 开始 用乘法口诀生成一个数列 an 规则为从 a1 开始顺次进行 每次将当前数字与后面一个数字相乘 将结果贴在数列末尾 如果结果不是 1 位数 则其每一位都应成为数列的一项 输入格式
  • ad电阻原理图_光敏电阻的基础知识介绍

    39G电子技术 电路 电子元件等 全套资料免费领 干货下载 十天学会单片机完整版 100个实例 PPT 点击上方红字 即可获取 一 光敏电阻 光敏电阻是用硫化隔或硒化隔等半导体材料制成的特殊电阻器 表面还涂有防潮树脂 具有光电导效应 二 特
  • TCP 拥塞窗口原理

    学过网络相关课程的 都知道TCP中 有两个窗口 滑动窗口 在我们的上一篇文章中有讲 接收方通过通告发送方自己的可以接受缓冲区大小 这个字段越大说明网络吞吐量越高 从而控制发送方的发送速度 拥塞窗口 也就是本文要讲的 概念 一个连接的TCP双
  • element-plus elplus el-tree三种图标自定义 并且点击图标展开收起 点击文字获取数据

    前言 公司需求 需要实现如下样式的树形列表 基于vue3 element plus 当节点展开时 显示展开的文件夹图标 当节点收起时显示收起的文件夹 最后一级显示文件样式 废话没有了 代码如下
  • C规范编辑笔记(九)

    往期文章 C规范编辑笔记 一 C规范编辑笔记 二 C规范编辑笔记 三 C规范编辑笔记 四 C规范编辑笔记 五 C规范编辑笔记 六 C规范编辑笔记 七 C规范编辑笔记 八 正文 今天我们来分享一下C规范编辑笔记第九篇 话不多说 我们直接来看
  • 树莓派数据远程传输学习记录——TCP/IP协议连接OneNet云平台传输数据的方法

    目录 项目场景 问题描述 解决方案 OneNet云平台前期项目搭建准备 以网络调试助手模拟树莓派建立连接并发送数据 树莓派与OneNet云平台进行对接 最后总结 项目场景 本人在进行树莓派项目开发时进行数据远程传输 4G WiFi通信 过程
  • Spark 3.0.3 源码阅读及 idea 调试环境搭建

    目录 1 源码下载 2 源码解压并编译 3 使用 Idea 打开或导入 4 idea 调试环境设置 Master 设置 Worker 设置 1 源码下载 Downloads Apache Spark 2 源码解压并编译 编译前建议在环境变量
  • ingress 400 Bad Request The plain HTTP request was sent to HTTPS port

    问题现象 访问时返回400 Bad Request 并提示The plain HTTP request was sent to HTTPS port 问题原因 Ingress Controller到后端Pod请求使用了默认的HTTP请求 但
  • 效果:网页页面随机改变颜色+自定义样式背景颜色随机改变+20秒倒计时+时间一到马上跳转新页面

  • linux 安装flash

    2 將下载好的包拷到某个目录下并解压得到文件 得到如下libflashplayer so文件与usr文件夹 3 将libflashplayer so拷到firefox的插件目录 usr lib mozilla plugin 下 sudo c
  • 个人总结-基础算法

    文章目录 基础算法 各个算法的复杂度及稳定性等 冒泡排序 蛮力法 理解 函数代码 测试用例 选择排序 蛮力法 理解 函数代码 测试用例 归并排序 分治法 理解 函数代码 测试用例 快速排序 分治法 理解 函数代码 测试用例 插入排序 减治法
  • GitHub、GIT、Intellij集成github初探

    一 什么是Git 刚接触Git或github的童鞋可能会把它们的概念搞混淆 所以在这里稍微解释一下 Git和github是两个完全不同的概念 Git是一个版本管理系统 Version Control System 简称 VCS 早期版本管理
  • kafka-offset手动提交和自动提交

    目录 首先回顾之前的知识点 自动提交offset 手动提交 消费者poll消息的细节 完整代码 按照新方法进行消费消息 1 指定时间进行消息的消费 2 指定分区开始从头消费 指定分区的偏移量开始消费 新消费组的消费offset规则 首先回顾