我正在创建 KTable 处理来自 KStream 的数据。但是,当我触发具有密钥和空负载的逻辑删除消息时,它不会从 KTable 中删除消息。
sample -
public KStream<String, GenericRecord> processRecord(@Input(Channel.TEST) KStream<GenericRecord, GenericRecord> testStream,
KTable<String, GenericRecord> table = testStream
.map((genericRecord, genericRecord2) -> KeyValue.pair(genericRecord.get("field1") + "", genericRecord2))
.groupByKey()
reduce((genericRecord, v1) -> v1, Materialized.as("test-store"));
GenericRecord genericRecord = new GenericData.Record(getAvroSchema(keySchema));
genericRecord.put("field1", Long.parseLong(test.getField1()));
ProducerRecord record = new ProducerRecord(Channel.TEST, genericRecord, null);
kafkaTemplate.send(record);
触发具有空值的消息后,我可以在具有空负载的 testStream 映射函数中进行调试,但它不会删除 KTable 更改日志“test-store”上的记录。看起来它甚至没有达到减少方法,不确定我在这里缺少什么。
感谢对此的任何帮助!
Thanks.
如 JavaDocs 中所述reduce()
具有 {@code null} 键或值的记录将被忽略。
因为,<key,null>
记录被删除,因此(genericRecord, v1) -> v1
永远不会执行,不会将逻辑删除写入存储或更改日志主题。
对于您想到的用例,您需要使用指示“删除”的代理值,例如 Avro 记录中的布尔标志。你的reduce函数需要检查标志并返回null
如果设置了标志;否则,必须定期处理该记录。
Update:
Apache Kafka 2.6 添加了KStream#toTable()
运算符(通过KIP-523)允许变换KStream
into a KTable
.
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)