我遇到了这个对我来说似乎很奇怪的场景:
所以基本上我定义了两个@KafkaListener
在一堂课中:
@KafkaListener(id = "listener1", idIsGroup = false, topics = "data1", containerFactory = "kafkaListenerContainerFactory")
public void receive(){}
@KafkaListener(id = "listener2", idIsGroup = false, topics = "data2", containerFactory = "kafkaListenerContainerFactory2")
public void receive(){}
Their id
, topics
, containerFactory
是不同的,并且每个都依赖于不同的ConcurrentKafkaListenerContainerFactory
正如另一个类中定义的:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, ConsumerRecord> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, ConsumerRecord> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory("group1", "earliest"));
factory.setAutoStartup(false);
return factory;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, ConsumerRecord> kafkaListenerContainerFactory2() {
ConcurrentKafkaListenerContainerFactory<String, ConsumerRecord> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory("group2", "latest"));
factory.setAutoStartup(true);
return factory;
}
@Bean
public ConsumerFactory<String, ConsumerRecord> consumerFactory(String groupId, String offset) {
Map<String, Object> config = new HashMap<>();
// dt is current timestamp in millisecond (epoch)
config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId + "-" + dt);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offset);
// other config omitted
return new DefaultKafkaConsumerFactory<>(config);
}
所以我期望看到的(以及我想要实现的)是:
- 只有listener2会自动启动,因为
factory.setAutoStartup(true)
- Listener2 将以
group.id
“组2”和auto.offset.reset
“最新的”
- 稍后当listener1通过某个事件监听器启动时,它将启动
和
group.id
“组1”和auto.offset.reset
“最早的”
然而,实际上只有第一个是有保证的。 Listener2 可以从 {group2 +latest} 或 {group1+earest} 开始。后来当listener1开始使用数据时,它只会重用listener2的配置(我可以看到包含时间戳的相同组id在我的日志中打印了两次)
我的问题是,为什么listener2的组ID和偏移配置是随机选择的,而autoStartup不是随机选择的?为什么listener1会重用listener2的配置?