具有状态存储的 Kafka Streams - 应用程序重新启动时重新处理消息

2023-12-31

我们有以下带有两个变压器的拓扑,每个变压器都使用持久状态存储:

kStreamBuilder.stream(inboundTopicName)
    .transform(() -> new FirstTransformer(FIRST_STATE_STORE), FIRST_STATE_STORE)
    .map((key, value) -> ...)
    .transform(() -> new SecondTransformer(SECOND_STATE_STORE), SECOND_STATE_STORE)
    .to(outboundTopicName);

和卡夫卡设置有auto.offset.reset: latest。应用程序启动后,我看到创建了两个内部压缩主题(这是预期的):appId_inbound_firstStateStore-changelog and appId_inbound_secondStateStore-changelog

我们的应用程序停机了两天,在我们再次启动应用程序后,从头开始重新处理特定分区的消息(但我们有多个分区)。 我知道对于版本 2 之前的 kafka 代理来说,提交的偏移量会存储大约 1 天,因此我们的偏移量应该通过保留来清理。但是如果我们使用的话为什么消息会从一开始就被重新处理auto.offset.reset: latest?也许它在某种程度上与有状态操作或变更日志内部主题有关。

我看到以下日志(其中大多数都重复多次):

StoreChangelogReader Restoring task 0_55's state store firstStateStore from beginning of the changelog
Fetcher [Consumer clientId=xxx-restore-consumer, groupId=] Resetting offset for partition xxx-55 to offset 0
ConsumerCoordinator Setting newly assigned partitions
ConsumerCoordinator Revoking previously assigned partitions
StreamsPartitionAssignor Assigned tasks to clients
AbstractCoordinator Successfully joined group with generation
StreamThread partition revocation took xxx ms
Unsubscribed all topics or patterns and assigned partitions
AbstractCoordinator (Re-)joining group
Attempt to heartbeat failed since group is rebalancing
AbstractCoordinator Group coordinator xxx:9092 (id: xxx rack: null) is unavailable or invalid, will attempt rediscovery
FetchSessionHandler - [Consumer clientId=xxx-restore-consumer, groupId=] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 2: org.apache.kafka.common.errors.DisconnectException

卡夫卡经纪人版本0.11.0.2;卡夫卡流版本2.1.0


None

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

具有状态存储的 Kafka Streams - 应用程序重新启动时重新处理消息 的相关文章

随机推荐

  • 上传到 Google Play 后,谷歌地图不显示

    我将我的应用程序上传到谷歌游戏商店 但地图不起作用 只显示白屏 我知道我必须根据发布证书指纹获取密钥 我已尝试阅读所有相关内容 并执行了以下操作 但它仍然不起作用 我从 Android Studio 创建了一个新的密钥库 带有密码 别名和密
  • 内存映射文件偏移低

    我正在用 C 和 Visual Studio 编写 Windows 程序 我必须映射一个文件而不是从它的第 750 个字节访问它 我试过 pFile char MapViewOfFile hMMap FILE MAP ALL ACCESS
  • 如何使用scrapy抓取javascript实现的多页数据

    我想用scrapy从网页中爬取数据 但是从url上看不出不同页面之间的区别 例如 上面的url是我要抓取数据的第一个页面 很容易从中获取数据 这是我的代码 author Rabbit from scrapy spiders import S
  • 使用 jQuery Mobile 的动态页面

    我已经使用 jQuery 有一段时间了 并且迈出了使用 jQuery Mobile 的第一步 我使用index html作为我的应用程序的jQuery Mobile和设计 它在加载后立即调用content php 所有页面的列表视图 中的内
  • 在 C++ 的动态内存分配(堆)中,“删除”运算符实际上是如何在幕后工作的?

    我不明白 删除 运算符在 C 中是如何在幕后实际实现的 例如 class Node int i Node left right int main Node a new Node somehow the object a is initial
  • 合并两个 Git 存储库而不破坏文件历史记录

    我需要将两个 Git 存储库合并到一个全新的第三个存储库中 我发现了许多关于如何使用子树合并来执行此操作的描述 例如雅库布 纳伦布斯基的回答 https stackoverflow com a 1426163 on 如何合并两个 Git 存
  • 跟踪视频中的眼睛瞳孔

    我正在开展一个项目 旨在追踪眼睛瞳孔 为此 我制作了一个头戴式系统来捕获眼睛的图像 硬件部分我已经完成了在软件中被击中部分 我在用opencv 请让我知道跟踪瞳孔最有效的方法是什么 霍夫圆表现不佳 之后我也尝试过HSV过滤器 这是代码和 链
  • 在 onStop 之前隐藏视图?

    当用户按下主页按钮时 调用 onStop 方法 系统会截取屏幕截图 当用户按住主页按钮 在 Android 手机上 时 可以在打开的应用程序中看到屏幕截图 我的问题是如何防止任何可以拿起手机并按住主页按钮的人看到敏感数据 想象一下 应用程序
  • 它是可调用损失函数(以函数的形式)的 TensorFlow 最佳实践吗?除了 Eager Execution 兼容性之外,还有其他优点吗?

    热切执行要求传递给任何优化器的任何损失都可以被调用 即以函数的形式 所以这没问题 def loss function return tf reduce mean tf nn sampled softmax loss weights soft
  • glibc的写入是如何工作的?

    我尝试编译一个简单的程序 名为write with nostdlib 但我收到错误 path to file 3 undefined reference to write 我想write是 Unix 的东西并且一直存在 但显然不是 事实证明
  • 解析一个数字但保留负数

    我正在尝试将数字取消格式化为其原始形式 但保留它是否为负数 堆栈溢出上的某人引导我找到了这段代码 该代码工作得非常好 但它没有保留负数 有人能帮我更好地解决这个问题吗 EDIT 对于美元货币 普通数字 Example 1 234 1234
  • 如何更新 OpenJDK 的时区信息?

    如何更新 OpenJDK 的时区信息 Oracle 推出了 tzupdater 但它受到他们的许可证的约束 所以我不想使用它 我正在寻找一个开源替代方案 它允许我只更新时区信息而不是整个 JRE Azul 最近发布了一个开源工具来更新 TZ
  • 用随机数据填充表

    我有如下两张表 区域 表 AreaKey AreaID
  • 以编程方式将 NSScrollView 滚动到右侧

    一切都在标题中 我想以编程方式滚动NSScrollView向右 这样我就可以看到文档的结尾 我试过这个 let width scrollView frame size width let height scrollView frame si
  • 无法运行“phonegap run android”,抛出异常

    我想开始使用 Phonegap 开发东西 我按照他们网站上的说明进行操作 http phonegap com install http phonegap com install 当我执行 phonegap run android 时 它给了
  • 避免PHP执行时间限制

    我需要用 PHP 语言创建一个脚本来执行数字排列 但 PHP 的执行时间限制设置为 60 秒 我怎样才能运行脚本 以便在需要运行超过60个sesunde时 不被服务器中断 我知道我可以更改 php 中的最大执行时间限制 但我想听到另一个不需
  • 获取 woocommerce 子类别产品

    我正在尝试让 woocommerce 子类别中的产品显示在主要类别下 ul class wsubcategs li a href a li ul
  • 使用 ruby​​ 加密数据,使用 Node 解密

    我想在 ruby 应用程序中加密一些数据 然后在 nodejs 应用程序中对其进行解码 我一直在尝试让它发挥作用 现在我只是尝试用两种语言加密同一段数据以获得相同的结果 但我似乎无法做到这一点 js var crypto require c
  • 在 Log4j2 中扩展 PatternLayout

    自从 Log4J2 以来org apache logging log4j core layout PatternLayout班级是final 我无法扩展它来为我的创建标头CSV 我引用了文档 它没有提供有关如何扩展现有布局的信息 http
  • 具有状态存储的 Kafka Streams - 应用程序重新启动时重新处理消息

    我们有以下带有两个变压器的拓扑 每个变压器都使用持久状态存储 kStreamBuilder stream inboundTopicName transform gt new FirstTransformer FIRST STATE STOR