最近在做一个SpringBoot整合Kafka的一个项目,需要控制Kafka客户端消费数据的停止与启动,遇到一个问题,排查下来感觉对自己有一定帮助,趁此记录一下。
配置KafkaListener进行控制
1. 配置KafkaListenerContainer工厂,禁止自启动。
@Configuration
public class KafkaConfiguration {
private static Logger logger = LoggerFactory.getLogger(KafkaConfiguration.class);
@Autowired
private ConsumerFactory consumerFactory;
// 监听器容器工厂(设置禁止KafkaListener自启动)
@Bean
public ConcurrentKafkaListenerContainerFactory delayContainerFactorys() {
ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory();
container.setConsumerFactory(consumerFactory);
//禁止KafkaListener自启动
container.setAutoStartup(false);
return container;
}
}
2. 配置@KafkaListener,带上之前配置的容器工厂类
@KafkaListener(id="test_cep_consumer", topics = "test_cep", errorHandler="consumerAwareErrorHandler", containerFactory = "delayContainerFactorys")
public void onMessage(List<Event> events) throws Exception {
logger.info("批量消费一次...");
}
3. 在启动方法里,控制启动消费
@PostConstruct
public void startUp() {
...
consumer.startConsume();
...
}
/**
* @KafkaListener注解所标注的方法并不会在IOC容器中被注册为Bean,
* 而是会被注册在KafkaListenerEndpointRegistry中,
* 而KafkaListenerEndpointRegistry在SpringIOC中已经被注册为Bean
**/
@Autowired
private KafkaListenerEndpointRegistry registry;
public void startConsume() {
// Start consume Kafka
if (!registry.getListenerContainer("test_cep_consumer").isRunning()) {
registry.getListenerContainer("test_cep_consumer").start();
}
}
实际运行时会发现registry.getListenerContainer("test_cep_consumer")取出来的是null。
排查
首先通过查看KafkaListenerEndpointRegistry源码发现,它是使用如下方法进行注册的。
/**
* Create a message listener container for the given {@link KafkaListenerEndpoint}.
* <p>This create the necessary infrastructure to honor that endpoint
* with regards to its configuration.
* @param endpoint the endpoint to add
* @param factory the listener factory to use
* @see #registerListenerContainer(KafkaListenerEndpoint, KafkaListenerContainerFactory, boolean)
*/
public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
registerListenerContainer(endpoint, factory, false);
}
那么它又是在哪里被调用的呢?通过搜索阅读代码发现KafkaListenerEndpointRegistrar这个类进行了真正注册,而这个类实现了Spring的InitializingBean接口,它的执行期即调用afterPropertiesSet()方法比@PostConstruct注解的方法晚,所以在调用@PostConstruct注解的方法时其尚未成功注册Listener进去。
public class KafkaListenerEndpointRegistrar implements BeanFactoryAware, InitializingBean {
@Override
public void afterPropertiesSet() {
registerAllEndpoints();
}
protected void registerAllEndpoints() {
synchronized (this.endpointDescriptors) {
for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
this.endpointRegistry.registerListenerContainer(
descriptor.endpoint, resolveContainerFactory(descriptor));
}
this.startImmediately = true; // trigger immediate startup
}
}
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)