java客户端作为kafka消费者测试

2023-11-02

【README】

本文主要对 java客户端作为kafka 消费者进行测试, 生产者由 kafka客户端扮演; 

 

【1】普通消费者

设置消费者组;

重置消费者的offset, 即每次都从最头开始消费(默认仅保持7天内数据) ;

类似于 命令行 --from-beginning

kafka-console-consumer.sh --topic first --zookeeper centos201:2181 --from-beginning

小结:从头开始消费,必须满足2个条件;

条件1: 必须重新换组, 如本文中的消费者组 从 sichuan 更新为 sichuan1 ;
条件2: 需要设置offset, 修改为 earliest, 默认值是 lastest;

/**
 * 普通消费者
 */
public class MyConsumer {
	public static void main(String[] args) {
		/* 1.创建消费者配置信息 */
		Properties props = new Properties();
		/*2.给配置信息赋值*/
		/*2.1连接的集群*/
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");
		/*2.2开启自动提交 */
		props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
		/*2.3 自动提交的延时*/
		props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
		/*2.4 key value的反序列化 */
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		/*2.5 消费者组 */
		props.put(ConsumerConfig.GROUP_ID_CONFIG, "sichuan1"); 
		/*2.6 重置消费者的offset */ 
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 默认值是 lastest 
		
		/* 创建消费者 */
		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 
		/* 订阅主题 */
		consumer.subscribe(Arrays.asList("first", "second"));
		/* 循环拉取 */ 
		while(true) {
			/* 消费消息-获取数据 */
			ConsumerRecords<String, String> consumerRds  = consumer.poll(100);
			/* 解析并打印 ConsumerRecords  */
			/* 遍历 ConsumerRecords*/
			for(ConsumerRecord<String, String> rd : consumerRds) {
				System.out.println("[消费者] " + rd.key() + "--" + rd.value()); 
			}
		} 
		/* 关闭消费者 */ 
//		consumer.close(); 
	}
}

 从官网可以找到以上配置值; https://kafka.apache.org/0110/documentation.html#configuration

 

【2】kafka消费者-手动提交offset 

手动提交offset有3种方式:

  • 方式1:同步手动提交;
  • 方式2:异步手动提交; 
  • 方式3:自定义手动提交策略;

0)为啥需要手动提交?

kafka自动提交是在kafka拉取到数据之后就直接提交,这样很容易丢失数据,尤其是在需要事物控制的时候。
很多情况下我们需要从kafka成功拉取数据之后,对数据进行相应的处理之后再进行提交。如拉取数据之后进行写入mysql这种 , 所以这时我们就需要进行手动提交kafka的offset下标。

这里顺便说下offset具体是什么。
offset:指的是kafka的topic中的每个消费组消费的下标。
简单的来说就是一条消息对应一个offset下标,每次消费数据的时候如果提交offset,那么下次消费就会从提交的offset加一那里开始消费。
比如一个topic中有100条数据,我消费了50条并且提交了,那么此时的kafka服务端记录提交的offset就是49(offset从0开始),那么下次消费的时候offset就从50开始消费。

1)关闭自动提交(默认为true)

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

第一次启动 consumer 从 90 开始消费;
第2次启动相同 consumer ,还是从90开始消费;

2) 如何使用手动提交?

kafka提供了手动提交offset的api;
方法1:commitSync 同步提交:  ;
方法2:commitAsync 异步提交;
两者相同点:都会将本次 poll  的一批数据最高的偏移量提交; 
不同点是, commitSync 阻塞当前线程,一直到提交成功, 并且会自动失败重试;
而 commitAsync 没有失败重试机制, 可能提交失败; 

3)同步手动提交offset

/**
 * 手动同步提交offset 
 */
public class ManSyncCommitOffsetConsumer {
	public static void main(String[] args) {
		/* 1.创建消费者配置信息 */
		Properties props = new Properties();
		/*2.给配置信息赋值*/
		/*2.1连接的集群*/
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");
		/*2.2 关闭自动提交(默认为true) */ 
		props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); 
		
		/*2.3 自动提交的延时*/
		props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
		/*2.4 key value的反序列化 */
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		/*2.5 消费者组 */
		props.put(ConsumerConfig.GROUP_ID_CONFIG, "sichuan1"); 
		/*2.6 重置消费者的offset */ 
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 默认值是 lastest 
		
		/* 创建消费者 */
		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 
		/* 订阅主题 */
		consumer.subscribe(Arrays.asList("first", "second"));
		/* 循环拉取 */ 
		while(true) {
			/* 消费消息-获取数据 */
			ConsumerRecords<String, String> consumerRds  = consumer.poll(100);
			/* 解析并打印 ConsumerRecords  */
			/* 遍历 ConsumerRecords*/
			for(ConsumerRecord<String, String> rd : consumerRds) {
				System.out.println("[消费者] [partition]" + rd.partition() + " [offset]" + rd.offset() + rd.key() + "--" + rd.value()); 
			}
			/* 【同步提交】,当前线程会阻塞直到 offset提交成功 */ 
			consumer.commitSync();
		} 
		/* 关闭消费者 */ 
//		consumer.close(); 
	}
}

4)异步手动提交offset 

/**
 * 异步手动提交offset  
 */
public class ManASyncCommitOffsetConsumer {
	public static void main(String[] args) {
		/* 1.创建消费者配置信息 */
		Properties props = new Properties();
		/*2.给配置信息赋值*/
		/*2.1连接的集群*/
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");
		/*2.2 关闭自动提交(默认为true) */ 
		props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); 
		
		/*2.3 自动提交的延时*/
		props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
		/*2.4 key value的反序列化 */
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		/*2.5 消费者组 */
		props.put(ConsumerConfig.GROUP_ID_CONFIG, "sichuan1"); 
		/*2.6 重置消费者的offset */ 
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 默认值是 lastest 
		
		/* 创建消费者 */
		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 
		/* 订阅主题 */
		consumer.subscribe(Arrays.asList("first", "second"));
		/* 循环拉取 */ 
		while(true) {
			/* 消费消息-获取数据 */
			ConsumerRecords<String, String> consumerRds  = consumer.poll(100);
			/* 解析并打印 ConsumerRecords  */
			/* 遍历 ConsumerRecords*/
			for(ConsumerRecord<String, String> rd : consumerRds) {
				System.out.println("[消费者] [partition]" + rd.partition() + " [offset]" + rd.offset() + rd.key() + "--" + rd.value()); 
			}
			/* 【异步提交】 当前线程会阻塞直到 offset提交成功 */  
			consumer.commitAsync(new OffsetCommitCallback() {
				@Override 
				public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
						Exception exception) {
					if (exception !=null) {
						System.out.println("异步提交失败");
					} else {
						System.out.println("异步提交成功"); 
					}
				}
			}); 
		} 
		/* 关闭消费者 */ 
//		consumer.close(); 
	}
}

5)自定义手动提交offset策略

5.0)为啥需要自定义?

因为异步提交有一些问题,如下:
先消费数据,后提交offset, 可能导致数据重复消费; 
先提交offset, 后走业务逻辑,可能会丢数据; 

5.1)应用场景:

把 offset 存储到本地库 和 消息消费逻辑 在同一个数据库事务里面;

5.2)如何实现?需要实现 ConsumerRebalanceListener 来实现。

/**
 * 自定义手动提交offset策略  
 */
public class DiyCommitOffsetConsumer {
	public static void main(String[] args) {
		/* 1.创建消费者配置信息 */
		Properties props = new Properties();
		/*2.给配置信息赋值*/
		/*2.1连接的集群*/
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");
		/*2.2 关闭自动提交(默认为true) */ 
		props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); 
		
		/*2.3 自动提交的延时*/
		props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
		/*2.4 key value的反序列化 */
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		/*2.5 消费者组 */
		props.put(ConsumerConfig.GROUP_ID_CONFIG, "sichuan1"); 
		/*2.6 重置消费者的offset */ 
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 默认值是 lastest 
		
		/* 创建消费者 */
		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 
		/* 订阅主题 */
		consumer.subscribe(Arrays.asList("first", "second"), new ConsumerRebalanceListener() {
			@Override
			public void onPartitionsRevoked(Collection<TopicPartition> partitions) { // 在 rebalance方法【前】调用
				
			}
			@Override
			public void onPartitionsAssigned(Collection<TopicPartition> partitions) { // 在 rebalance方法【后】调用  
				/* 分区分配方法 */
				for (TopicPartition partition :  partitions) { 
					/*定位到某个 offset*/
					consumer.seek(partition, 1); // TODO: 这里需要输入1  
				}
			}  
		});
		/* 循环拉取 */ 
		while(true) {
			/* 消费消息-获取数据 */
			ConsumerRecords<String, String> consumerRds  = consumer.poll(100);
			/* 解析并打印 ConsumerRecords  */
			/* 遍历 ConsumerRecords*/
			for(ConsumerRecord<String, String> rd : consumerRds) {
				System.out.println("[消费者] [partition]" + rd.partition() + " [offset]" + rd.offset() + rd.key() + "--" + rd.value()); 
			}
			/* 【同步提交】,当前线程会阻塞直到 offset提交成功 */ 
			consumer.commitSync();
		} 
		/* 关闭消费者 */ 
//		consumer.close(); 
	}
}

补充: 消费者rebalance 是什么?

消费者 rebalance, 什么时候触发 rebalance?  如 同一个消费者组下的 某个消费者机器宕机,或新增一个消费者机器,都会触发 rebalance,即重新分配  kafka分区数据与 消费者的对应关系; 

 

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

java客户端作为kafka消费者测试 的相关文章

  • Zookeeper的常见面试题

    1 Zookeeper 1 1 Zookeeper基本概念 Zookeeper作为一个优秀高效且可靠的分布式协调框架 ZooKeeper 在解决分布式数据一致性问题时并没有直接使用Paxos算法 而是专门定制了一致性协议叫做 ZAB Zoo
  • Redis Streams做股票行情MQ?

    redis作为内存数据库 大多时候都是作为缓存来使用 但是因为有pub sub的存在 所以也可以做MQ来使用 做为MQ 它有两个严重的问题 1 无法持久化 2 没有ack机制 redis pub sub是一个要即时消费的MQ 如果消费晚了
  • Kafka【命令行操作】

    Kafka 命令行操作 Kafka 主要包括三大部分 生产者 主题分区节点 消费者 1 Topic 命令行操作 也就是我们 kafka 下的脚本 kafka topics sh 的相关操作 常用命令行操作 参数 描述 bootstrap s
  • kafka中partition数量与消费者对应关系

    kafka是由Apache软件基金会开发的一个开源流处理平台 kafka是一种高吞吐量的分布式发布订阅消息系统 它可以处理消费者在网站中的所有动作流数据 kafka中partition类似数据库中的分表数据 可以起到水平扩展数据的目的 比如
  • python 自建kafka消息生成和消费小工具

    要将 Kafka 的消息生产和消费转换为 API 接口 我们可以使用 Python 的 Web 框架 其中 Flask 是一个轻量级且易于使用的选择 下面是一个简单的例子 使用 Flask 创建 API 来生成和消费 Kafka 消息 1
  • 六、Kafka consumer及ConsumerRebalanceListener实现

    1 comsumer代码示例 public class ConsumerMessage private static final String TOPIC NAME topic 07 public static void main Stri
  • 第十四章 kafka专题之日志数据删除策略

    日志数据清理 为了控制磁盘的容量 需要对过去的消息进行清理 1 内部定时任务检测删除日志 默认是5分钟 2 日志清理参数配置 支持配置策略对数据进行清理 以segment为基本单位进行定期清理 当前正在使用的segment不会被清理 启用c
  • Kafka剖析(一):Kafka背景及架构介绍

    转载自 http www infoq com cn articles kafka analysis part 1 Kafka 是由 LinkedIn 开发的一个分布式的消息系统 使用 Scala 编写 它以可水平扩展和高吞吐率而被广泛使用
  • kafka学习笔记(一)简介

    这是对我找到的学习资料的整理 非手打 参考 https kafka apachecn org intro html https blog csdn net weixin 39468305 article details 106346280
  • 大数据技术之Kafka——Kafka入门

    目录 一 概述 1 1 为什么要有Kafka 1 2 定义 1 3 消息队列 1 消息队列的应用场景 2 消息队列的两种模式 1 4 基础架构 二 Producer生产者 2 1 生产者消息发送流程 2 1 1 发送原理 2 2 异步发送A
  • Kafka——集群

    文章目录 集群 1 搭建个集群 2 集群发送消息 3 集群消费 3 1 Procuder 3 2 Consumer 4 消费顺序 集群 对于kafka来说 一个单独的broker意味着kafka集群中只有一个节点 要想增加kafka集群中的
  • 黑马头条 热点文章实时计算、kafkaStream

    热点文章 实时计算 1 今日内容 1 1 定时计算与实时计算 1 2 今日内容 kafkaStream 什么是流式计算 kafkaStream概述 kafkaStream入门案例 Springboot集成kafkaStream 实时计算 用
  • flink 1.4版本flink table方式消费kafka写入hive方式踩坑

    最近在搞flink 搞了一个当前比较新的版本试了一下 当时运行了很长时间 hdfs里面查询有文件 但是hive里面查询这个表为空 后面用了很多种方式 一些是说自己去刷新hive表 如下 第一种方式刷新 alter table t kafka
  • 仿kafka实现java版时间轮

    系统定时 超时 在我们平时的项目开发中 会设置系统的超时时间 比如在http接口中设置超时时间 在定时调度中也会用到 在jdk的开发的实现Timer和ScheduledThreadPoolExecutor DelayQueue定时调度中使用
  • springboot集成kafka实战项目,kafka生产者、消费者、创建topic,指定消费分区

    springboot集成kafka实战项目 kafka生产者 消费者 创建topic 指定消费分区 前言 本项目代码可直接集成到你现有的springboot项目中 功能包括 1 kafka生产者配置 2 kafka消费者配置 指定分区消费
  • Kafka : KafkaProducer Closing the kafka producer with timeoutMillis

    1 美图 2 背景 一段kafka写入程序 不晓得为啥突然发现很多奇怪的日志 kafka 多线程发送数据 然后在本地是可以的 在服务器上是偶现的 我写了一个本地程序多线程生产数据 发现是没有问题的 Test public void mult
  • kafka 监控工具--CMAK

    CMAK previously known as Kafka Manager is a tool for managing Apache Kafka clusters See below for details about the name
  • [分布式] zookeeper集群与kafka集群

    目录 一 Zookeeper 概述 1 1 Zookeeper定义 1 2 Zookeeper 工作机制 1 3 Zookeeper 特点 1 4 Zookeeper 数据结构 1 5 Zookeeper 应用场景 1 6 Zookeepe
  • Kafka性能保证和延时队列实现原理

    数据不丢不漏 不重不错 一 不丢 生产写入消息不丢失 数据组织形式 首先 从数据组织形式来说 kafka有三层形式 kafka有多个主题 topic 每个主题有多个分区 分区分为主分区和副本分区 每个分区又有多条消息 而每个分区可以分布到不
  • Kafka——Mac搭建kafka环境

    1 下载Kafka安装包 下载地址 将压缩包移动到 usr local mv kafka 2 12 3 1 0 tgz usr local 解压 tar zxvf kafka 2 12 3 1 0 tgz 2 启动 启动zookeeper

随机推荐

  • ‘cmake' 不是内部或外部命令 也不是可运行的程序 或批处理文

    在 Win7下的命令行模式下 输入cmake相关命令 出现如下错误 cmake 不是内部或外部命令 也不是可运行的程序 或批处理文件 解决方法 在环境变量中添加cmake的文件路径 计算机 右键 属性 高级系统设置 高级 环境变量 系统变量
  • pytorch 多GPU训练总结(DataParallel的使用)

    参考 主页 PyTorch中文文档 前言 博主最近搭建网络的时候 需要调用不同的GPU 实现训练的加速 有时间会出现显卡现存分布不均的情况 有时间有的显卡温度特别高 博客持续更新 一更 2022 09 01 DP模式见本文 使用最少的代码实
  • Go语言上手-实战案例(1)

    猜谜游戏 在这个游戏里面 程序首先会生成一个介于1 100之间的随机整数 然后提示玩家进行猜测 玩家每次输入一个数字 程序就会告诉玩家这个猜测的值是高于还是低于那个秘密的随机数 并且让玩家再次猜测 如果猜对了就告诉玩家胜利并且退出程序 生成
  • 搜索神器Everything的功能技巧(非NTFS文件搜索,FTP/HTTP服务)

    Everything这个搜索神器估计大家都听过 磁盘上的任何文件只要输入后基本就是秒搜 但Everything除了搜索 还自带了一些好用的功能 1 添加非NTFS格式的驱动器索引 默认Everything只会索引查询本地的NTFS格式磁盘
  • Linux云计算命令大全

    云计算命令总结 一 系统命令精讲 二 目录和文件管理 三 安装及管理程序 四 账号管理 五 权限及归属管理 六 磁盘管理 七 文件系统与LVM 八 服务器RAID及配置实战 九 引导过程与服务控制 十 进程和计划任务管理 十一 系统安全及应
  • 【linux】nginx: [emerg] the “ssl“ parameter requires ngx_http_ssl_module

    1 概述 我使用 Linux centos8 安装nginx详细步骤 这个安装了一个nginx 然后启动如下 root zdh2 nginx 1 18 0 sudo usr local nginx sbin nginx c usr
  • class与prototype

    创建实例对象 ES5中常用的构造函数模式 function Person name this name name this getName function return this name ES6 通过class定义类 class Per
  • selenium处理登陆爬虫(维持登陆状态请求页面)

    selenium在处理需要登陆的时候 需要修改浏览器请求头参数cookie或token 在请求需要登陆的页面时 添加参数 跳过登陆 直接获取登陆后的内容 直接在driver对象内添加cookie参数绕开登陆 处理思路 浏览器先登陆 请求同一
  • umi如何实现鉴权

    什么是jwt鉴权 JWT JSON Web Token 本质就是一个字符串书写规范 作用是用来在用户和服务器之间传递安全可靠的信息 在目前前后端分离的开发过程中 使用token鉴权机制用于身份验证是最常见的方案 流程如下 服务器当验证用户账
  • 2.查询分离:表数据量大读写缓慢如何优化?

    查询分离 表数据量大读写缓慢如何优化 01 讲中我们提到过 冷热分离解决方案的性价比高 但它并不是一个最优的方案 仍然存在诸多不足 比如 查询冷数据慢 业务无法再修改冷数据 冷数据多到一定程度系统依旧扛不住 我们如果想把这些问题一一解决掉
  • 如何设置电脑永不熄屏

    1 win q调出搜索框 输入系统 点击系统 2 电源和睡眠 两个选项改为从不 如果是虚拟机 设置永不熄屏的方法 设置
  • 猿创征文

    文章目录 1 PolarDB X是什么 2 PolarDB X架构 3 PolarDB X架构优势 4 PolarDB X核心特性 5 PolarDB X部署 5 1 通过PXD部署集群 5 2 通过 K8S 部署 5 3 通过编译安装 1
  • 【Mybatis-puls 】返回map下划线自动转成驼峰

    文章目录 问题描述 1 yml配置解决方案 错误分析 解决方案 转换器代码 ConfigurationPropertiesBinding的作用 2 通过Java配置bean解决 觉得第一种麻烦的直接用第二种 问题描述 VO实体类自动转换驼峰
  • 使用plsql工具查看oracle中的blob字段的可视化值

    SELECT utl raw cast to varchar2 dbms lob substr t detailsql from Voucher t
  • 每天一个设计模式——装饰模式(C++实现)

    设计模式的代码十分难写的 要充分的体现可复用性 网上有着大量关于设计模式的代码 其中很多的代码违背了很多设计原则 比如依赖倒置原则 开放封闭原则 需要我们明辨是非 设计模式的原则大于使用哪个设计模式 类的组合关系也大于类的继承 通过不断的写
  • ECCV 2022

    作者 机器之心编辑部 来源 机器之心 如何将现有的图像 文本多模态大模型 例如 OpenAI CLIP 用于视频内容理解 是一个非常实用且具有前景的研究课题 它不仅可以充分挖掘图像大模型的潜力 还可以为视频大模型的设计和研究铺平道路 在视频
  • 年轻人还记得KCP吗?什么是KCP,怎么使用呢!!!

    一 什么是KCP KCP是一种网络传输协议 A Fast and Reliable ARQ Protocol 可以视它为TCP的代替品 但是它运行于用户空间 它不管底层的发送与接收 只是个纯算法实现可靠传输 它的特点是牺牲带宽来降低延迟 因
  • C51单片机学习笔记(二)——花样流水灯的实现

    C51单片机学习笔记 二 花样流水灯的实现 文章目录 C51单片机学习笔记 二 花样流水灯的实现 1 单片机引脚 晶振 复位的作用 2 流水灯原理图 3 单片机的周期 4 延时函数的编写 5 使用 位操作 控制流水灯 6 使用字节控制 并行
  • lenovo联想笔记本ThinkBook 14 Gen5+ IRH(21HW)原装Win11系统镜像原厂OEM恢复出厂状态

    LENOVO联想笔记本电脑 ThinkBook 14 Gen5 IRH 21HW 原厂Windows11原装OEM系统 恢复出厂时状态系统 系统自带所有驱动 出厂主题壁纸LOGO Office办公软件 联想电脑管家等预装程序 所需要工具 1
  • java客户端作为kafka消费者测试

    README 本文主要对 java客户端作为kafka 消费者进行测试 生产者由 kafka客户端扮演 1 普通消费者 设置消费者组 重置消费者的offset 即每次都从最头开始消费 默认仅保持7天内数据 类似于 命令行 from begi