Kafka常见的导致重复消费原因和解决方案

2023-11-16

点击上方蓝色字体,选择“设为星标

回复”资源“获取更多资源

大数据技术与架构

点击右侧关注,大数据开发领域最强公众号!

暴走大数据

点击右侧关注,暴走大数据!

问题分析

导致kafka的重复消费问题原因在于,已经消费了数据,但是offset没来得及提交(比如Kafka没有或者不知道该数据已经被消费)。
总结以下场景导致Kakfa重复消费:

  • 原因1:强行kill线程,导致消费后的数据,offset没有提交(消费系统宕机、重启等)。

  • 原因2:设置offset为自动提交,关闭kafka时,如果在close之前,调用 consumer.unsubscribe() 则有可能部分offset没提交,下次重启会重复消费。

例如:

try {    consumer.unsubscribe();} catch (Exception e) {}
try {    consumer.close();} catch (Exception e) {}

上面代码会导致部分offset没提交,下次启动时会重复消费。

解决方法:设置offset自动提交为false

整合了Spring配置的修改如下配置
spring配置:

spring.kafka.consumer.enable-auto-commit=falsespring.kafka.consumer.auto-offset-reset=latest

整合了API方式的修改enable.auto.commit为false
API配置:

Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test");props.put("enable.auto.commit", "false");

一旦设置了 enable.auto.commit 为 true,Kafka 会保证在开始调用 poll 方法时,提交上次 poll 返回的所有消息。从顺序上来说,poll 方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况。

  • 原因3:(重复消费最常见的原因):消费后的数据,当offset还没有提交时,partition就断开连接。比如,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的session timeout时间(0.10.x版本默认是30秒),那么就会re-blance重平衡,此时有一定几率offset没提交,会导致重平衡后重复消费。

  • 原因4:当消费者重新分配partition的时候,可能出现从头开始消费的情况,导致重发问题。

  • 原因5:当消费者消费的速度很慢的时候,可能在一个session周期内还未完成,导致心跳机制检测报告出问题。

  • 原因6:并发很大,可能在规定的时间(session.time.out默认30s)内没有消费完,就会可能导致reblance重平衡,导致一部分offset自动提交失败,然后重平衡后重复消费

问题描述:
我们系统压测过程中出现下面问题:异常rebalance,而且平均间隔3到5分钟就会触发rebalance,分析日志发现比较严重。错误日志如下:

08-09 11:01:11 131 pool-7-thread-3 ERROR [] - commit failed org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:713) ~[MsgAgent-jar-with-dependencies.jar:na]        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:596) ~[MsgAgent-jar-with-dependencies.jar:na]        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1218) ~[MsgAgent-jar-with-dependencies.jar:na]        at com.today.eventbus.common.MsgConsumer.run(MsgConsumer.java:121) ~[MsgAgent-jar-with-dependencies.jar:na]        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_161]        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_161]        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]

这个错误的意思是,消费者在处理完一批poll的消息后,在同步提交偏移量给broker时报的错。初步分析日志是由于当前消费者线程消费的分区已经被broker给回收了,因为kafka认为这个消费者死了,那么为什么呢?

问题分析:

这里就涉及到问题是消费者在创建时会有一个属性max.poll.interval.ms(默认间隔时间为300s),

该属性意思为kafka消费者在每一轮poll()调用之间的最大延迟,消费者在获取更多记录之前可以空闲的时间量的上限。如果此超时时间期满之前poll()没有被再次调用,则消费者被视为失败,并且分组将重新平衡,以便将分区重新分配给别的成员。

处理重复数据

因为offset此时已经不准确,生产环境不能直接去修改offset偏移量。

所以重新指定了一个消费组(group.id=order_consumer_group),然后指定auto-offset-reset=latest这样我就只需要重启我的服务了,而不需要动kafka和zookeeper了!

#consumerspring.kafka.consumer.group-id=order_consumer_groupspring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.kafka.consumer.enable-auto-commit=falsespring.kafka.consumer.auto-offset-reset=latest

注:如果你想要消费者从头开始消费某个topic的全量数据,可以重新指定一个全新的group.id=new_group,然后指定auto-offset-reset=earliest即可。

欢迎点赞+收藏+转发朋友圈素质三连

文章不错?点个【在看】吧! ????

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka常见的导致重复消费原因和解决方案 的相关文章

随机推荐

  • 国密SKF库调用开发指南(一)

    针对支持国密算法USB KEY设备的应用 国家颁布一个行业标准 智能密码钥匙应用接口规范 GM T0016 2012 市面上销售的国密算法的USB KEY设备也都 其实也是必须 支持这个接口规范 因此 只要根据这个规范开发的应用程序 就可以
  • 李开复:数位革命——创新创业的黄金时代

    李开复 数位革命 创新创业的黄金时代 创新工场观点 数位革命 今天的创业时代比任何一个历史时期都能带来更多的机会 何谓 数位革命 看创新工场创始人 董事长兼首席执行官李开复老师如何从我们身处这个时代的六个现象中 看出变革 看出机会 未来是年
  • Vue脚手架的使用

    1 全局安装 vue cli 仅第一次执行 npm install g vue cli 再次输入vue 如果出现版本号 则成功 2 切换到要创建项目的目录 使用命令创建项目 vue create xxxx 3 启动项目 npm run se
  • Egret的2D摄像机实现

    一个Egret的正交摄像机的简单实现 主要功能大致如下 可参考根据实际进行调整 module Camera 一个正交摄像机demo export class Camera 摄像机显示的对象 实际是镜头所对应的世界 m container e
  • OpenCV机器视觉-图片卷积

    图片卷积 图像滤波是尽量保留图像细节特征的条件下对目标图像的噪声进行抑制 是图像预处理中不可缺少的操作 其处理效果的好坏将直接影响到后续图像处理和分析的有效性和可靠性 线性滤波是图像处理最基本的方法 它允许我们对图像进行处理 产生很多不同的
  • 【微信小程序入门到精通】— button 和 image 超详细讲解

    目录 前言 一 button 组件 1 1 指定按钮 属性值 type 1 2 改变按钮尺寸 属性值 size 1 3 使按钮镂空 属性值 plain 1 4 补充 二 image 组件 2 1 构建图片控件 image 2 2 指定图片缩
  • python数据容器--列表的常用操作

    数据容器List 列表的常用操作 List列表的常用操作 1 查找某元素在列表内的下标索引 列表 index 元素 mylist itcast itheima python index mylist index itcast print i
  • 指针的删除

    1 在链表中 将某个指针delete 指向该指针的那个指针的next 不会自动赋值为NULL 需要手动赋值 2 删掉 某指针所指向的内存 该指针仍然可以使用 下面是一个带头指针的单向链表 void Stack Pop int value i
  • 每日一考错题整理

    1 JDK JRE JVM三者之间的关系 以及JDK JRE包含的主要结构有哪些 JDK JRE Java开发工具 javac exe java exe javadoc exe JRE JVM JAVASE核心类库 2 标识符的命名规范有哪
  • JDK8新特性之双冒号 :: 用法及详解

    JDK8新特性之双冒号 用法及详解 转自 https cloud tencent com developer article 1404786 JDK8的新特性有很多 最亮眼的当属函数式编程的语法糖 本文主要讲解下双冒号 的用法 类名 方法名
  • ubuntu:android studio 安装adb调试工具

    adb安装 apt get install android tools adb 远程连接 adb connect 172 26 0 119 5555 遇到端口占用 yangwenlong title71 Android AndroidPro
  • 微服务全栈:深入核心组件与开发技巧

    文章目录 1 服务注册与发现 1 1 客户端注册 ZooKeeper 1 2 第三方注册 独立的服务Registrar 1 3 客户端发现 1 4 服务端发现 1 5 Consul 1 6 Eureka 1 7 SmartStack 1 8
  • firefox 火狐浏览器安装java插件

    由于工作中用到决策引擎产品 FICO Blaze 该产品展示决策流 决策树 决策表等组件是依托的applet 需要浏览器启用java插件 经常碰到明明电脑上装了java 但是浏览器的附加组件中却没有显示 经过一下午的折腾发现了以下几条限制
  • SpringCloud系列教程(1)--开发环境的准备

    开发环境准备 eclipse apache maven 3 5 0 jdk1 8 说明 这个是本人的开发环境工具 也可以使用自己适应的环境 比如 IntelliJ IDEA 但是本系列以eclipse来简述 如果环境不会配置 请自行百度 因
  • Android Studio TraceView性能优化分析

    http blog csdn net androiddevelop article details 8223805 http www cnblogs com sunzn p 3192231 html Android 编程下的 TraceVi
  • ConstraintLayout各种居中设置

    1 全局居中 app layout constraintBottom toBottomOf parent app layout constraintEnd toEndOf parent app layout constraintStart
  • Python基于xlrd模块处理合并单元格

    Excel是我们日常工作中经常使用的电子表格软件 它可以方便地对数据进行整理 计算和分析 在Excel中 有时候需要将多个单元格合并成一个单元格 以便更好地展示数据 但是 在数据处理过程中 合并单元格也会带来不少麻烦 本文将介绍如何使用Py
  • SpringBoot实战(八)集成 Logback

    目录 1 简介 2 项目结构 3 配置文件 3 1 Maven 3 2 logback spring xml 3 3 application yml 4 自定义输出级别 5 项目地址 6 部分内容没有输出到日志文件中问题处理 7 根据开发
  • oauth2.0--基础--6.1--SSO的实现原理

    oauth3 0 基础 6 1 SSO的实现原理 1 什么是SSO 1 1 概念 在一个 多系统共存 的环境下 用户在一处登录后 就不用在其他系统中登录 就可以访问其他系统的资源 用户环境 浏览器 只能同一个浏览器 不会出现A浏览器登录成功
  • Kafka常见的导致重复消费原因和解决方案

    点击上方蓝色字体 选择 设为星标 回复 资源 获取更多资源 大数据技术与架构 点击右侧关注 大数据开发领域最强公众号 暴走大数据 点击右侧关注 暴走大数据 问题分析 导致kafka的重复消费问题原因在于 已经消费了数据 但是offset没来