如何将自定义 StateStore 添加到 Kafka Streams DSL 处理器?

2023-11-24

对于我的 Kafka 流应用程序之一,我需要使用 DSL 和处理器 API 的功能。我的流媒体应用程序流程是

source -> selectKey -> filter -> aggregate (on a window) -> sink

聚合后,我需要向接收器发送一条聚合消息。所以我定义我的拓扑如下

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source = builder.stream(source_stream);
source.selectKey(new MyKeyValueMapper())
      .filterNot((k,v) -> k.equals("UnknownGroup"))
      .process(() -> new MyProcessor());

我定义一个自定义StateStore并将其注册到我的处理器,如下所示

public class MyProcessor implements Processor<String, String> {

    private ProcessorContext context = null;
    Serde<HashMapStore> invSerde = Serdes.serdeFrom(invJsonSerializer, invJsonDeserializer);


    KeyValueStore<String, HashMapStore> invStore = (KeyValueStore) Stores.create("invStore")
        .withKeys(Serdes.String())
        .withValues(invSerde)
        .persistent()
        .build()
        .get();

    public MyProcessor() {
    }

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        this.context.register(invStore, false, null); // register the store
        this.context.schedule(10 * 60 * 1000L);
    }

    @Override
    public void process(String partitionKey, String message) {
        try {
            MessageModel smb = new MessageModel(message);
            HashMapStore oldStore = invStore.get(partitionKey);
            if (oldStore == null) {
                oldStore = new HashMapStore();
            }
            oldStore.addSmb(smb);
            invStore.put(partitionKey, oldStore);
        } catch (Exception e) {
           e.printStackTrace();
        }
    }

    @Override
    public void punctuate(long timestamp) {
       // processes all the messages in the state store and sends single aggregate message
    }


    @Override
    public void close() {
        invStore.close();
    }
}

当我运行该应用程序时,我得到java.lang.NullPointerException

线程“StreamThread-18”中的异常 java.lang.NullPointerException 在 org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:167) 在 org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:332) 在org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:252) 在 org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:446) 在 org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:434) 在 org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:422) 在 org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:340) 在 org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)

知道这里出了什么问题吗?


您需要注册您的商店outside您的处理器使用StreamsBuilder (or KStreamBuilder在旧版本中)。首先您创建商店,然后将其注册到StreamsBuilder (KStreamBuilder),并且当您添加处理器时,您提供商店名称以连接处理器和商店。

StreamsBuilder builder = new StreamsBuilder();

// create store
StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore("invStore"),
    Serdes.String(),
    invSerde));
// register store
builder.addStateStore(storeBuilder);

KStream<String, String> source = builder.stream(source_stream);
source.selectKey(new MyKeyValueMapper())
        .filterNot((k,v) -> k.equals("UnknownGroup"))
        .process(() -> new MyProcessor(), "invStore"); // connect store to processor by providing store name


// older API:

KStreamBuilder builder = new KStreamBuilder();

// create store
StateStoreSupplier storeSupplier = (KeyValueStore)Stores.create("invStore")
    .withKeys(Serdes.String())
    .withValues(invSerde)
    .persistent()
    .build();
// register store
builder.addStateStore(storeSupplier);

KStream<String, String> source = builder.stream(source_stream);
source.selectKey(new MyKeyValueMapper())
        .filterNot((k,v) -> k.equals("UnknownGroup"))
        .process(() -> new MyProcessor(), "invStore"); // connect store to processor by providing store name
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何将自定义 StateStore 添加到 Kafka Streams DSL 处理器? 的相关文章

  • KafkaStreams:获取窗口最终结果

    是否可以得到窗口最终结果在 Kafka Streams 中通过抑制中间结果 我无法实现这个目标 我的代码有什么问题吗 val builder StreamsBuilder builder stream
  • 使用 Apache Kafka Streaming 解析 JSON 数据

    我有一个从 Kafka 主题读取 JSON 数据的场景 通过使用 Kafka 0 11 版本 我需要编写 Java 代码来流式传输 Kafka 主题中存在的 JSON 数据 我的输入是包含字典数组的 Json 数据 现在我的要求是获取 文本
  • Kafka 比较键的连续值

    我们正在构建一个应用程序来从传感器获取数据 数据被传输到 Kafka 消费者将其发布到不同的数据存储 每个数据点将具有代表传感器状态的多个属性 在其中一个消费者中 我们希望仅当值发生变化时才将数据发布到数据存储 例如如果有温度传感器每 10
  • KafkaStreams serde异常

    我正在使用 Kafka 和流技术 我为 KStream 创建了一个自定义序列化器和反序列化器 我将使用它来接收来自给定主题的消息 现在的问题是我正在以这种方式创建一个 serde JsonSerializer
  • 与 KafkaStreams 的窗口结束外连接

    我有一个 Kafka 主题 我希望消息具有两种不同的密钥类型 旧的和新的 IE 1 new 1 old 2 new 2 old 密钥是唯一的 但有些可能会丢失 现在 使用 Kotlin 和 KafkaStreams API 我可以记录具有相
  • Kafka Stream groupBy 行为:聚合的许多中间输出/更新

    我正在尝试使用 Kafka Stream 来聚合人们的某些属性 我有一个像这样的卡夫卡流测试 new ConsumerRecordFactory Array Byte Character input new ByteArraySeriali
  • 新建的 KTable 不返回任何内容

    我正在尝试使用 KTable 来消费来自 Kafka 主题的事件 但是 它什么也没返回 当我使用 KStream 时 它返回并打印对象 这实在是太奇怪了 生产者和消费者可以在这里找到 https github com pavankjadda
  • 如何实现通用 Kafka Streams 反序列化器

    我喜欢 Kafka 但讨厌编写大量序列化器 反序列化器 所以我尝试创建一个GenericDeserializer
  • 升级到 kafka-streams:5.5.0-css (Apache Kafka 2.5.0) 后获取 GlobalKTable 的存储崩溃 [已解决]

    我有一个使用 GlobalKTable 的 Spring Boot 应用程序 它工作正常 直到从 5 3 2 css 更新到 kafka streams 5 5 0 css 与 Apache Kafka 2 5 0 兼容的 Confluen
  • 如果一个代理关闭,流应用程序中的 KafkaStream EXACTLY_ONCE 会导致重新平衡失败

    我有一个 Kafka 流应用程序 其中 kafka streams 和 kafka clients 均为 2 4 0 具有以下配置 properties put StreamsConfig BOOTSTRAP SERVERS CONFIG
  • 如何配置 Spring Boot Kafka 客户端使其不尝试连接

    这与Spring Boot Kafka 客户端有 断路器 吗 https stackoverflow com q 69914621 2886891 但我仍然认为这是一个不同的问题 我们需要配置 Spring Boot Kafka 客户端 以
  • Kafka Streams.allMetadata() 方法返回空列表

    所以我正在尝试使用 Kafka 流进行交互式查询 我有 Zookeeper 和 Kafka 在本地运行 在 Windows 上 我使用 C temp 作为 Zookeeper 和 Kafka 的存储文件夹 我已经设置了这样的主题 kafka
  • 现有内部主题具有无效分区

    当在只有一个 Kafka 代理的测试设置中启动我们的 Kafka Streams 应用程序时 我们大约在 15 次运行中看到以下错误 org apache kafka streams errors StreamsException Exis
  • 有什么办法可以让kafka流暂停一段时间然后再恢复吗?

    我们有一个要求 即使用 Kafka Streams 从 Kafka 主题读取数据 然后通过会话池通过网络发送数据 然而 有时 网络调用有点慢 我们需要经常暂停流 以确保网络不会过载 目前 我们将数据捕获到流中并将其加载到执行器服务 然后通过
  • Kafka Streams 在 HDFS 上查找数据

    我正在使用 Kafka Streams v0 10 0 1 编写一个应用程序 并希望通过查找数据来丰富我正在处理的记录 该数据 带时间戳的文件 每天 或每天 2 3 次 写入 HDFS 目录 我怎样才能将其加载到Kafka Streams应
  • 即使没有消费者,消费者群体仍陷入“再平衡”

    我正在使用kafka版本2 4 1 最近从2 2 0升级到2 4 1 并注意到一个奇怪的问题 即使应用程序 kafka Streams 已关闭 没有正在运行的应用程序 但消费者组命令返回状态为重新平衡 我们的应用程序作为 kubernete
  • KafkaStreams 同一应用程序中的多个流

    我正在尝试根据 KafkaStreams 的惯例和合理性做出实用的设计决策 假设我想将两个不同的事件放入其中KTables 我有一个制作人将这些消息发送给KStream那就是听那个话题 据我所知 我不能对消息使用条件转发KafkaStrea
  • 有没有办法重新分区 Kafka 流中的输入主题?

    我有一个由 byte 键控的主题 我想对其进行重新分区并通过消息正文中字段中的另一个键处理该主题 我发现有KGroupedStream and groupby功能 但它需要一个聚合函数来转换为 KTable KStream 我不需要聚合 我
  • TopologyTestDriver 在 KTable 聚合上发送错误消息

    我有一个聚合在 KTable 上的拓扑 这是我创建的通用方法 用于根据我拥有的不同主题构建此拓扑 public static
  • Spring Kafka - 为任何主题的分区消耗最后 N 条消息

    我正在尝试读取请求的卡夫卡消息数 对于非事务性消息 我们将从 endoffset N 对于 M 个分区 开始轮询并收集当前偏移量小于每个分区的结束偏移量的消息 对于幂等 事务消息 我们必须考虑事务标记 重复消息 这意味着偏移量将不连续 在这

随机推荐