【README】
本文主要对 java客户端作为kafka 消费者进行测试, 生产者由 kafka客户端扮演;
【1】普通消费者
设置消费者组;
重置消费者的offset, 即每次都从最头开始消费(默认仅保持7天内数据) ;
类似于 命令行 --from-beginning
kafka-console-consumer.sh --topic first --zookeeper centos201:2181 --from-beginning
小结:从头开始消费,必须满足2个条件;
条件1: 必须重新换组, 如本文中的消费者组 从 sichuan 更新为 sichuan1 ;
条件2: 需要设置offset, 修改为 earliest, 默认值是 lastest;
/**
* 普通消费者
*/
public class MyConsumer {
public static void main(String[] args) {
/* 1.创建消费者配置信息 */
Properties props = new Properties();
/*2.给配置信息赋值*/
/*2.1连接的集群*/
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");
/*2.2开启自动提交 */
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
/*2.3 自动提交的延时*/
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
/*2.4 key value的反序列化 */
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
/*2.5 消费者组 */
props.put(ConsumerConfig.GROUP_ID_CONFIG, "sichuan1");
/*2.6 重置消费者的offset */
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 默认值是 lastest
/* 创建消费者 */
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
/* 订阅主题 */
consumer.subscribe(Arrays.asList("first", "second"));
/* 循环拉取 */
while(true) {
/* 消费消息-获取数据 */
ConsumerRecords<String, String> consumerRds = consumer.poll(100);
/* 解析并打印 ConsumerRecords */
/* 遍历 ConsumerRecords*/
for(ConsumerRecord<String, String> rd : consumerRds) {
System.out.println("[消费者] " + rd.key() + "--" + rd.value());
}
}
/* 关闭消费者 */
// consumer.close();
}
}
从官网可以找到以上配置值; https://kafka.apache.org/0110/documentation.html#configuration
【2】kafka消费者-手动提交offset
手动提交offset有3种方式:
- 方式1:同步手动提交;
- 方式2:异步手动提交;
- 方式3:自定义手动提交策略;
0)为啥需要手动提交?
kafka自动提交是在kafka拉取到数据之后就直接提交,这样很容易丢失数据,尤其是在需要事物控制的时候。
很多情况下我们需要从kafka成功拉取数据之后,对数据进行相应的处理之后再进行提交。如拉取数据之后进行写入mysql这种 , 所以这时我们就需要进行手动提交kafka的offset下标。
这里顺便说下offset具体是什么。
offset:指的是kafka的topic中的每个消费组消费的下标。
简单的来说就是一条消息对应一个offset下标,每次消费数据的时候如果提交offset,那么下次消费就会从提交的offset加一那里开始消费。
比如一个topic中有100条数据,我消费了50条并且提交了,那么此时的kafka服务端记录提交的offset就是49(offset从0开始),那么下次消费的时候offset就从50开始消费。
1)关闭自动提交(默认为true)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
第一次启动 consumer 从 90 开始消费;
第2次启动相同 consumer ,还是从90开始消费;
2) 如何使用手动提交?
kafka提供了手动提交offset的api;
方法1:commitSync 同步提交: ;
方法2:commitAsync 异步提交;
两者相同点:都会将本次 poll 的一批数据最高的偏移量提交;
不同点是, commitSync 阻塞当前线程,一直到提交成功, 并且会自动失败重试;
而 commitAsync 没有失败重试机制, 可能提交失败;
3)同步手动提交offset
/**
* 手动同步提交offset
*/
public class ManSyncCommitOffsetConsumer {
public static void main(String[] args) {
/* 1.创建消费者配置信息 */
Properties props = new Properties();
/*2.给配置信息赋值*/
/*2.1连接的集群*/
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");
/*2.2 关闭自动提交(默认为true) */
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
/*2.3 自动提交的延时*/
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
/*2.4 key value的反序列化 */
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
/*2.5 消费者组 */
props.put(ConsumerConfig.GROUP_ID_CONFIG, "sichuan1");
/*2.6 重置消费者的offset */
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 默认值是 lastest
/* 创建消费者 */
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
/* 订阅主题 */
consumer.subscribe(Arrays.asList("first", "second"));
/* 循环拉取 */
while(true) {
/* 消费消息-获取数据 */
ConsumerRecords<String, String> consumerRds = consumer.poll(100);
/* 解析并打印 ConsumerRecords */
/* 遍历 ConsumerRecords*/
for(ConsumerRecord<String, String> rd : consumerRds) {
System.out.println("[消费者] [partition]" + rd.partition() + " [offset]" + rd.offset() + rd.key() + "--" + rd.value());
}
/* 【同步提交】,当前线程会阻塞直到 offset提交成功 */
consumer.commitSync();
}
/* 关闭消费者 */
// consumer.close();
}
}
4)异步手动提交offset
/**
* 异步手动提交offset
*/
public class ManASyncCommitOffsetConsumer {
public static void main(String[] args) {
/* 1.创建消费者配置信息 */
Properties props = new Properties();
/*2.给配置信息赋值*/
/*2.1连接的集群*/
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");
/*2.2 关闭自动提交(默认为true) */
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
/*2.3 自动提交的延时*/
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
/*2.4 key value的反序列化 */
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
/*2.5 消费者组 */
props.put(ConsumerConfig.GROUP_ID_CONFIG, "sichuan1");
/*2.6 重置消费者的offset */
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 默认值是 lastest
/* 创建消费者 */
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
/* 订阅主题 */
consumer.subscribe(Arrays.asList("first", "second"));
/* 循环拉取 */
while(true) {
/* 消费消息-获取数据 */
ConsumerRecords<String, String> consumerRds = consumer.poll(100);
/* 解析并打印 ConsumerRecords */
/* 遍历 ConsumerRecords*/
for(ConsumerRecord<String, String> rd : consumerRds) {
System.out.println("[消费者] [partition]" + rd.partition() + " [offset]" + rd.offset() + rd.key() + "--" + rd.value());
}
/* 【异步提交】 当前线程会阻塞直到 offset提交成功 */
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
Exception exception) {
if (exception !=null) {
System.out.println("异步提交失败");
} else {
System.out.println("异步提交成功");
}
}
});
}
/* 关闭消费者 */
// consumer.close();
}
}
5)自定义手动提交offset策略
5.0)为啥需要自定义?
因为异步提交有一些问题,如下:
先消费数据,后提交offset, 可能导致数据重复消费;
先提交offset, 后走业务逻辑,可能会丢数据;
5.1)应用场景:
把 offset 存储到本地库 和 消息消费逻辑 在同一个数据库事务里面;
5.2)如何实现?需要实现 ConsumerRebalanceListener 来实现。
/**
* 自定义手动提交offset策略
*/
public class DiyCommitOffsetConsumer {
public static void main(String[] args) {
/* 1.创建消费者配置信息 */
Properties props = new Properties();
/*2.给配置信息赋值*/
/*2.1连接的集群*/
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");
/*2.2 关闭自动提交(默认为true) */
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
/*2.3 自动提交的延时*/
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
/*2.4 key value的反序列化 */
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
/*2.5 消费者组 */
props.put(ConsumerConfig.GROUP_ID_CONFIG, "sichuan1");
/*2.6 重置消费者的offset */
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 默认值是 lastest
/* 创建消费者 */
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
/* 订阅主题 */
consumer.subscribe(Arrays.asList("first", "second"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) { // 在 rebalance方法【前】调用
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) { // 在 rebalance方法【后】调用
/* 分区分配方法 */
for (TopicPartition partition : partitions) {
/*定位到某个 offset*/
consumer.seek(partition, 1); // TODO: 这里需要输入1
}
}
});
/* 循环拉取 */
while(true) {
/* 消费消息-获取数据 */
ConsumerRecords<String, String> consumerRds = consumer.poll(100);
/* 解析并打印 ConsumerRecords */
/* 遍历 ConsumerRecords*/
for(ConsumerRecord<String, String> rd : consumerRds) {
System.out.println("[消费者] [partition]" + rd.partition() + " [offset]" + rd.offset() + rd.key() + "--" + rd.value());
}
/* 【同步提交】,当前线程会阻塞直到 offset提交成功 */
consumer.commitSync();
}
/* 关闭消费者 */
// consumer.close();
}
}
补充: 消费者rebalance 是什么?
消费者 rebalance, 什么时候触发 rebalance? 如 同一个消费者组下的 某个消费者机器宕机,或新增一个消费者机器,都会触发 rebalance,即重新分配 kafka分区数据与 消费者的对应关系;