我正在开发一个 POC,我想使用来自 Kafka 主题“用户”的消息。尝试实现消费者应该从 Kafka 主题读取消息,一旦 spring boot 调度程序在预定时间或 cron 时间触发,那么我们应该开始从 kafka 主题中一一消费现有消息并处理这些消息,当消费完所有消息后,kafka 消费者应该停止。调度程序应在 cron 时间触发并再次启动该进程。
尽管我正在努力确定如何调用,但我已尝试以下方法来实现此目的消费(字符串消息)方法从我的调度程序方法调度消息消费Kakfa以及任何我们有更好的结构来在 Spring Boot 调度程序中使用来自 Kafka 的消息并在实现 tasklet 接口的类中编写 kafka 消费者方法等的示例。感谢任何建议。
@Configuration
@Service
public class SchedularMsgConsumeKafkaController2 {
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@KafkaListener(topics = "users", id = "full-part-id", containerGroup = "full-part-group", autoStartup = "false")
public void consume(String message) throws IOException, InterruptedException {
System.out.println(String.format("#### -> Consumed message -> %s", message));
System.out.println(String.format("Really happy"));
MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer("full-part-id");
listenerContainer.stop();
}
@Scheduled(fixedDelay = 30000, initialDelay = 15000)
public void schedularMsgConsumeKakfa() throws Exception {
MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer("full-part-id");
listenerContainer.start();
}
}
首先@Scheduled
不是 Spring Boot 功能。它是 Spring 框架原生的:https://docs.spring.io/spring-framework/docs/current/reference/html/integration.html#scheduling https://docs.spring.io/spring-framework/docs/current/reference/html/integration.html#scheduling.
第二:@KafkaListener
是 Spring for Apache Kafka 项目的一部分:https://docs.spring.io/spring-kafka/docs/current/reference/html/#kafka-listener-annotation https://docs.spring.io/spring-kafka/docs/current/reference/html/#kafka-listener-annotation.
因此,这两个功能都可以在 Spring Boot 框架之外使用。我知道这与问题无关,但最好用正确的名称来称呼事物。
的混合@KafkaListener
& @Scheduled
这不是一个很好的解决方案,因为您混合了许多不兼容的问题。
最好寻找一些不需要处理的不同解决方案start()/stop()
并且不会因为额外的线程而造成环境开销。
考虑研究 Spring Integration 及其KafkaMessageSource
执行:https://docs.spring.io/spring-integration/reference/html/kafka.html#kafka-inbound-pollable https://docs.spring.io/spring-integration/reference/html/kafka.html#kafka-inbound-pollable。这对你来说可能是新的东西,但这是值得的,因为你不会自己做太多的事情。
我所说的“你自己”是指使用ConsumerFactory
API及调用KafkaConsumer.poll()
手动从您的@Scheduled
.
可能没有这样的样本@Scheduled
解决方案,因为它确实足够复杂。并且没有样本KafkaMessageSource
在 Spring Integration 中,因为它非常简单,只需配置适当的通道适配器即可。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)