【MQ】kafka(四)——kafka消费者如何消费的?如何防止重复消费?如何顺序消费?

2023-11-01

一、前言

前面博客小编向大家分享了 kafka如何保证消息不丢失?,基本是从producer和broker来分析的,producer要支持重试和acks,producer要做好副本和及时刷盘落地。

这篇博客呢,就跟大家一起聊一下 kafka 消费者如何消费的?如何避免重复消费?

二、消费者消费流程

消费流程:

  1. 从zk获取要消费的partition 的leader的位置 以及 offset位置
  2. 拉数据,这里拉数据是直接从broker的pagecash拉取,零拷贝 ,所以很快。
  3. 如果pagecash数据不全,就会从磁盘中拉取,并发送
  4. 消费完成后,可以手动提交offset,也可以自动提交offset。
    在这里插入图片描述

消费策略有哪些?如何配置

一般我们消费测试是不会变的,都使用默认的,也就是第一种,range策略。

  • Range 范围分配策略(默认)

默认策略,保证基本是均衡的。
计算公式 :
n = 分区数/消费者数
m = 分区数%消费者数
前m个消费者,消费n+1个,剩余的消费n个
在这里插入图片描述
在这里插入图片描述
eg:12个partition,9个消费者
12/9 = 1
12%9 = 3
前3台 消费2个partition,后6台各消费1个partition。

  • RoundRobin 轮询

先根据topic 和 topic的partition的hashcode进行一个排序,然后以轮询的方式分配给各个消费者。

在这里插入图片描述

  • stricky粘性分配策略

在没有reblence的时候和轮询策略一样
当发生rebalence的时候,尽可能的保证与上一次分配一致

比如默认是
在这里插入图片描述
比如consumer2 挂了,topicA p1 和topicB p2就没有消费者了,这个时候要进行消费组的rebalence。
在这里插入图片描述

然后按照轮询策略分配一下。
在这里插入图片描述

可以在配置消费配置的时候,指定消费策略:

//Range
propsMap.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, org.apache.kafka.clients.consumer.RangeAssignor.class);

//RoundRobin
propsMap.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, org.apache.kafka.clients.consumer.RoundRobinAssignor.class);

//stricky
propsMap.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, org.apache.kafka.clients.consumer.StickyAssignor.class);

什么是零拷贝?

普通把文件发送到远程服务器的方法:
在这里插入图片描述
1.读磁盘内容,拷贝到内核缓冲区
2.cpu把内核缓冲区数据拷贝到用户空间缓冲区
3.调用write(),把用户空间缓冲区数据拷贝到内核的Socket Buffer中
4.把sb中的数据拷贝到网卡缓冲区 NIC Buffer ,网卡在传输

从上面的流程看, 1和3 其实是多余的,用户和内核相互转换,会带来cpu上下文切换,对cpu性能有影响。

零拷贝 就是对这两次的拷贝忽略掉,应用程序可以直接把磁盘中的数据从内核中,直接传输到socket,不用互相拷贝。其中用到了Direct Memory Access 技术,可以把数据直接从内核空间传递到网卡设备,kafka中把数据直接从磁盘复制到 pagecash,给消费者读取,如图:

在这里插入图片描述
在这里插入图片描述
零拷贝其实不是没有拷贝,只是减少了不必要的拷贝次数,比如内核到用户空间的拷贝。
linux 中使用sendfile()实现零拷贝
java中nio用到零拷贝,比如filechannel.transferTo()。

mmap 文件映射机制:把磁盘文件映射到内存,用户通过修改内存,就可以修改磁盘文件。提高io效率,减少了复制开销。

三、如何避免重复消费?

分析原因:

1.生产者重复提交
2.rebalence引起重复消费

超过一定时间(max.poll.interval.ms设置的值,默认5分钟)未进行poll拉取消息,则会导致客户端主动离开队列,而引发Rebalance,提交offset失败。其他消费者会从没有提交的位置消费,从而导致重复消费。

解决方案:

1.提高消费速度

  • 增加消费者
  • 多线程消费
  • 异步消费
  • 调整消费处理时间

2.幂等处理

  • 消费者设置幂等校验

  • 开启kafka幂等配置,生产者开启幂等配置,将消息生成md5,然后保存到redis中,处理新消息的时候先校验。这个尽量不要开启,消耗性能。

props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

四、如何顺序消费?

我们知道kafka,整个topic有多个partition,每个partition内的消息是有顺序的。

五、如何延迟消费?

kafka是无状态的,没有延迟的功能。pulsar和rabbitmq实现更加方便。
在这里插入图片描述
开发延迟推送服务,定时检索延迟消息,发送给kafka。

六、频繁rebanlence怎么解决?

再均衡,保证所有消费者相对均衡消费。rebalence的时候,所有消费者,停止消费,直到rebanlence完成。

触发时机:
1.consumer个数变化
2.订阅topic个数变化
3.订阅的topic的partition变化

解决方案:

使用消息队列Kafka版时消费客户端频繁出现Rebalance

频繁出现rebalence,可能是消费者的消费时间过长,超过一定时间(max.poll.interval.ms设置的值,默认5分钟)未进行poll拉取消息,则会导致客户端主动离开队列,而引发Rebalance。

1.参数调整:
session.timeout.ms:v0.10.2之前的版本可适当提高该参数值,需要大于消费一批数据的时间,但不要超过30s,建议设置为25s;而v0.10.2及其之后的版本,保持默认值10s即可。
max.poll.records:降低该参数值,建议远远小于<单个线程每秒消费的条数> * <消费线程的个数> * <max.poll.interval.ms>的积。
max.poll.interval.ms: 该值要大于<max.poll.records> / (<单个线程每秒消费的条数> * <消费线程的个数>)的值。

2.尽量提高客户端的消费速度,消费逻辑另起线程进行处理。
3.减少Group订阅Topic的数量,一个Group订阅的Topic最好不要超过5个,建议一个Group只订阅一个Topic。

附:批量消费代码

import com.ctrip.framework.apollo.ConfigService;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class BehaviorConsumerConfig {

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecordsConfig);
            propsMap.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, org.apache.kafka.clients.consumer.StickyAssignor.class);

        propsMap.put("security.protocol", protocol);
        propsMap.put("ssl.truststore.location", truststoreLocation.replaceAll("file://", ""));
        propsMap.put("ssl.truststore.password", truststorePassword);
        propsMap.put("login.config.location", loginConfigLocation);
        propsMap.put("sasl.mechanism", mechanism);
        return propsMap;
    }

    @Bean("batchContainerFactory")
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));

        // 并发创建的消费者数量
        factory.setConcurrency(4);
        factory.getContainerProperties().setPollTimeout(3000);

        //设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
        factory.setBatchListener(true);
        return factory;
    }
}

七、小结

本篇我们基本上把消费者的消费梳理干净了,以及消费会遇到的 重复消费,顺序消费,延迟消费等问题都也解释了给出了解决方案。方案一通百通。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

【MQ】kafka(四)——kafka消费者如何消费的?如何防止重复消费?如何顺序消费? 的相关文章

  • 如何使用 Java 和 Selenium WebDriver 在 C 目录中创建文件夹并需要将屏幕截图保存在该目录中?

    目前正在与硒网络驱动程序和代码Java 我有一种情况 我需要在 C 目录中创建一个文件夹 并在该文件夹中创建我通过 selenium Web 驱动程序代码拍摄的屏幕截图 它需要存储在带有时间戳的文件夹中 如果我每天按计划运行脚本 所有屏幕截
  • Spring Batch 多线程 - 如何使每个线程读取唯一的记录?

    这个问题在很多论坛上都被问过很多次了 但我没有看到适合我的答案 我正在尝试在我的 Spring Batch 实现中实现多线程步骤 有一个包含 100k 条记录的临时表 想要在 10 个线程中处理它 每个线程的提交间隔为 300 因此在任何时
  • 如何默认将 Maven 插件附加到阶段?

    我有一个 Maven 插件应该在编译阶段运行 所以在项目中consumes我的插件 我必须做这样的事情
  • 在 java 类和 android 活动之间传输时音频不清晰

    我有一个android活动 它连接到一个java类并以套接字的形式向它发送数据包 该类接收声音数据包并将它们扔到 PC 扬声器 该代码运行良好 但在 PC 扬声器中播放声音时会出现持续的抖动 中断 安卓活动 public class Sen
  • Final字段的线程安全

    假设我有一个 JavaBeanUser这是从另一个线程更新的 如下所示 public class A private final User user public A User user this user user public void
  • 磁模拟

    假设我在 n m 像素的 2D 表面上有 p 个节点 我希望这些节点相互吸引 使得它们相距越远吸引力就越强 但是 如果两个节点之间的距离 比如 d A B 小于某个阈值 比如 k 那么它们就会开始排斥 谁能让我开始编写一些关于如何随时间更新
  • 我可以使用 HSQLDB 进行 junit 测试克隆 mySQL 数据库吗

    我正在开发一个 spring webflow 项目 我想我可以使用 HSQLDB 而不是 mysql 进行 junit 测试吗 如何将我的 mysql 数据库克隆到 HSQLDB 如果您使用 spring 3 1 或更高版本 您可以使用 s
  • 如何为俚语和表情符号构建正则表达式 (regex)

    我需要构建一个正则表达式来匹配俚语 即 lol lmao imo 等 和表情符号 即 P 等 我按照以下示例进行操作http www coderanch com t 497238 java java Regular Expression D
  • Java按日期升序对列表对象进行排序[重复]

    这个问题在这里已经有答案了 我想按一个参数对对象列表进行排序 其日期格式为 YYYY MM DD HH mm 按升序排列 我找不到正确的解决方案 在 python 中使用 lambda 很容易对其进行排序 但在 Java 中我遇到了问题 f
  • 在两个活动之间传输数据[重复]

    这个问题在这里已经有答案了 我正在尝试在两个不同的活动之间发送和接收数据 我在这个网站上看到了一些其他问题 但没有任何问题涉及保留头等舱的状态 例如 如果我想从 A 类发送一个整数 X 到 B 类 然后对整数 X 进行一些操作 然后将其发送
  • 使用Caliper时如何指定命令行?

    我发现 Google 的微型基准测试项目 Caliper 非常有趣 但文档仍然 除了一些示例 完全不存在 我有两种不同的情况 需要影响 JVM Caliper 启动的命令行 我需要设置一些固定 最好在几个固定值之间交替 D 参数 我需要指定
  • 总是使用 Final?

    我读过 将某些东西做成最终的 然后在循环中使用它会带来更好的性能 但这对一切都有好处吗 我有很多地方没有循环 但我将 Final 添加到局部变量中 它会使速度变慢还是仍然很好 还有一些地方我有一个全局变量final 例如android Pa
  • 如何在 javadoc 中使用“<”和“>”而不进行格式化?

    如果我写
  • AWS 无法从 START_OBJECT 中反序列化 java.lang.String 实例

    我创建了一个 Lambda 函数 我想在 API 网关的帮助下通过 URL 访问它 我已经把一切都设置好了 我还创建了一个application jsonAPI Gateway 中的正文映射模板如下所示 input input params
  • 获取 JVM 上所有引导类的列表?

    有一种方法叫做findBootstrapClass对于一个类加载器 如果它是引导的 则返回一个类 有没有办法找到类已经加载了 您可以尝试首先通过例如获取引导类加载器呼叫 ClassLoader bootstrapLoader ClassLo
  • 编译器抱怨“缺少返回语句”,即使不可能达到缺少返回语句的条件

    在下面的方法中 编译器抱怨缺少退货声明即使该方法只有一条路径 并且它包含一个return陈述 抑制错误需要另一个return陈述 public int foo if true return 5 鉴于Java编译器可以识别无限循环 https
  • 在 Maven 依赖项中指定 jar 和 test-jar 类型

    我有一个名为 commons 的项目 其中包含运行时和测试的常见内容 在主项目中 我添加了公共资源的依赖项
  • Firebase 添加新节点

    如何将这些节点放入用户节点中 并创建另一个节点来存储帖子 我的数据库参考 databaseReference child user getUid setValue userInformations 您需要使用以下代码 databaseRef
  • 使用 JMF 创建 RTP 流时出现问题

    我正处于一个项目的早期阶段 需要使用 RTP 广播DataStream创建自MediaLocation 我正在遵循一些示例代码 该代码目前在rptManager initalize localAddress 出现错误 无法打开本地数据端口
  • 将 List 转换为 JSON

    Hi guys 有人可以帮助我 如何将我的 HQL 查询结果转换为带有对象列表的 JSON 并通过休息服务获取它 这是我的服务方法 它返回查询结果列表 Override public List

随机推荐

  • OTA实现设备升级方案

    引言 空中下载技术 Over the Air Technology OTA 是通过移动通信的空中接口实现对移动终端设备进行远程管理的技术 该技术在IOT行业非常的重要 当随着市场上的产品越来越多 保有量也越来越多 随着时间的推移 很多设备的
  • STM8 学习笔记5:CLK

    Clock 1 概述 时钟是单片机的脉搏 是单片机的驱动源 使用任何一个外设都必须打开相应的时钟 这样的好处是 如果不使用一个外设的时候 就把它的时钟关掉 从而可以降低系统的功耗 达到节能 实现低功耗的效果 每个时钟tick 系统都会处理一
  • mysql使用定时器执行任务(event)

    1 查询是否开启定时事件 OFF或者0 关闭 ON或者1 开启 SHOW VARIABLES LIKE event sche 2 开启事件
  • inux c学习笔记----SCTP基础客户/服务编程(setsockopt,sctp_sendmsg等)

    在编程之前先了解一下sctp套接字选项 setsockopt 设置socket状态 相关函数 getsockopt 表头文件 include
  • 图书馆

    看新工艺的图书馆像看天书一样 多了很多内容 老驴打算挖个坑尝试去读一下lib中各个表格所代表的意义及用途 今儿开篇 LDM LDM 线性延迟模型 最简单的单元延迟模型 计算公式 D D0 D1 S D2 C 其中 D0 D1 D2是常值 S
  • nvm安装后,安装并切换版本,node报错

    1 下载 下载地址 https github com coreybutler nvm windows releases 这个版本是 v1 1 11 不喜欢的话 自己选版本 下载setup exe 安装 2 安装 安装时候 安装路径可以自选
  • 自用部署PorkbunDNS解析

    1 登录 2 域名管理设置 3 域名设置
  • 输入一个大于3的整数,判断它是否为素数(prime,又称质数)

    include
  • 人工智能大会机器人范围

    AI时代 看机器人七十二变 2020世界人工智能大会现场 一组 AI上海 机器人矩阵 观展区十分吸引眼球 格物斯坦表示 机器人的种类分很多 智能广告导引机器人 智能末端配送站 智能陪伴机器人 数字化门店等各路机器人各显神通 共同为沪上的人工
  • 手把手教你用 ChatGPT plugin 打造一个人知识库系统(一)

    为什么需要个人知识库 大概有很多人跟我一样 被现在信息过载弄得非常焦虑 很自然想到通过整理的方式来对抗信息过载 试图使用各种知识管理工具来整理这些信息 但最后折腾完各种工具后 才发现根本用不起来 因为这些工具常常需要我们按照预设的框架去管理
  • 如何融合多任务学习 (Multi-Task Learning ) 损失函数loss

    目录 1 Uncertainty Weighting 1 1 基础概念 1 2 方法 2 GradNorm 2 1 原理 2 2 方法 3 Multi Objective Optimisation 3 1 原理 3 2 方法 4 Geome
  • OAuth2基础知识

    什么是OAuth 2 OAuth 2 0是一种授权协议 它的核心是授权许可和令牌机制 它通过颁发访问令牌给第三方 允许第三方代表用户访问该用户的数据 而不是直接给第三方用户名和密码 它主要用来保护Web API接口 第三方只有得到授权和访问
  • Js文件中调用其它Js函数的方法

    转载 在一个js中引用另一个js的方法 方法 步骤 一个js调用另外一个js的方法 最简单的例子就是jquery的调用 首先准备好两个js文件 然后写一个html文件 通过方法 将两个js引用到html文件中 处于下方的js文件 就可以直接
  • 电路实验

    任务一 1 非门电路 2 表格 INPUT toggle switch OUTPUT led ON 1 Black 0 OFF 0 Red 1 任务二 1 A B C AB AC 2 表格 A B C O1 O2 0 0 0 0 0 0 1
  • 【C++编程题】回文串( 动态规划×,暴力求解√ )

    问题描述 回文串 是一个正读和反读都一样的字符串 比如 level 或者 noon 等等就是回文串 给你一个字符串 问最少在字符串尾添加多少字符 可以使得字符串变为回文串 输入格式 有多组测试数据 每组测试数据第一行是一个正整数N 表示字符
  • 土地利用现状分类2020_国土空间调查、规划、用途管制用地用海分类指南来了...

    自然资源部办公厅印发指南通知 据自然资源部官网今日消息 该部办公厅已于11月17日印发 国土空间调查 规划 用途管制用地用海分类指南 试行 这部指南主要内容包括总则 一般规定 用地用海分类等 其明确了国土空间调查 规划 用途管制用地用海分类
  • Pandora设置samba(文件共享)的步骤#openwrt通用#

    此方法为在线安装 进入后台 进入系统 gt 软件包 点击刷新列表 搜索Samba 安装两个插件 luci app samba和 samba4 server 重启路由器 在服务中可以查看配置 添加用户 在winscp中 找到etc passw
  • 代码随想录算法训练营第二十九天/46.全排列、47.全排列2

    文章目录 组合 排列辨析 46 全排列 思路 注意 实现代码 47 全排列2 思路 注意 实现代码 组合 排列辨析 组合 是无序的 例如 1 2 2 1 是同一个 排列 是有序的 例如 1 2 2 1 是不一样的 46 全排列 思路 判断结
  • Maven入门学习

    Maven 1 Maven 介绍 Maven 是跨平台的项目管理工具 作为 Apache 组织提供的一个颇为成功的开源项目 它是基于项目对象模型 POM project object model 可以通过一小段描述信息 配置 来管理项目的构
  • 【MQ】kafka(四)——kafka消费者如何消费的?如何防止重复消费?如何顺序消费?

    一 前言 前面博客小编向大家分享了 kafka如何保证消息不丢失 基本是从producer和broker来分析的 producer要支持重试和acks producer要做好副本和及时刷盘落地 这篇博客呢 就跟大家一起聊一下 kafka 消