六、Kafka consumer及ConsumerRebalanceListener实现

2023-11-17

1. comsumer代码示例

public class ConsumerMessage {

    private static final String TOPIC_NAME = "topic-07";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.111.129:9092,192.168.111.129:9093,192.168.111.129:9094");
        // 指定groupId
        props.setProperty("group.id", "ConsumerGroup2");
        /* 是否自动确认offset */
        props.setProperty("enable.auto.commit", "true");
        /* 自动确认offset的时间间隔 */
        props.setProperty("auto.commit.interval.ms", "1000");
        // earliest 最早的偏移量
        // latest   最后提交的偏移量
        // none     未找到消费者组的之前偏移量,则向消费者抛出e异常
        // props.put("auto.offset.reset", "earliest");
        props.put("session.timeout.ms", "30000");

        // 序列化类
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        try {
            for (; ; ) {
                // poll一次会返回一批数据
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("消费消息:topic=%s, partition=%d, offset=%d, key=%s, value=%s\n",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}

2. 客户端负载均衡

public class RebalanceListener implements ConsumerRebalanceListener {
    /**
     * 伪代码,这里记录的是当前的偏移量
     */
    static Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
    private KafkaConsumer<String, String> consumer;
    /**
     * 初始化方法,传入consumer对象,否则无法调用外部的consumer对象,必须传入
     */
    public RebalanceListener(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }
    /**
     * 该方法会在再均衡开始之前和消费者停止读取之后被调用。
     * 如果在这个方法中提交偏移量,则下一个消费者就可以获得读取的偏移量。
     */
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        System.out.println("分区再均衡,提交当前偏移量:" + currentOffsets);
        consumer.commitSync(currentOffsets);
    }
    /**
     * 该方法会在再均衡之后和消费者读取之前被调用
     */
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        long committedOffset = -1;
        for (TopicPartition topicPartition : partitions) {
            // 获取该分区已经提交的偏移量
            committedOffset = consumer.committed(topicPartition).offset();
            System.out.println("重新分配分区,提交的偏移量:" + committedOffset);
            // 重置偏移量到上一次提交的偏移量的下一个位置处开始消费
            consumer.seek(topicPartition, committedOffset + 1);
        }
    }
}

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

六、Kafka consumer及ConsumerRebalanceListener实现 的相关文章

  • Zookeeper的常见面试题

    1 Zookeeper 1 1 Zookeeper基本概念 Zookeeper作为一个优秀高效且可靠的分布式协调框架 ZooKeeper 在解决分布式数据一致性问题时并没有直接使用Paxos算法 而是专门定制了一致性协议叫做 ZAB Zoo
  • cdh下spark2-yarn运行sparkstreaming获取kafka数据使用spark-streaming-kafka-0-10_2.11报错解决

    报错问题 20 07 15 17 20 51 INFO utils AppInfoParser Kafka version 0 9 0 kafka 2 0 0 20 07 15 17 20 51 INFO utils AppInfoPars
  • Kafka/Spark消费topic到写出到topic

    1 Kafka的工具类 1 1 从kafka消费数据的方法 消费者代码 def getKafkaDStream ssc StreamingContext topic String groupId String consumerConfigs
  • Kafka【命令行操作】

    Kafka 命令行操作 Kafka 主要包括三大部分 生产者 主题分区节点 消费者 1 Topic 命令行操作 也就是我们 kafka 下的脚本 kafka topics sh 的相关操作 常用命令行操作 参数 描述 bootstrap s
  • kafka详解及集群环境搭建

    一 kafka详解 安装包下载地址 https download csdn net download weixin 45894220 87020758 1 1Kafka是什么 1 Kafka是一个开源消息系统 由Scala写成 是由Apac
  • Kafka一致性

    一 存在的一致性问题 1 生产者和Kafka存储一致性的问题 即生产了多少条消息 就要成功保存多少条消息 不能丢失 不能重复 更重要的是不丢失 其实就是要确保消息写入成功 这可以通过acks 1来保证 保证所有ISR的副本都是一致的 即一条
  • 大数据技术之Kafka——Kafka入门

    目录 一 概述 1 1 为什么要有Kafka 1 2 定义 1 3 消息队列 1 消息队列的应用场景 2 消息队列的两种模式 1 4 基础架构 二 Producer生产者 2 1 生产者消息发送流程 2 1 1 发送原理 2 2 异步发送A
  • flink 1.4版本flink table方式消费kafka写入hive方式踩坑

    最近在搞flink 搞了一个当前比较新的版本试了一下 当时运行了很长时间 hdfs里面查询有文件 但是hive里面查询这个表为空 后面用了很多种方式 一些是说自己去刷新hive表 如下 第一种方式刷新 alter table t kafka
  • kafka消费者客户端线程安全以及多线程实现并发读取消息

    kafka的生产者客户端Producer是线程安全的 但是消费者客户端是非线程安全的 每次操作时都会调用accqure方法用来确定当前只有一个线程操作 如果有多个线程在操作 会抛出CME异常 针对这种情况 为了能够多线程更快速的读取消息 可
  • Kafka : KafkaProducer Closing the kafka producer with timeoutMillis

    1 美图 2 背景 一段kafka写入程序 不晓得为啥突然发现很多奇怪的日志 kafka 多线程发送数据 然后在本地是可以的 在服务器上是偶现的 我写了一个本地程序多线程生产数据 发现是没有问题的 Test public void mult
  • Kafka生产者模式生成10亿条数据

    生产者生产消息 public class MyProducer2 public static void main String args throws InterruptedException 生产者 Properties properti
  • explain查看sql语句执行计划

    explain sql 执行结果字段描述 id select唯一标识 select type select类型 table 表名称 type 连接类型 possible keys 可能的索引选择 key 实际用到的索引 key len 实际
  • Kafka 架构及原理分析

    Kafka 架构及原理分析 文章目录 Kafka 架构及原理分析 简介 使用场景 架构 Broker Topic 副本机制 存储 消费分组 消费编号 数据多写支持 基于 binlog 实现主从复制 Kafka 的进阶功能 消息幂等性 事务
  • kafka系列——KafkaProducer源码分析

    实例化过程 在KafkaProducer的构造方法中 根据配置项主要完成以下对象或数据结构的实例化 配置项中解析出 clientId 用于跟踪程序运行情况 在有多个KafkProducer时 若没有配置 client id则clientId
  • 在windows系统下使用IDEA对kafka源码进行编译环境搭建以及配置

    目录 一 前期准备工作 step1 安装JDK1 8 step2 安装zookeeper单机版 step3 安装Gradle 5 4 step4 安装scala 2 11 12 二 将kafka源代码部署到编辑器IDEA并测试 step1
  • shell脚本,一次性启动kafka集群

    版本centos6 5 64位操作系统 已配置JDK1 8 三个节点 在s121节点上可以免密登录到另外两个节点 另外kafka0 9 0 1的安装目录相同 修改了主机名 并在每个节点的hosts文件中设置了映射 脚本内容 bin bash
  • Kafka 监控系统Eagle 使用教程 V1.4.0

    1 下载安装zookeeper 2 下载安装kafka 3 下载安装kafka eagle http download kafka eagle org tar zvxf kafka eagle bin 1 4 0 tar gz 4 配置JA
  • 消息队列选型:Kafka 如何实现高性能?

    在分布式消息模块中 我将对消息队列中应用最广泛的 Kafka 和 RocketMQ 进行梳理 以便于你在应用中可以更好地进行消息队列选型 另外 这两款消息队列也是面试的高频考点 所以 本文我们就一起来看一下 Kafka 是如何实现高性能的
  • 从 MySQL 到 DolphinDB,Debezium + Kafka 数据同步实战

    Debezium 是一个开源的分布式平台 用于实时捕获和发布数据库更改事件 它可以将关系型数据库 如 MySQL PostgreSQL Oracle 等 的变更事件转化为可观察的流数据 以供其他应用程序实时消费和处理 本文中我们将采用 De
  • 【flink番外篇】9、Flink Table API 支持的操作示例(1)-完整版

    Flink 系列文章 一 Flink 专栏 Flink 专栏 系统介绍某一知识点 并辅以具体的示例进行说明 1 Flink 部署系列 本部分介绍Flink的部署 配置相关基础内容 2 Flink基础系列 本部分介绍Flink 的基础部分 比

随机推荐

  • ChatGPT大解密:带您探讨机器学习背后的秘密、利用与发展

    一 什么是机器学习 二 ChatGPT 的运作原理 三 ChatGPT 生活利用 1 自然语言处理 2 翻译 3 自动回复 四 ChatGPT vs 其他相关技术 五 ChatGPT 的未来 1 未来发展 2 职业取代 3 客服人员 4 翻
  • Loaded runtime CuDNN library: 8.0.5 but source was compiled with: 8.1.0. CuDNN library needs to hav

    2021 08 16 Loaded runtime CuDNN library 8 0 5 but source was compiled with 8 1 0 CuDNN library needs to have matching ma
  • 行业经验

    原文地址 http www blogjava net ITdavid archive 2008 01 21 176730 html 很多职场新人都谈到了工作经验的问题 似乎招聘公司不给你机会 你就没办法获得必要的工作经验 其实并不一定 很多
  • 前端用js实现粘贴图片实现上传图片功能

    前端实现粘贴复制图片 引入实现粘贴复制功能的外部js文件 设置放置图片的盒子 监听粘贴事件 实现上传功能 粘贴即上传 实现效果截图 引入实现粘贴复制功能的外部js文件 引入复制的外部js文件 设置放置图片的盒子 给放置图片的盒子设置可编辑里
  • React (七)

    React 组件 API 在本章节中我们将讨论 React 组件 API 我们将讲解以下7个方法 设置状态 setState 替换状态 replaceState 设置属性 setProps 替换属性 replaceProps 强制更新 fo
  • Qt5.15.2安装

    解释一下 Qt 的版本号 比如 5 15 2 是完整的 Qt 版本号 第一个数字 5 是大版本号 major 第二个数字 15 是小版本号 minor 第三个数字 2 是补丁号 patch 只要前面两个数字相同 Qt 的特性就是一致的 最后
  • Jenkins (二)

    Jenkins 二 使用pipeline script 简单编译 发布war工程到远程tomcat中 配置所需 下载 apache maven 3 9 3 tar gz 解压 apache maven 3 9 3 bin tar gz 拷贝
  • XP 和 win10 系统 bios配制

    因搞嵌入式 家里有好多老式MCU 用到XP系统相关的编译器和相关的调试工具 专门买了台X230 配两个硬盘XP和win10 发现BIOS配置不一样 在这里记录一下 1 config 下面的 serial ATA 系统 serial ATA
  • 工作流选型

    对比 Activiti Flowable
  • 什么是护网(HVV)_需要什么技能?

    HVV介绍 1 什么是护网 护网的定义是以国家组织组织事业单位 国企单位 名企单位等开展攻防两方的网络安全演习 进攻方一个月内采取不限方式对防守方展开进攻 不管任何手段只要攻破防守方的网络并且留下标记即成功 直接冲到防守方的办公大楼 然后物
  • spring-boot 实现定时任务@Scheduled

    Scheduled 只适合处理简单的计划任务 不能处理分布式计划任务 优势 是spring框架提供的计划任务 开发简单 执行效率比较高 且在计划任务数量太多的时候 可能出现阻塞 崩溃 延迟启动等问题 启动类中加入 EnableSchedul
  • 【转载】Linux 之 makefile详细教程

    什么是makefile 或许很多Winodws的程序员都不知道这个东西 因为那些Windows的IDE都为你做了这个工作 但我觉得要作一个好的和 professional的程序员 makefile还是要懂 这就好像现在有这么多的HTML的编
  • docker run -it 和 docker exec -it具有什么功能呢?

    Docker Docker是一个开源的应用容器引擎 让开发者可以打包他们的应用以及依赖包到一个可移植的镜像中 然后发布到任何流行的 Linux或Windows操作系统的机器上 也可以实现虚拟化 容器是完全使用沙箱机制 相互之间不会有任何接口
  • 有关联想拯救者Y7000重装window10系统

    文章目录 1 联想拯救者使用U盘重装系统不需要进入bios 2 总结 由于C盘爆满了 所以选择重装系统来重新给C盘分下区 给C盘分大点 然后重装系统的具体流程参照的是博客使用U盘重装Windows10系统详细步骤及配图 官方纯净版 然后写这
  • 03 什么是预训练(Transformer 前奏)

    博客配套视频链接 https space bilibili com 383551518 spm id from 333 1007 0 0 b 站直接看 配套 github 链接 https github com nickchen121 Pr
  • 20145334赵文豪《网络对抗》—— 网络欺诈技术防范

    1 实验后回答问题 1 通常在什么场景下容易受到DNS spoof攻击 局域网内的攻击 连接公共场所的wifi 2 在日常生活工作中如何防范以上两种攻击方法 输入个人信息前 仔细检查核对域名是否正确 使用入侵检测系统 使用防火墙进行保护 d
  • 关闭bat运行python时的chromedriver窗口

    运行python 查看三方库的安装位置 打开文件位置 默认conda在这里 找到selenium webdriver common service py 添加依赖 修改70行左右代码 self process subprocess Pope
  • 深度学习之心得——dropout

    作用 防止过拟合 什么时候过拟合 网络参数多 训练数据少的时候 容易过拟合 原理 前向传播过程中暂时屏蔽一些节点 暂时不更新它的参数 这样就可以训练多个不同的网络 降低过拟合的可能 Dropout层的位置 Dropout一般放在全连接层防止
  • python垃圾回收 (GC) 机制

    Python 能够自动进行内存分配和释放 但了解 python 垃圾回收 garbage collection GC 的工作原理可以帮助你写出更好更快的 Python 程序 Python 使用两种算法进行垃圾回收 分别是引用计数 Refer
  • 六、Kafka consumer及ConsumerRebalanceListener实现

    1 comsumer代码示例 public class ConsumerMessage private static final String TOPIC NAME topic 07 public static void main Stri