Rebalance&多线程实例消费(十二)

2023-10-27

上篇文章说了,kafka位移提交通过enable.auto.commit控制手动提交还是自动提交,手动提交又分为异步提交和同步提交,还可以指定分区进行提交,默认是提交给所有分区。手动提交可以对应不同的业务场景,当需要业务全部处理完才提交位移,则可以选择手动提交,但这时候需要做幂等性处理,因为当业务执行完毕,但系统宕机,这时候consumer重启则因为位移没提交会重复消费之前的数据。

Consumer位移管理-Kafka从入门到精通(十一)icon-default.png?t=M7J4https://blog.csdn.net/ke1ying/article/details/126387415

一、Rebalance

Rebalance是什么?

它本质是一组协议,规定了consumer group如何达成一致性来分配订阅所有分区的。假设有20个consumer,需要订阅100个分区的topic,这时候就会每个consumer会平均订阅5个分区,这个过程就是rebalace。

和旧版本依托于zookeeper不同,新版本consumer使用了kafka内置一个权限的协调协议(group coordination protocol)。Kafka的某个broker会被选举为组协调者(group coordinator),他负责对组的状态进行管理,他的主要职责是当新成员到达时促进组内所有的成员重新分配,即coordinator负责rebalance。

什么时候他会触发rebalance呢?

  1. 组成员发生变化,比如新的consumer加入组,或者有consumer离开组,或者consumer崩溃时候触发。
  2. 消费组订阅的topic发生变化。
  3. 组订阅的topic分区发生变更。

真实应用场景中引用rebalance最常见原因违背了第一条件,特别是consumer崩溃情况,崩溃不一定是consumer进程宕机或者挂掉,当consumer无法在指定时间内完成消息处理时候,那么coordinator则会认为consumer已经崩溃,从而引发新一轮的rebalance。当group程序下业务处理逻辑过重,这时候就会导致消费超时,从而导致coordinator认为consumer挂掉,引发rebalance,这时候就要注意这些参数的配置request.timeout.ms、max.poll.interval.ms、max.poll.records等。

Rebalance分区配置?

之前提到过rebalance时group下所有consumer会一起协调共同参与分区分配,kafka新版本consumer默认提供了三种分区策略,分别是range、round-robin、sticky。

Range策略主要是基于范围思想,它将单个topic的所有分区按照顺序排列,然后把这些分区划分为固定大小的分区并且依次分给各个consumer。而round-robin策略则会把所有topic的所有分区顺序摆开,然后轮询式的分配给各个consumer。最新发布的sticky策略有效避免上诉两种策略完全无视历史分配方案缺陷,采用“有粘性”对所有consumer实例进行分配,可以最大程度的避免分配倾斜。

新版本consumer默认的分配策略是range,用户根据consumer参数partition.assignment.strategy来进行设置,另外也可以通过自定义来分配策略。

Rebalance协议:

前面说了rebalance本质就是一组协议,group与coordinator共同使用这组协议来完成group的rebalance,最新版本的kafka中提供下面五种协议来处理rebalance。

Joingroup请求:consumer请求加入组。

SyncGroup请求:group leader吧分配方案同步更新到组内所有成员中。

Heartbeat请求:consumer定期向coordinator汇报心跳表明依然存活。

LeaveGroup请求:consumer主动通知coordinator该consume即将离组。

DescribeGroup请求:查看组的所有信息,包括成员信息,协议信息,分配方案,订阅信息。该请求类型主要提供管理员使用。Coordinator不使用该请求执行rebalance。

在rebalance过程中,coordinator主要处理consumer发过来的joinGroup和syncGroup请求,当consumer主动离组时会发送leaveGroup请求给coordinator。

在成功rebalance后,组内所有consumer都需要定期向coordinator发送heartbeat请求,而每个consumer也是根据heartBeat请求的响应中是否包含rebalance_in_progress来判断当前group是否开启了新一轮的rebalance。

rebalance监听器:

在位移提交章节中,consumer默认在新版本是把位移提交到_consumer_offsets中。其实kafka也支持把位移提交到外部存储中,比如数据库。若要实现这个功能,则必须使用rebalance监听器,而使用监听器的前提是用户必须使用consumer group。如果使用独立的consumer或者直接手动分配分区,那么rebalance监听是无效的。

多线程实例消费

如前所述,kafkaConsumer是非线程安全的,他和kafkaProducer不同,后者是线程安全的,因此可以在多个线程中使用同一个kafkaProducer实例,而且这样的效率是比每个线程维护一个kafkaProducer更高。

Consumer group分为 每个线程单独维护一个kafkaConsumer,和 单kafkaConsumer+多work线程。

两者区别是,后者在全局维护一个或者多个kafkaConsumer实例执行消息获取任务。使用全局的kafkaConsumer实例执行消息获取,然后把获取到的消息集合交给线程池中的work线程执行工作,之后work线程完成处理上报位移状态,由全局的consumer提交位移。

那么他们的优缺点呢?

每个线程维护专属consumer:优点:实现简单,速度快,因为无线程之间的交互管理,方便管理位移,易于维护分区间的消费顺序。缺点:socker连接开销大;consumer受限与topic分区,扩展性差。Broker端处理负载高(因为发往broker请求多);rebalance可能性大。

单consumer+多worker模式:优点:消息获取处理解耦;扩展性强,独立扩展consumer数量和worker。缺点:实现负载;难以维护分区内的顺序消息;处理链路变长,导致位移管理困难;worker线程异常导致数据丢失。

独立consumer

前面说的都是group consumer消费者组形式出现,group自动实行分区分配和rebalance。对于需要多个consumer共同读取某个topic来说,使用group非常方便。但有的时候用户需要精准消费某个consumer消费某个分区。

  1. 如果进程自己维护分区状态,那么它就可以固定消费某些分区而不用担心状态丢失问题。
  2. 如果进程本身已经是高可用且能够自动重启恢复错误,那么它就不需要让kafka来帮它完成错误检测和状态恢复。

以上两种情况中consumer group都无用武之地,而独立consumer更合适(standlone consumer)。

使用standalone方法就是调用kafkaConsumer.assign,前面我们订阅则是使用kafkaConsumer.subscribe。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Rebalance&多线程实例消费(十二) 的相关文章

  • AutoMapper基本使用

    导包 AutoMapper AutoMapper Extensions Microsoft DependencyInjection 假如需要将Student映射为StudentCopy namespace WebApplication14

随机推荐

  • 国内比较快的DNS服务器IP汇总

    DNS是什么 DNS Domain Name System 域名系统 简单的说 就是把我们输入的网站域名翻译成IP地址的系统 比如我们想访问百度 我们会在网页里键入www baidu com 但是电脑不会理解这串字符的含义 于是就把这串字符
  • ElementUI浅尝辄止33:Form 表单

    Form 表单 日常业务中很常见 由输入框 选择器 单选框 多选框等控件组成 用以收集 校验 提交数据 常见于表单请求 登录 数据校验等业务操作中 1 如何使用 包括各种表单项 比如输入框 选择器 开关 单选框 多选框等 在 Form 组件
  • 数据结构与算法(六):图结构

    一 基本概念 二 图的存储结构 1 邻接矩阵 2 邻接表 3 十字链表 三 图的遍历 1 深度优先遍历 2 广度优先遍历 四 最小生成树 1 Prim算法 2 Kruskal算法 五 最短路径 1 Dijkstra算法 图是一种比线性表和树
  • 15:00面试,15:06就出来了,问的问题有点变态。。。

    从小厂出来 没想到在另一家公司又寄了 到这家公司开始上班 加班是每天必不可少的 看在钱给的比较多的份上 就不太计较了 没想到8月一纸通知 所有人不准加班 加班费不仅没有了 薪资还要降40 这下搞的饭都吃不起了 还在有个朋友内推我去了一家互联
  • uniapp 发布前隐私条款、用户协议等配置

    隐私类型 1 android隐私与政策提示框 2 网页版隐私条款 android ios都会用到 3 app内部隐私条款 一 android隐私与政策提示框 根据工业和信息化部关于开展APP侵害用户权益专项整治要求 App提交到应用市场必须
  • CVE-2021-41773&&CVE-2021-42013复现

    CVE 2021 41773 漏洞原理 Apache HTTP Server 是 Apache 基础开放的流行的 HTTP 服务器 在其 2 4 49 版本中 引入了一个路径体验 满足下面两个条件的 Apache 服务器将受到影响 版本等于
  • 汇编寄存器介绍

    1 通用寄存器 名称 全称 32位 16位 8位 编号 功能 rax 累加器 Accumulator eax ax ah al 0 0000 返回值 rcx 计数器 Count Register ecx cx ch cl 1 0001 第二
  • pytorch自带的模型剪枝工具prune的使用

    torch nn utils prune可以对模型进行剪枝 官方指导如下 https pytorch org tutorials intermediate pruning tutorial html 直接上代码 首先建立模型网络 impor
  • 游标应用例子

    drop procedure if exists test create procedure test IN MGR varchar 10 IN ACCESST DATETIME begin declare no more record I
  • 【MCS-51】时钟电路和复位

    单片机的处理器内部具有众多模块 但是要想协调这些模块统一工作并不是一件易事 为了确保各部分能够统一有序工作 因为单片机已经是一个同步时序电路 所以要想让它内部能够有序工作 我们需要从外部输入一个时钟信号 目录 振荡方式 内部振荡 外部振荡
  • Spring源码:Bean的实例化及初始化过程

    1 概述 在前面的文章中 我们把 refresh 中 obtainFreshBeanFactory invokeBeanFactoryPostProcessors 方法都梳理了一遍 其中还有一个 registerBeanPostProces
  • (已解决)STM32报错error: L6236E: No section matches selector - no section to be FIRST/LAST.

    前言 在最近自学32过程中使用另一台电脑开发时出现报错 error L6236E No section matches selector no section to be FIRST LAST 在原本电脑开发时并未遇到 特地查阅资料解决此问
  • Web3到底是什么?

    本文原作者 Max Parasol 编译 黑色马里奥 Web3 加密货币持有者喜欢称之为的 Web 3 0 它是一个只有非常模糊定义的热门流行语 每个人都浅显的认为它是互联网基于区块链进一步发展的相关事物 但除此之外 它到底是什么 然而 围
  • linxu下CUDA静态库-下

    本机系统环境 lucas lucas wellcom usr local cuda lib64 cat proc version Linux version 3 13 0 40 generic buildd comet gcc versio
  • Java多线程-线程内如何抛出异常?

    设置setUncaughtExceptionHandler就能将线程内的异常抛出 否则只会存放在日志或者什么的 public class ThreadThrowExcption public static void main String
  • 自学python能学成吗-大家觉得自学python多久能学会?

    作为一名从业多年的IT人 同时也是一名教育工作者 我来回答一下这个问题 首先 学习Python语言确实并不困难 即使对于没有任何计算机基础的人来说 只要有一个系统的学习过程 大部分人都能够掌握Python的基本语法 但是要想在一周之内掌握P
  • 去除chrome网站https的安全检测

    chrome net internals hsts 访问该网址 把要禁止检测的网址放在下面 转载于 https www cnblogs com xiaozhumaopao p 11005282 html
  • Java 基础--- 异常 Exception

    Java 基础 异常 Exception Exception 类型 使用Exception Throwable Class Example finally 关键字 Chained Exception 自定义Exception Excepti
  • 通过银行卡号,识别相应的银行信息

    https github com navyxie bankcardinfo blob master index js 可以进行封装 转载于 https www cnblogs com haoran5544 p 11526244 html
  • Rebalance&多线程实例消费(十二)

    上篇文章说了 kafka位移提交通过enable auto commit控制手动提交还是自动提交 手动提交又分为异步提交和同步提交 还可以指定分区进行提交 默认是提交给所有分区 手动提交可以对应不同的业务场景 当需要业务全部处理完才提交位移