如何暂停kafka消费者?

2023-12-27

我在我的框架中使用 Kafka 生产者-消费者模型。消费者端消费的记录随后被索引到elasticsearch上。这里我有一个用例,如果 ES 关闭,我将不得不暂停 kafka 消费者,直到 ES 启动,一旦启动,我需要恢复消费者并使用我上次离开的位置的记录。 我认为@KafkaListener 无法实现这一点。谁能给我一个解决方案吗?我发现我需要为此编写自己的 KafkaListenerContainer,但我无法正确实现它。任何帮助将非常感激。


有几种方法可以实现这一目标。

方法#1

创建您的KafkaConsumer线程内的对象并运行无限while循环消耗事件。

一旦完成此设置,您就可以中断线程并在while循环,检查是否Thread.interrupt() is true。如果是,则跳出循环并关闭消费者。

完成恢复活动后,使用相同的组 ID 重新创建使用者。请注意,这可能会重新平衡消费者。

如果你使用 python,同样的事情可以使用线程来实现stop_event.

方法#2

使用 KafkaConumer APIpause(partitions_list)功能。它接受 Kafka 分区作为输入。因此,提取分配给消费者的所有部分并将这些部分传递给pause(partitions_list)功能。消费者将停止从这些分区中提取数据。

经过一定时间后,您可以使用resume(partitions_list)函数来恢复消费者。此方法不会重新平衡消费者。

注意:如果您使用的是 Spring Kafka 客户端。这变得容易多了。您可以启动/停止消息侦听器容器。

你可以找到详细的解释here https://bikas-katwal.medium.com/start-stop-kafka-consumers-or-subscribe-to-new-topic-programmatically-using-spring-kafka-2d4fb77c9117.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何暂停kafka消费者? 的相关文章

  • 如何在谷歌地图android上显示多个标记

    我想在谷歌地图android上显示带有多个标记的位置 问题是当我运行我的应用程序时 它只显示一个位置 标记 这是我的代码 public class koordinatTask extends AsyncTask
  • 在我的 Spring Boot 示例中无法打开版本 3 中的 Swagger UI

    我在 Spring Boot 示例中打开 swagger ui 时遇到问题 当我访问 localhost 8080 swagger ui 或 localhost 8080 root api name swagger ui 时出现这种错误 S
  • Java 和 Python 可以在同一个应用程序中共存吗?

    我需要一个 Java 实例直接从 Python 实例数据存储中获取数据 我不知道这是否可能 数据存储是否透明 唯一 或者每个实例 如果它们确实可以共存 都有其单独的数据存储 总结一下 Java 应用程序如何从 Python 应用程序的数据存
  • logcat 中 mSecurityInputMethodService 为 null

    我写了一点android应显示智能手机当前位置 最后已知位置 的应用程序 尽管我复制了示例代码 并尝试了其他几种解决方案 但似乎每次都有相同的错误 我的应用程序由一个按钮组成 按下按钮应该log经度和纬度 但仅对数 mSecurityInp
  • java for windows 中的文件图标叠加

    我正在尝试像 Tortoise SVN 或 Dropbox 一样在文件和文件夹上实现图标叠加 我在网上查了很多资料 但没有找到Java的解决方案 Can anyone help me with this 很抱歉确认您的担忧 但这无法在 Ja
  • 使用 AsyncTask 传递值

    我一直在努力解决这个问题 但我已经到了不知道该怎么办的地步 我想做的是使用一个类下载文件并将其解析为字符串 然后将该字符串发送到另一个类来解析 JSON 内容 所有部件都可以单独工作 并且我已经单独测试了所有部件 我只是不知道如何将值发送到
  • 如何使用 jUnit 将测试用例添加到套件中?

    我有 2 个测试类 都扩展了TestCase 每个类都包含一堆针对我的程序运行的单独测试 如何将这两个类 以及它们拥有的所有测试 作为同一套件的一部分执行 我正在使用 jUnit 4 8 在 jUnit4 中你有这样的东西 RunWith
  • Eclipse 启动时崩溃;退出代码=13

    I am trying to work with Eclipse Helios on my x64 machine Im pretty sure now that this problem could occur with any ecli
  • 我如何在java中读取二进制数据文件

    因此 我正在为学校做一个项目 我需要读取二进制数据文件并使用它来生成角色的统计数据 例如力量和智慧 它的设置是让前 8 位组成一个统计数据 我想知道执行此操作的实际语法是什么 是不是就像读文本文件一样 这样 File file new Fi
  • 找不到符号 NOTIFICATION_SERVICE?

    package com test app import android app Notification import android app NotificationManager import android app PendingIn
  • 创建一个 JSON 对象以在 Spring Boot 测试中发布

    我想编写基本测试来使用 JSON 负载在 users URL 上执行 POST 请求来创建用户 我找不到如何将新对象转换为 JSON 到目前为止有这么多 这显然是错误的 但解释了目的 Test public void createUser
  • Java中super关键字的范围和使用

    为什么无法使用 super 关键字访问父类变量 使用以下代码 输出为 feline cougar c c class Feline public String type f public Feline System out print fe
  • Java:带注释的注释(和传递值)

    我有一个注释 其中包含其他几个注释 与此处的注释非常相似 Component Spring Component Interface OsgiService boolean isFactory 意味着所有带有注释的类 OsgiService也
  • Visual Studio Code:运行 Maven 时无法识别环境变量

    我正在尝试为 Maven 重新创建 Eclipse 运行配置 我在launch json中添加了环境变量 env environment test applicationname asset misc api log level debug
  • jTidy 漂亮的打印自定义 HTML 标签

    我正在尝试使用 JTidy 来漂亮地打印用户生成的格式良好的 HTML div class component holder ng binding ng scope ui draggable ui draggable handle div
  • Java中的对象池模式

    所以我实现了自己的对象池模式 它工作得很好并且符合预期 从列表中返回我的 老师 对象 并在没有对象时创建它们 我的问题 返回的对象 Teacher 然后需要被转换为它的专门子类之一 例如 生物老师 获得这种功能的最佳方法是什么 编辑 抱歉
  • Java“tail -f”包装器

    我需要将 Unix 命令 tail f 包装在 BufferedInputStream 中 我不想模拟或模仿尾巴 如所述这个问题 https stackoverflow com questions 557844 java io implem
  • 始终保持 TreeSet 中可变对象的排序

    我注意到 如果稍后更改对象属性值 TreeSet 不会按排序顺序保留可变对象 例如 public class Wrap static TreeSet
  • Sqlite 查询检查 - 小于和大于

    return mDb query DATABASE TABLE new String KEY ROWID KEY LEVEL KEY LEVEL gt 3 lt 5 null null null null 我究竟做错了什么 它返回的值全部高
  • 使用 @Inheritance(strategy=InheritanceType.JOINED) 与实体进行 JPA 一对多关联

    大家好 我正在尝试将一对多关联映射到映射的实体 Inheritance strategy InheritanceType JOINED ManyToMany JoinTable name S MC CC CONTRATTIRAPPORTI

随机推荐