逻辑删除消息未从 KTable 状态存储中删除记录?

2023-11-24

我正在创建 KTable 处理来自 KStream 的数据。但是,当我触发具有密钥和空负载的逻辑删除消息时,它不会从 KTable 中删除消息。

sample -

public KStream<String, GenericRecord> processRecord(@Input(Channel.TEST) KStream<GenericRecord, GenericRecord> testStream,
KTable<String, GenericRecord> table = testStream
                .map((genericRecord, genericRecord2) -> KeyValue.pair(genericRecord.get("field1") + "", genericRecord2))
                .groupByKey()
                reduce((genericRecord, v1) -> v1, Materialized.as("test-store"));


GenericRecord genericRecord = new GenericData.Record(getAvroSchema(keySchema));
genericRecord.put("field1", Long.parseLong(test.getField1()));
ProducerRecord record = new ProducerRecord(Channel.TEST, genericRecord, null);
kafkaTemplate.send(record);

触发具有空值的消息后,我可以在具有空负载的 testStream 映射函数中进行调试,但它不会删除 KTable 更改日志“test-store”上的记录。看起来它甚至没有达到减少方法,不确定我在这里缺少什么。

感谢对此的任何帮助!

Thanks.


如 JavaDocs 中所述reduce()

具有 {@code null} 键或值的记录将被忽略。

因为,<key,null>记录被删除,因此(genericRecord, v1) -> v1永远不会执行,不会将逻辑删除写入存储或更改日志主题。

对于您想到的用例,您需要使用指示“删除”的代理值,例如 Avro 记录中的布尔标志。你的reduce函数需要检查标志并返回null如果设置了标志;否则,必须定期处理该记录。

Update:

Apache Kafka 2.6 添加了KStream#toTable()运算符(通过KIP-523)允许变换KStream into a KTable.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

逻辑删除消息未从 KTable 状态存储中删除记录? 的相关文章

  • 现有内部主题具有无效分区

    当在只有一个 Kafka 代理的测试设置中启动我们的 Kafka Streams 应用程序时 我们大约在 15 次运行中看到以下错误 org apache kafka streams errors StreamsException Exis
  • 使用KafkaListener时,如何检查主题消息是否已读完?

    使用 KafkaListener时 如何检查主题消息是否已读完 See 这个答案 https stackoverflow com questions 55430893 how to check if kafka is empty using
  • Kafka 流处理器上下文中的周期性 NPE

    使用 kafka streams 0 10 0 0 在转发消息时 我会定期在 StreamTask 中看到空指针异常 它在 10 到 50 的调用之间变化 NPE 发生在这个方法中 public
  • 无法刷新状态存储

    我正在尝试在 Kafka Streams 中创建一个 leftJoin 它对于大约 10 条记录工作正常 然后由于以下原因导致异常崩溃 NullPointerException用这样的代码 private static KafkaStrea
  • Spring Cloud Stream Kafka - 方法必须是声明性的

    我已经使用 Spring Cloud Stream 配置了一个基于 Spring Boot 的应用程序 我正在尝试处理 KStream 但不断收到错误 java lang IllegalArgumentException 方法必须是声明性的
  • 有什么办法可以让kafka流暂停一段时间然后再恢复吗?

    我们有一个要求 即使用 Kafka Streams 从 Kafka 主题读取数据 然后通过会话池通过网络发送数据 然而 有时 网络调用有点慢 我们需要经常暂停流 以确保网络不会过载 目前 我们将数据捕获到流中并将其加载到执行器服务 然后通过
  • Kafka Streams - SerializationException:未知的魔术字节

    我正在尝试创建一个处理 Avro 记录的 Kafka Streams 应用程序 但出现以下错误 Exception in thread streams application c8031218 8de9 4d55 a5d0 81c30051
  • Spring Kafka中检测broker断开连接

    我正在尝试为我的卡夫卡消费者编写健康检查 当应用程序启动并运行时 我关闭 Kafka 我看到很多 Connection to node 1001 127 0 0 1 9092 could not be established Broker
  • 动态启动和关闭 KafkaListener 只是为了在会话开始时加载以前的消息

    我有一个让 kafkalistener 从头开始 读取消息的工作代码 offset 0 一个主题 始终运行 对于我的用例 消息传递 我需要两件事 始终捕获特定主题 分区的新消息 该消费者始终在运行 并发送到前端 websocket stom
  • 在反序列化之前根据标头过滤消息

    有时 可以在反序列化之前根据标头值过滤掉消息 使用 spring kafka 是否有针对此场景的任何现有模式 我正在考虑实现类似于 ErrorHandlingDeserializer 除了委托之外还将过滤谓词作为属性 有什么建议么 谢谢 是
  • 具有替代方案的重载方法值表

    我有编译器抱怨的以下代码 val state KTable String String builder table BARY PATH Materialized as PATH STORE 错误信息 error home developer
  • 动态创建消费者spring kafka

    我正在创建一个与另一个服务通信的服务 以便识别要收听的 kafka 主题 kafka主题可能有不同的键和值类型 因此 我想为每个配置 主题 键类型 值类型 动态创建不同的 kafka 消费者 其中配置仅在运行时已知 然而在 spring k
  • Spring Kafka MessageListenerContainer

    我看到 spring Kafka 代码 我有一些疑问 如果我们使用 1 个 kafkaListener 和 2 个主题 那么 spring Kafka 将创建一个 MessageListenerContainer 如果我为每个主题使用单独的
  • 使用 spring-kafka 2.1.0 和 SpringBoot 1.5.9 的 Kafka Consumer 上的 java.lang.NoSuchMethodError

    我正在尝试使用 SpringBoot 1 5 9 和 Spring kafka 2 1 0 设置 Kafka Consumer 然而 当我启动我的应用程序时 我在 Kafka MessagingMessageListenerAdapter
  • 事务性 Kafka 生产者

    我正在尝试让我的卡夫卡生产者具有事务性 我正在发送 10 条消息 如果发生任何错误 则不应向 kafka 发送任何消息 即不发送或全部消息 我正在使用 Spring Boot KafkaTemplate Configuration Enab
  • Kafka Streams 如何处理包含不完整数据的分区?

    Kafka Streams 引擎将一个分区映射到一个工作线程 即 Java 应用程序 以便该分区中的所有消息都由该工作线程处理 我有以下场景 并试图了解它是否仍然可行 我有一个主题 A 有 3 个分区 发送给它的消息由 Kafka 随机分区
  • 使用架构注册表对 avro 消息进行 Spring 云合约测试

    我正在查看 spring 文档和 spring github 我可以看到一些非常基本的内容examples https github com spring cloud samples spring cloud contract sample
  • 使用 Spring Embedded Kafka 测试 @KafkaListener

    我正在尝试为我正在使用 Spring Boot 2 x 开发的 Kafka 侦听器编写单元测试 作为一个单元测试 我不想启动一个完整的 Kafka 服务器作为 Zookeeper 的实例 所以 我决定使用 Spring Embedded K
  • Spring Cloud Streams - 源和接收器的多个动态目的地

    我的系统上有一个更改请求 该请求当前侦听多个通道并向多个通道发送消息 但现在目标名称将位于数据库中并随时更改 我很难相信我是第一个遇到这种情况的人 但我看到的信息有限 我只找到这2个 动态接收器目的地 https github com sp
  • 带有 spring-kafka 的 Kafka 死信队列 (DLQ)

    最好的实施方式是什么死信队列 DLQ Spring Boot 2 0 应用程序中的概念 使用 spring kafka 2 1 x 来处理无法处理的所有消息 KafkaListener某些bean发送到某些预定义的Kafka DLQ主题的方

随机推荐

  • JavaScript 按 id 合并对象[重复]

    这个问题在这里已经有答案了 在 Javascript 中合并两个数组的正确方法是什么 我有两个数组 例如 var a1 id 1 name test id 2 name test2 var a2 id 1 count 1 id 2 coun
  • RequestFactory 理论:为什么 Locator<>.find() 被如此频繁地调用?

    我是 RequestFactory 的新手 但得到了慷慨的帮助托马斯 布罗耶在查看了下面的文档之后 情况变得好多了 RequestFactory 入门 请求工厂移动部件 GWT 2 4 中的 RequestFactory 更改 但你能解释一
  • 如何在 Python 中重置 TCP 套接字?

    我有一个用 Python 编写的套接字代理 当它从一对通信对等点接收到 RST 时 它将通过让套接字被垃圾收集来关闭与两个对等点的连接 这会导致其他对等方看到 FIN 而不是 RST 这意味着代理有效地将 RST 转换为 FIN 我认为这并
  • Anaconda“无法创建进程”

    我是 Python 新手 我刚刚安装了 Python anaconda python 2 7 在启动 Anaconda 时提示发生了 无法创建进程 的情况 那么这里有人可以帮我吗 我感谢每一个帮助 我遇到了完全相同的错误 因为我的用户名包含
  • SVN:将存储库主干移动到另一个分支(带有历史记录)

    我正在使用带有大量存储库的 SVN 设置 我试图通过将一个的主干移动到另一个的分支来合并一些 旧的是新的主题版本 减去我稍后将应用的一些代码修复 所以这对我来说很有意义 简短版本 我想从 RepositoryA trunk 转到 Repos
  • VB“Financial.Pmt”在 C# 中等效吗?

    Microsoft VisualBasic 程序集中有一个内置函数 我可以在 VB 中这样使用它 Financial Pmt dAPR 100 12 iNumberOfPayments dLoanAmount 1 我当前的项目是用C 编写的
  • 如何使用 jQuery 计算 ASP.NET 中 gridview 的行数

    有谁知道如何使用 jQuery 计算 asp GridView 中的行数 如果没有找到行那么我想做一些事情 A GridView只是呈现为标准 HTML 表格 因此只需计算trGridView 下的元素 var totalRows tr l
  • 在 ListBoxFor 中选择值的挑战

    最近在开发我的第一个 ASP Net MVC2 Web 应用程序时 当我需要在列表框中选择多个值时 我遇到了一些问题 我用一些 jQuery 解决了这个问题 但继续编写了一些非常简单的代码来演示 我使用 EF 作为模型 有两个实体 Cust
  • 如何从我的 Android 应用程序中删除 QUERY_ALL_PACKAGES 权限?

    由于 Google 的反馈 我的 Google Play 更新版本已被拒绝 3 次 应使用不太广泛的应用程序可见性方法 我们无法批准您的应用使用 QUERY ALL PACKAGES 权限 因为声明的任务可以使用不太广泛的应用可见性方法来完
  • stl容器是否使用隐式共享?

    众所周知 Qt 小部件使用隐式共享 所以我对 stl 容器感兴趣std vector std string也使用隐式共享 如果没有 为什么 因为它非常有用 如果答案是肯定的 我们如何确定呢 我需要简单的 C stl 程序 该程序显示 stl
  • 将 html 结果保存到 txt 或 html 文件

    我有一个带有 html 代码的变量 以下是 R 控制台中代码变量的输出 h1 My First Heading h1 p My first paragraph p 我尝试将内容保存到txt文件中 write table code file
  • 什么是卷积神经网络中的“线性投影”[关闭]

    Closed 这个问题不符合堆栈溢出指南 目前不接受答案 我正在阅读剩余学习 我有一个问题 3 2中提到的 线性投影 是什么 一旦得到这个看起来很简单 但无法理解 有人可以提供简单的例子吗 首先 重要的是要了解什么x y and F以及为什
  • Google BigQuery 的 JDBC 驱动程序?

    有谁知道 Google BigQuery 的 JDBC 接口或驱动程序吗 请只使用 Java 我已经不再使用 Python 库了 有一个第 3 方 JDBC 驱动程序 可以从以下位置获取 http code google com p sta
  • 不完整类型的 new 在包含在模板中时编译

    考虑这段代码 有一个明显的编译错误 1 struct A struct B B new A error allocation of incomplete type A Using a unique ptr也无济于事 2 struct A s
  • HMM 前向算法中的下溢

    我正在实现 HMM 的前向算法来计算给定 HMM 发出给定观察序列的概率 我希望我的算法对于下溢具有鲁棒性 我无法在对数空间中工作 因为前向算法需要概率的乘法和加法 避免下溢的最佳方法是什么 我已经阅读了一些关于此的资料 但我得到的最好的建
  • 在 Angular 2 和 Spring MVC 中使用其他表单字段上传文件

    我在尝试着上传文件和其他表单字段内容从我的 Angular 2 前端到 Spring 后端 但不知怎的 我无法做到这一点 这是我的代码 应用程序组件 ts fileChange e this fileList e target files
  • 如何在选项卡布局中的文本旁边设置图标

    我正在使用以下文本和图标制作 Tablayouttutorial 我的问题是如何使图标放置在文本旁边而不是文本上方 我是Android开发新手 希望大家能帮助我 提前谢谢您 非常感谢您的回答 这是我的java文件 public class
  • ClassNotFoundException:eclipse 中 Jetty hello world 中的 javax.servlet.AsyncContext

    我已点击链接http wiki eclipse org Jetty Tutorial Jetty HelloWorld教程 使用 Eclipse 还查看了现有的 stackoverflowhere 我使用聚合 jetty 8 0 0 jar
  • 为什么所有 JavaScript 控制台日志和错误都显示第 1 行(开发人员工具)

    我正在开发一个 Javascript 项目并使用 Chrome F12 开发人员工具进行调试 由于某种原因 所有 console log 输出和错误消息都声称它们发生在line 1我的 js 文件 即 在控制台中每行右侧显示myFile j
  • 逻辑删除消息未从 KTable 状态存储中删除记录?

    我正在创建 KTable 处理来自 KStream 的数据 但是 当我触发具有密钥和空负载的逻辑删除消息时 它不会从 KTable 中删除消息 sample public KStream