kafka中partition数量与消费者对应关系以及Java实践

2023-10-27

kafka是由Apache软件基金会开发的一个开源流处理平台。kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。

 

kafka中partition类似数据库中的分表数据,可以起到水平扩展数据的目的,比如有a,b,c,d,e,f 6个数据,某个topic有两个partition,一般情况下partition-0存储a,c,e3个数据,partition-1存储b,d,f另外3个数据。

消费者组数量的不同以及partition数量的不同对应着不同的消费情况,下面分别进行梳理之:

1、单播模式,只有一个消费者组

      (1)topic只有1个partition,该组内有多个消费者时,此时同一个partition内的消息只能被该组中的一个consumer消费。当消费者数量多于partition数量时,多余的消费者是处于空闲状态的,如图1所示。topic,test只有一个partition,并且只有1个group,G1,该group内有多个consumer,只能被其中一个消费者消费,其他的处于空闲状态。

                                                    

                                                                                     图1

(2)该topic有多个partition,该组内有多个消费者,比如test 有3个partition,该组内有2个消费者,那么可能就是C0对应消费p0,p1内的数据,c1对应消费p2的数据;如果有3个消费者,就是一个消费者对应消费一个partition内的数据了。图解分别如图2,图3.这种模式在集群模式下使用是非常普遍的,比如我们可以起3个服务,对应的topic设置3个partiition,这样就可以实现并行消费,大大提高处理消息的效率。

  

                                            图2                                                                                    图3

2、广播模式,多个消费者组

如果想实现广播的模式就需要设置多个消费者组,这样当一个消费者组消费完这个消息后,丝毫不影响其他组内的消费者进行消费,这就是广播的概念。

(1)多个消费者组,1个partition

该topic内的数据被多个消费者组同时消费,当某个消费者组有多个消费者时也只能被一个消费者消费,如图4所示:

                                                      

                                                                                   图4

(2)多个消费者组,多个partition

该topic内的数据可被多个消费者组多次消费,在一个消费者组内,每个消费者又可对应该topic内的一个或者多个partition并行消费,如图5所示:

                                    

                                                                            图5

3、Java实践

这里使用Java服务进行实践,模拟2个parition,然后同一个组内有2个消费者的情况:

首先创建一个发送消息的controller方法:

 @ApiOperation(value = "向具有kafka-2个partition的topic发送信息")
    @RequestMapping(value = "/testSendMessage2", method = RequestMethod.POST)
    public String testSendMessage(@RequestParam("msg") String msg) {
        KafkaTemplate.send(KafkaTopicEnum.TEST_TWO_PARTITION_MSG.code,msg);
        System.out.println("发送的消息是:"+msg);
        return "2个partition的topic数据!--ok";
    }

然后再创建一个监听类监听该topic,这里的监听类即为消费者。

 /**
     * @date 2020-09-24
     * 两个partition的topic,同一个组的两个消费者就可以并行的消费了,需要kafka也是集群才行,单机版并不支持
     * @param consumerRecord
     * @param acknowledgment
     */
    @KafkaListener(topics = "two-partition-msg",groupId ="serverGroup1",containerFactory = "ackContainerFactory")
    public void receiveKafkaTwoParMsg(ConsumerRecord<?,?> consumerRecord, Acknowledgment acknowledgment){
        InetAddress address = null;
        try {
            address = InetAddress.getLocalHost();
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
        System.out.println("当前的IP地址是:"+address.getHostAddress());
        System.out.println("监听服务A-收到的消息是::");
        System.out.println(consumerRecord.value().toString());
        System.out.println("=================== end =================");
//        ack 提交掉,避免服务重启再次拉取到消息
        acknowledgment.acknowledge();
    }

然后我们给该服务起2个实例,即模拟该组内serverGroup1内的2个消费者,然后我们使用测试方法进行测试,向该topic内发送多个消息,观察2个实例的输出日志:

     实例1:    
     发送的消息是:111
      当前的IP地址是:10.244.3.114
      监听服务A-收到的消息是::
      "111"
      =================== end =================
      发送的消息是:222
      发送的消息是:333
      当前的IP地址是:10.244.3.114
      监听服务A-收到的消息是::
      "333"
      =================== end =================
      发送的消息是:444
      发送的消息是:555
      当前的IP地址是:10.244.3.114
      监听服务A-收到的消息是::
      "555"
      =================== end =================
      发送的消息是:666
      发送的消息是:777
      当前的IP地址是:10.244.3.114
      监听服务A-收到的消息是::
      "777"
      =================== end =================
      发送的消息是:888
      发送的消息是:999
      当前的IP地址是:10.244.3.114
      监听服务A-收到的消息是::
      "999"
	 实例2:
     当前的IP地址是:10.244.0.237
      监听服务A-收到的消息是::
      "222"
      =================== end =================
      当前的IP地址是:10.244.0.237
      监听服务A-收到的消息是::
      "444"
      =================== end =================
      当前的IP地址是:10.244.0.237
      监听服务A-收到的消息是::
      "666"
      =================== end =================
      当前的IP地址是:10.244.0.237
      监听服务A-收到的消息是::
      "888"

发现该组内的一个消费者消费到了111,333,555,777,999 ,另外一个消费者消费到了222,444,666,888,起到了均衡消费的效果。

所以在微服务的集群中,我们可以通过给topic设置多个partition,然后让每一个实例对应消费1个partition的数据,从而实现并行的处理数据,可以显著地提高处理消息的速度。         

4、 使用kafkaManager为topic增加partition数量

1)首先点击 Add Partitions 增加partition的数量,然后点击Generate Partition Assignments ,此时系统自动会为每个分区下的副本分配broker, 最后点击 Reassign Partitions,可以平衡集群的负载

想了解更多关于kafka、docker、k8s等云原生以及Java干货,欢迎关注下方公众号:

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

kafka中partition数量与消费者对应关系以及Java实践 的相关文章

随机推荐

  • DRF---序列化组件

    目录 序列化器Serializer 序列化组件基本使用 使用序列化类 序列化多条数据 使用序列化类 序列化单条数据 反序列化 新增 修改 新增 视图类 序列化类 视图类 序列化类 序列化类的常见字段类和常见参数 常用字段类型 选项参数 通用
  • 【Linux线程同步】生产者消费者模型

    文章目录 1 peach 线程互斥中可能还会存在的问题 peach 2 peach 线程同步 peach 2 1 apple 同步概念与竞态条件 apple 2 2 apple 条件变量函数 apple lemon 初始化 lemon le
  • Qt5.15源码编译详解

    1 请先参考 https blog csdn net weixin 60395515 article details 127284046 spm 1001 2014 3001 5501 2 有以下几个不同的地方需要修改 Qt5的mkspec
  • 超详细解决困扰人的python典例:“有n个人围成一圈”式n里挑一

    自学python No 2 引语 题目 案例实现 range 函数 append 函数 pop 函数 完整代码 引语 记录学习路程 抛砖引玉 如有更好的算法或者出现错误 欢迎指点 题目 有n个人围成一圈 顺序排号 从第一个人开始报数 从1到
  • 汽车之家各种车型参数爬虫

    汽车之家各种车型参数爬虫 结果如下 本案例使用jupyter notebook 用到requests BeautifulSoup lxml urlencode pandas五个库 爬取下来的数据如下图所示 详细过程 整个过程分成三个部分 1
  • ubuntu系统信息查询(主板,内存,硬盘,网卡)

    1 主板型号 主板支持最大内存 单条内存的参数 sudo dmidecode t 2 查看主板信息 sudo dmidecode t 16 grep Maximum 查看主板支持最大内存 sudo dmidecode t memory 查看
  • JDBC、连接步骤(4步)、需要导入的第三方jar包、开发步骤

    1 JDBC Java Database Connectivity java连接数据库的工具 1 1 什么是JDBC 他是java提供的一组API 用来提供连接数据库中需要用到的类和接口 他是一组规范 为不同数据库封装相同接口的一组规范 让
  • 基于 Web 的 LDAP 认证,访问资源就是这么安全

    轻量级目录访问协议 即 LDAP 协议 是微软 Active Directory AD 和 OpenLDAP 等传统身份管理解决方案中的核心身份认证协议 然而 IT 环境的不断发展暴露了传统方案的问题 基于本地部署的设计逻辑无法适应新兴的云
  • Unity2D游戏无限刷新地图

    关于Unity2D游戏如何无限刷新地图的问题 首先在Unity中创建多个大小相同的物体当做刷新的地图对象 然后在创建一个名称为Endless cs的脚本 然后添加如下代码 public float distance void OnBecam
  • cmake(三十五)Cmake之include指令

    一 CMakeLists txt和cmake脚本的联系和区别 cmake脚本 1 cmake文件里面通常是 什么信息 information cmake文件 里包含了一些 公共 复用 的 cmake命令 和一些 宏 函数 当CMakeLis
  • java开发团队认知_一个优秀的研发团队应该具备什么特征

    1 计划执行 计划安排得当 不要老加班 不要老是现实和计划不匹配 不要做到哪儿计划就推后到哪儿 2 研发成果 成功产出几个重影响力级别的 完整成块的 有成就感自豪感的产品或项目 3 团队氛围 这个团队每个人都相处的很融洽 4 团队协作 每个
  • Pytorch 的 LSTM 模型的简单示例

    1 代码 完整的源代码 import torch from torch import nn 定义一个LSTM模型 class LSTM nn Module def init self input size hidden size num l
  • C. Doremy‘s IQ(二分/贪心)

    题目 题意 给定n个任务和艾米的智商q 艾米要按顺序处理这n个任务 每个任务有难度值a i 对于每个任务 艾米可以选择处理 也可以选择不处理 如果艾米当前的智商q大于等于任务a i 则艾米可以直接处理该任务 智商不受任何影响 如果艾米当前的
  • SpringCloud——微服务

    微服务技术栈 在之前的开发过程中 我们将所有的服务都部署在一台服务器中 当我们的服务开始越来越多 业务越来越复杂 当一台服务器不能承担我们的业务的时候 就需要将不同的业务分开部署在不同的服务器上 这每一个单独分离的服务 就是微服务 这些搭载
  • 数据结构 常见的八大数据结构汇总 为什么要学习数据结构?解读数据结构!进阶必看!

    文章目录 什么是数据结构 概念解释 为什么要学习数据结构 常见的数据结构 0 数组 1 链表 2 栈 3 队列 4 树 5 散列表 又叫哈希表 6 堆 7 图 思维导图 横向 纵向 什么是数据结构 数据结构 Data Structure 是
  • 使用Arduino开发ESP32:串口(Serial port) HardwareSerial库使用说明

    HardwareSerial库使用 使用演示 上图中通过HardwareSerial库实现了对串口1的使用 具体说明见下文 详细说明 使用HardwareSerial库需要先声明一个对象 例如上文图中的HardwareSerial mySe
  • MySQL基本操作(三)

    MySQL基本操作 三 看过MySQL基本操作 二 的应该已经体会到mysql数据库下user表的妙用了 我相信你也已经做过一些尝试了 比如自建用户名无需密码登录 自建任意主机登录等等 这样的尝试会增加你对mysql的兴趣 Ok 你第一次登
  • VC得到当前目录与得到应用程序目录的一个应用

    得到当前目录的函数 GetCurrentDirectory 得到应用程序目录的函数是 GetModuleFileName 用法都很简单 不知道就看MSDN吧 我先用这2个函数分别写一个实现同一个功能的函数 以下是代码片段 函 数 名 Rea
  • Candy算法--理解

    Canny边缘检测算子的目标是找到一个最优的边缘检测算法 最优边缘检测的含义是 好的检测 算法能够尽可能多地标识出图像中的实际边缘 好的定位 标识出的边缘要与实际图像中的实际边缘尽可能接近 边缘过粗 难以精确定位 最小响应 图像中的边缘只能
  • kafka中partition数量与消费者对应关系以及Java实践

    kafka是由Apache软件基金会开发的一个开源流处理平台 kafka是一种高吞吐量的分布式发布订阅消息系统 它可以处理消费者在网站中的所有动作流数据 kafka中partition类似数据库中的分表数据 可以起到水平扩展数据的目的 比如