带有 Kafka 消费者的 Spring Boot 作业调度程序

2024-04-21

我正在开发一个 POC,我想使用来自 Kafka 主题“用户”的消息。尝试实现消费者应该从 Kafka 主题读取消息,一旦 spring boot 调度程序在预定时间或 cron 时间触发,那么我们应该开始从 kafka 主题中一一消费现有消息并处理这些消息,当消费完所有消息后,kafka 消费者应该停止。调度程序应在 cron 时间触发并再次启动该进程。

尽管我正在努力确定如何调用,但我已尝试以下方法来实现此目的消费(字符串消息)方法从我的调度程序方法调度消息消费Kakfa以及任何我们有更好的结构来在 Spring Boot 调度程序中使用来自 Kafka 的消息并在实现 tasklet 接口的类中编写 kafka 消费者方法等的示例。感谢任何建议。

@Configuration
@Service
public class SchedularMsgConsumeKafkaController2 {

    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @KafkaListener(topics = "users", id = "full-part-id", containerGroup = "full-part-group", autoStartup = "false")
    public void consume(String message) throws IOException, InterruptedException {
        System.out.println(String.format("#### -> Consumed message -> %s", message));
        System.out.println(String.format("Really happy"));
        MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer("full-part-id");
        listenerContainer.stop();
    }

    @Scheduled(fixedDelay = 30000, initialDelay = 15000)
    public void schedularMsgConsumeKakfa() throws Exception {
        MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer("full-part-id");
        listenerContainer.start();
    }

}

首先@Scheduled不是 Spring Boot 功能。它是 Spring 框架原生的:https://docs.spring.io/spring-framework/docs/current/reference/html/integration.html#scheduling https://docs.spring.io/spring-framework/docs/current/reference/html/integration.html#scheduling.

第二:@KafkaListener是 Spring for Apache Kafka 项目的一部分:https://docs.spring.io/spring-kafka/docs/current/reference/html/#kafka-listener-annotation https://docs.spring.io/spring-kafka/docs/current/reference/html/#kafka-listener-annotation.

因此,这两个功能都可以在 Spring Boot 框架之外使用。我知道这与问题无关,但最好用正确的名称来称呼事物。

的混合@KafkaListener & @Scheduled这不是一个很好的解决方案,因为您混合了许多不兼容的问题。

最好寻找一些不需要处理的不同解决方案start()/stop()并且不会因为额外的线程而造成环境开销。

考虑研究 Spring Integration 及其KafkaMessageSource执行:https://docs.spring.io/spring-integration/reference/html/kafka.html#kafka-inbound-pollable https://docs.spring.io/spring-integration/reference/html/kafka.html#kafka-inbound-pollable。这对你来说可能是新的东西,但这是值得的,因为你不会自己做太多的事情。

我所说的“你自己”是指使用ConsumerFactoryAPI及调用KafkaConsumer.poll()手动从您的@Scheduled.

可能没有这样的样本@Scheduled解决方案,因为它确实足够复杂。并且没有样本KafkaMessageSource在 Spring Integration 中,因为它非常简单,只需配置适当的通道适配器即可。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

带有 Kafka 消费者的 Spring Boot 作业调度程序 的相关文章

随机推荐

  • Log4J 仅将一个类附加到附加程序

    我需要定期轮询正在运行的应用程序的 JVM 内存统计信息 我正在运行一个服务来执行此操作并将统计信息写入根记录器 我对根记录器的使用与否没有太多控制权 我想要做的是将这些日志消息路由到单个附加程序 该附加程序应该只处理来自该类的日志消息 而
  • 如何在 python apache beam 中展平多个 Pcollection

    应该如何实现位于以下位置的以下逻辑 https beam apache org documentation pipelines design your pipeline https beam apache org documentation
  • 如何快速将 pandas 数据框行转换为ordereddict

    寻找一种快速方法将 pandas 数据框中的行放入有序字典中 而不使用列表 列表很好 但对于大数据集将花费很长时间 我正在使用 fiona GIS 阅读器 行是有序字典 其模式给出数据类型 我使用 pandas 来连接数据 在很多情况下 行
  • SetTimeout() 不会执行该函数

    这是我的代码片段 in VBScript Sub Main Dim timeoutTimer more scripts here more scripts here more scripts here timeoutTimer window
  • Angular js - 幻灯片视图但不是主页 - ng-animate

    我在用着ng动画滑动应用程序视图 因此每个路线都会滑动自己的视图 这是我的简单代码 html div class slide div css Animations slide left 0 slide ng enter transition
  • 在 JavaScript 中从 Base64 字符串创建 BLOB

    我在字符串中有 Base64 编码的二进制数据 const contentType image png const b64Data iVBORw0KGgoAAAANSUhEUgAAAAUAAAAFCAYAAACNbyblAAAAHElEQV
  • 从 webview 获取用户触摸的元素

    大家好 我正在尝试获取用户在 web 视图中触摸的 html 元素 例如 该场景是用户触摸 Web 视图中的某种按钮 应用程序显示该按钮的 html 代码 如下所示 a href index html a 我已经能够获取用户正在浏览的 ht
  • 为什么我在显式调用构造函数时无法引用实例方法?

    有谁知道为什么你可以参考static构造函数第一行中的方法使用this or super 但不是非静态方法 考虑以下工作 public class TestWorking private A a null public TestWorkin
  • 将 UTF-8 编码的转储加载到 MySQL 中

    昨天我为这个问题苦苦思索了几个小时 我在 MySQL 4 1 22 服务器上有一个数据库 编码设置为 UTF 8 Unicode utf8 如 phpMyAdmin 报告 该数据库中的表的默认字符集设置为latin2 但是 使用它的 Web
  • 如何调整 NLTK 句子标记器

    我正在使用 NLTK 来分析一些经典文本 但我在按句子标记文本时遇到了麻烦 例如 这是我从以下内容中得到的片段莫比迪克 http www gutenberg org cache epub 2701 pg2701 txt import nlt
  • 如何对计算值进行排序?

    我目前正在建立一个 NFL 选秀联盟网站 我有一个用户模型 一个游戏模型和一个连接表 用于捕获每个用户的个人选择 游戏模型具有 结果 属性 其中 W 表示获胜 L 表示失败 P 表示推动 平局 我在构建排名页面时遇到问题 目前我的用户模型中
  • 将 cURL json 数组响应转换为关联数组

    我有一个像这样的 cURL 请求 ch curl init data filter year StartTime urlencode eq 2013 and month StartTime urlencode eq 06 curl seto
  • Jenkins Slack 集成

    我想使用 Jenkins 中的 Slack 插件将通知发送到 Slack 通道 当我测试连接时 Jenkins 表示成功 但我在 Slack 频道中没有收到任何通知 是否存在任何已知问题 如何让 Jenkins 向 Slack 发送通知 我
  • 操作栏图标大小

    根据操作栏图标 https developer android com guide practices ui guidelines icon design action bar html size11mdpi 屏幕的操作栏图标应为 24 x
  • 如何使用 Slick 3.0 编写可读的嵌套连接查询

    此代码创建一个查询 用于在 Web 后端检索用户的个人资料 它创建一个查询 将必要的信息组装到 DTO 这只是一个案例类 中 随后以 JSON 形式发回 def getProfile userId Long val q for u p a
  • 从 powershell 脚本调用可执行文件(带参数)

    我正在从 powershell 调用 zip 实用程序 但很难直接获取其参数 这是代码 if not test path C Program Files x86 7 Zip 7z exe throw C Program Files x86
  • 创建嵌套 ul li 的 PHP 函数?

    我正在尝试将一个小型 CMS 附加到我正在创建的网站 不过我遇到了一个小问题 CMS 使用 PHP 函数插入菜单 这些 PHP 函数创建 HTML 我希望使用的特定函数 treemenu 创建一个嵌套的 ul li 然后可将其用于下拉菜单
  • 如何更改此 html 用户表单上的日期格式

    我有将数据输入 mysql DB 的 html 表单 但在日期的输入字段中它具有以下格式 mm dd yyyy 但我更喜欢在输入日期时使用这种格式 dd mm yyyy 任何机构都可以帮助更改格式吗 这里是 HTML 表单 p Admiti
  • IDispatchEx 存在于哪里?

    找不到包含 IDispatchEx 接口的库 我想实现这个接口 但是找不到 有谁知道它在哪里吗 谢谢 保罗 如果您想编写一个实现的托管类IDispatchEx http msdn microsoft com en us library sk
  • 带有 Kafka 消费者的 Spring Boot 作业调度程序

    我正在开发一个 POC 我想使用来自 Kafka 主题 用户 的消息 尝试实现消费者应该从 Kafka 主题读取消息 一旦 spring boot 调度程序在预定时间或 cron 时间触发 那么我们应该开始从 kafka 主题中一一消费现有