SpringBoot整合Kafka控制消费启停遇到的问题记录(@KafkaListener注解使用)

2023-05-16

最近在做一个SpringBoot整合Kafka的一个项目,需要控制Kafka客户端消费数据的停止与启动,遇到一个问题,排查下来感觉对自己有一定帮助,趁此记录一下。

配置KafkaListener进行控制

1. 配置KafkaListenerContainer工厂,禁止自启动。

@Configuration
public class KafkaConfiguration {

  private static Logger logger = LoggerFactory.getLogger(KafkaConfiguration.class);

  @Autowired
  private ConsumerFactory consumerFactory;


  // 监听器容器工厂(设置禁止KafkaListener自启动)
  @Bean
  public ConcurrentKafkaListenerContainerFactory delayContainerFactorys() {
    ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory();
    container.setConsumerFactory(consumerFactory);
    //禁止KafkaListener自启动
    container.setAutoStartup(false);
    return container;
  }
}

2. 配置@KafkaListener,带上之前配置的容器工厂类

@KafkaListener(id="test_cep_consumer", topics = "test_cep", errorHandler="consumerAwareErrorHandler", containerFactory = "delayContainerFactorys")
public void onMessage(List<Event> events) throws Exception {
  logger.info("批量消费一次...");
}

3.  在启动方法里,控制启动消费

@PostConstruct
public void startUp() {

    ...
    consumer.startConsume();
    ...
}

/**
 * @KafkaListener注解所标注的方法并不会在IOC容器中被注册为Bean,
 * 而是会被注册在KafkaListenerEndpointRegistry中,
 * 而KafkaListenerEndpointRegistry在SpringIOC中已经被注册为Bean
**/
@Autowired
private KafkaListenerEndpointRegistry registry;

public void startConsume() {
  // Start consume Kafka
  if (!registry.getListenerContainer("test_cep_consumer").isRunning()) {
    registry.getListenerContainer("test_cep_consumer").start();
  }
}

实际运行时会发现registry.getListenerContainer("test_cep_consumer")取出来的是null。

排查

首先通过查看KafkaListenerEndpointRegistry源码发现,它是使用如下方法进行注册的。

/**
 * Create a message listener container for the given {@link KafkaListenerEndpoint}.
 * <p>This create the necessary infrastructure to honor that endpoint
 * with regards to its configuration.
 * @param endpoint the endpoint to add
 * @param factory the listener factory to use
 * @see #registerListenerContainer(KafkaListenerEndpoint, KafkaListenerContainerFactory, boolean)
*/
public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
  registerListenerContainer(endpoint, factory, false);
}

那么它又是在哪里被调用的呢?通过搜索阅读代码发现KafkaListenerEndpointRegistrar这个类进行了真正注册,而这个类实现了Spring的InitializingBean接口,它的执行期即调用afterPropertiesSet()方法比@PostConstruct注解的方法晚,所以在调用@PostConstruct注解的方法时其尚未成功注册Listener进去。

public class KafkaListenerEndpointRegistrar implements BeanFactoryAware, InitializingBean {
  @Override
  public void afterPropertiesSet() {
    registerAllEndpoints();
  }

  protected void registerAllEndpoints() {
	synchronized (this.endpointDescriptors) {
	  for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
		this.endpointRegistry.registerListenerContainer(
			descriptor.endpoint, resolveContainerFactory(descriptor));
	  }
	this.startImmediately = true;  // trigger immediate startup
    }
  }
}

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

SpringBoot整合Kafka控制消费启停遇到的问题记录(@KafkaListener注解使用) 的相关文章

随机推荐

  • docker--最全的网络类型(7类)

    目录 1bridge桥接模式 xff08 默认 xff09 2 host 3 overlay 4 ipvlan 5 macvlan 6 none 7 container 官网有六类 xff1a Networking overview Doc
  • flex 国际化

    Flex 开发 xff08 国际化 xff09 在Flex4 之前默认只支持en US ja JP 这两种本地化 xff0c 因此如果想在Flex 中支持中文或者其他语言时 xff0c 需要额外的操作 xff1a 1 首先添加新的本地化支持
  • 使用mysqladmin命令来修改mysql的root密码

    一般mysql的root默认密码为空 xff0c 如果你之前并没有设置过root密码就使用mysqladmin命令 xff0c 你可以使用如下mysqladmin命令来修改root密码 1 2 3 mysqladmin u root p p
  • CCF化学方程式的配平

    include lt iostream gt include lt string gt include lt cctype gt include lt unordered map gt using namespace std unorder
  • CCF 201909-4 推荐系统

    include lt cstdio gt include lt set gt include lt unordered map gt include lt algorithm gt using namespace std typedef l
  • CCF 201903-4 消息传递接口

  • Docker学习笔记(一)

    Docker学习笔记 xff08 一 xff09 什么是Docker Docker 使用 Google 公司推出的 Go 语言 进行开发实现 xff0c 基于 Linux 内核的 cgroup xff0c namespace xff0c 以
  • Docker学习笔记(二)

    Docker镜像 Docker 镜像是一个特殊的文件系统 xff0c 除了提供容器运行时所需的程序 库 资源 配置等文件外 xff0c 还包含了一些为运行时准备的一些配置参数 xff08 如匿名卷 环境变量 用户等 xff09 Docker
  • Java多线程里共享变量线程安全问题的原因

    Java多线程里共享变量线程安全问题的原因 Java多线程里对于共享变量的操作往往需要考虑进行一定的同步互斥操作 xff0c 原来是因为Java内存模型导致的共享内存对于线程不可见 Java 内存模型规定 xff0c 将所有的变量都存放在主
  • 重构-改善既有代码的设计读书笔记一

    重构 定义 为何重构 改进软件设计 使软件更容易理解 帮助找到Bug提高编程速度 何时重构 添加功能修改错误复审 总而言之 xff0c 当你觉得代码的可读性 可维护性 可修改性到达一定难以接受的程度 xff0c 就可以开始考虑是否可以使用重
  • Spring文档学习笔记一

    Spring文档学习笔记一 目录 Spring文档学习笔记一 Spring的宗旨 主要特征 几个核心理念 IoC 依赖解析过程 Spring循环依赖的解决方式 更详细的得估计得看Spring源码 1 4 2 Dependencies and
  • python数据结构算法DAY2| 快速排序

    目录 快速排序 xff08 quick sort xff09 1 什么是快速排序 2 快速排序思路 3 快速排序代码 4快速排序复杂度 5 快速排序函数与冒泡排序的效率比较 6 快速排序的缺点 解决办法 xff1a 快速排序 xff08 q
  • Go里w http.ResponseWriter,调用w.Write()方法报错

    Go里w http ResponseWriter写入报错http request method or response status code does not allow 1 下面是报错截图 2 点进去Write方法 它首先是一个接口 x
  • CCF 202012-3 带配额的文件系统 练习

    大模拟 xff0c 没涉及什么算法主要是数据结构的设计 细节的考虑 xff0c 挺锻炼的 xff0c 记录一下 xff0c 代码加了注释 include lt iostream gt include lt string gt include
  • 多接口继承和多层抽象类设计理解

    多接口继承和多层抽象类设计理解 以JDK集合List框架为例有感 以后可能又会有新的理解 xff0c 先记录一下 设计得好的接口一般也要遵循单一职责原则 xff0c 最上层的接口一般属于独立的 xff0c 不再有依赖的 xff0c 如Ite
  • 202012-5 星际旅行 (线段树模板60分)记录一下

    include lt bits stdc 43 43 h gt using namespace std typedef long long ll const int maxn 61 1e5 43 5 const ll MOD 61 1e9
  • 联机象棋(1)

    联机象棋 xff08 1 xff09 需求架构与开发技术主要设计与实现1 棋盘 棋子布局2 选棋 下棋3 人人对战匹配4 判断是否被将5 通信模块6 其他如声音效果等提升用户体验7 人机对战 尚未实现 8 最终实现效果图 需求 登录 注册
  • AbstractQueuedSynchronizer源码阅读(1)(AQS JDK1.8)

    AbstractQueuedSynchronizer 前言AbstractQueuedSynchronizer xff08 1 xff09 JDK 1 8 用途主要源码分析Node内部类ConditionObject类重要方法 主要的属性及
  • ReentrantLock源码阅读(1)(JDK1.8)

    ReentrantLock 前言ReentrantLock JDK 1 8 实现了Lock接口Sync类NonfairSync类FairSync类重要属性和方法 总结 前言 最近在使用Java 并发包时遇到一些问题 xff0c 感觉对于其还
  • SpringBoot整合Kafka控制消费启停遇到的问题记录(@KafkaListener注解使用)

    最近在做一个SpringBoot整合Kafka的一个项目 xff0c 需要控制Kafka客户端消费数据的停止与启动 xff0c 遇到一个问题 xff0c 排查下来感觉对自己有一定帮助 xff0c 趁此记录一下 配置KafkaListener