Kafka Streams 2.1.1 类转换,同时刷新定时聚合以存储

2023-12-01

我正在尝试使用 kafka 流执行窗口聚合,并仅在某个会话窗口关闭后才发出结果。为了实现这一点,我使用了抑制功能。

问题是我找不到一种方法来使这个简单的测试工作,因为当它尝试保持状态时,我得到一个类转换异常,因为它尝试将 Windowed 转换为 String。 我试图向聚合函数提供Materialized<Windowed<String>,Long,StateStore<>>但它不进行类型检查,因为它期望第一个类型只是字符串。

我在这里缺少什么?

卡夫卡版本2.1.1

package test;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.junit.Test;

import java.text.MessageFormat;
import java.time.Duration;
import java.util.Properties;

public class TestAggregation {

    @Test
    public void aggregationTest() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream<String, Long> input = streamsBuilder.stream("input");

        input
            .groupByKey()
            .windowedBy(SessionWindows.with(Duration.ofSeconds(30)))
            .aggregate(() -> Long.valueOf(0), (key, v1, v2) -> v1 + v2, (key, agg1, agg2) -> agg1 + agg2)
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream()
            .map((k, v) -> new KeyValue<>(k.key(), v))
            .to("output");

        Topology topology = streamsBuilder.build();

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());

        TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);

        ConsumerRecordFactory<String, Long> producer =
            new ConsumerRecordFactory<>("input", Serdes.String().serializer(), Serdes.Long().serializer());

        testDriver.pipeInput(producer.create("input", "key", 10L));

        ProducerRecord<String, Long> output = testDriver.readOutput("output", Serdes.String().deserializer(), Serdes.Long().deserializer());
        System.out.println(MessageFormat.format("output: k: {0}, v:{1}", output.key(), output.value()));
    }
}

这是我从中得到的堆栈跟踪

17:05:38.925 [main] DEBUG org.apache.kafka.streams.processor.internals.StreamTask - task [0_0] Committing
17:05:38.925 [main] DEBUG org.apache.kafka.streams.processor.internals.ProcessorStateManager - task [0_0] Flushing all stores registered in the state manager
17:05:38.929 [main] ERROR org.apache.kafka.streams.processor.internals.ProcessorStateManager - task [0_0] Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000001: 
java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String
    at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
    at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.buffer(KTableSuppressProcessor.java:86)
    at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:78)
    at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:37)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93)
    at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42)
    at org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:179)
    at org.apache.kafka.streams.state.internals.CachingSessionStore.access$000(CachingSessionStore.java:37)
    at org.apache.kafka.streams.state.internals.CachingSessionStore$1.apply(CachingSessionStore.java:86)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99)
    at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:124)
    at org.apache.kafka.streams.state.internals.CachingSessionStore.flush(CachingSessionStore.java:198)
    at org.apache.kafka.streams.state.internals.MeteredSessionStore.flush(MeteredSessionStore.java:191)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:217)
    at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:204)
    at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:491)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:431)
    at org.apache.kafka.streams.TopologyTestDriver.pipeInput(TopologyTestDriver.java:405)
    at test.TestAggregation.aggregationTest(TestAggregation.java:49)

有两种选择可以解决该问题:

  1. use TimeWindowedKStream::aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator, final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized);

  2. use KStream::groupByKey(final Grouped<K, V> grouped)

在你的情况下它将是:

Ad 1:

input
    .groupByKey()
    .windowedBy(SessionWindows.with(Duration.ofSeconds(30)))
    .aggregate(() -> Long.valueOf(0), (key, v1, v2) -> v1 + v2, (key, agg1, agg2) -> agg1 + agg2, Materialized.with(Serdes.String(), Serdes.Long()))
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
    .toStream()
    .map((k, v) -> new KeyValue<>(k.key(), v))
    .to("output");

Ad 2:

input
    .groupByKey(Grouped.with(Serdes.String(), Serdes.Long())
    .windowedBy(SessionWindows.with(Duration.ofSeconds(30)))
    .aggregate(() -> Long.valueOf(0), (key, v1, v2) -> v1 + v2, (key, agg1, agg2) -> agg1 + agg2)
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
    .toStream()
    .map((k, v) -> new KeyValue<>(k.key(), v))
    .to("output");
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka Streams 2.1.1 类转换,同时刷新定时聚合以存储 的相关文章

随机推荐

  • javascript 中函数闭包前是否需要分号?

    我想知道这是否是一个编译器错误 或者这就是它应该的方式 使用node js v10 15 3 此代码会抛出错误 var x x false function y console log foo 但这段代码工作正常 var x x false
  • 隐藏/不可见的 Ajax 请求?

    是否可以编写和创建一个无法被 Firefox 中的 Firebug 插件捕获的 JavaScript Ajax 请求 我问这个问题是因为我可以在 Facebook 上看到没有正在进行的 Ajax 请求 但是当我从另一个帐户发送消息时 顶部的
  • 2D CUDA 中值滤波器优化

    我在 CUDA 中实现了一个 2D 中值滤波器 整个程序如下所示 include cuda runtime h include cuda runtime api h include device launch parameters h in
  • MiniZinc 数组中字符串值的索引

    问题 给定一个 MiniZinc 字符串数组 int numStats set of int Stats 1 numStats array Stats of string statNames 使用从 MiniZinc 数据文件加载的数据 n
  • 无法安装 NuGet 包 - 500 内部服务器错误

    这对我来说毫无意义 NuGet 以前工作正常 但现在当我尝试安装软件包时它会抛出服务器错误 我已经卸载并重新安装了最新的 NuGet 但仍然没有成功 PM gt Install Package EntityFramework Install
  • 为什么 LocationManager 没有 LastKnown 位置?

    我想要用户的位置 并且在该用户自己导航后也只需要一次 locationManager LocationManager this getSystemService LOCATION SERVICE location locationManag
  • 如何在 Windows 7 上安装 Windows Phone 8 SDK

    我在 Windows 7 上设置了所有工作区和所有内容 但我也想开发 Windows Phone 8 但正如 Microsoft 网站所述 它无法安装在 Windows 7 上 有人知道如何在 Windows 7 上安装吗 我找到了一个破解
  • iOS Ionic 应用程序中的链接无法立即打开

    我的应用程序的视图之一中有一些链接 Sharing 添加到日历 打开外部链接 在我用来测试应用程序的 iPhone 上 1 和 2 可以工作 但只有当我按下主页按钮然后返回到应用程序时 才会出现共享 日历对话框 谁能建议什么可能导致这个 我
  • 使用对象类型的字符串名称在 C# 中进行类型转换

    我有以下代码 应该很容易理解 public class Foo public void FooHasAMethod Console WriteLine it is me foo public class Bar public Foo Foo
  • sh 按匹配的列名值分解 CSV 文件,同时保留标题

    我有一个目录 其中包含来自表导出的许多 CSV 文件 tblA csv A B C 1 1 1 1 2 2 2 2 2 3 3 3 tblB csv C D A 1 1 1 1 2 2 2 2 2 3 3 3 为了破坏文件我找到了这个脚本
  • 将 Map[String, Double] 转换为 java.util.Map[String, java.lang.Double]

    我认为我们可以依靠隐式转换来转换scala Double to java lang Double 所以我尝试了以下方法 import scala collection JavaConverters object Main extends A
  • 如何从我的 iPad 应用程序打开设置 (Objective-C) [重复]

    这个问题在这里已经有答案了 我尝试过使用 UIApplication sharedApplication openURL NSURL URLWithString prefs root General path Network 以及您在这里看
  • 从 JAR 中执行 python 文件

    我试图弄清楚如何引用 python 文件 以便我可以在 Java GUI Jar 中执行它 它需要是一个可移植的解决方案 因此使用绝对路径对我来说不起作用 我在下面列出了我的项目结构 并包含了我如何尝试执行 python 脚本的代码 我已经
  • 如何修复高图表中隐藏的数据标签?

    请看一下JSFIDDLE 此处 绿色条不显示任何值 我知道添加overflow none crop false将显示该值 但它超出了绘图区域 有时对于较大的数字 它与标题重叠 我想 仅 在条内获取绿色条值 而不是隐藏该值 对于内部的特定列
  • 淘汰打字稿扩展器

    有人可以发布一个在打字稿中扩展可观察值的示例吗 淘汰赛延长器 http knockoutjs com documentation extenders html 我从2013年3月6日开始使用这个版本的knockout d tshttps g
  • RabbitMQ SSL 与 Apring AMQP 1.4.3 连接

    我正在尝试通过 SSL 连接到 RabbitMQ 我已按照 此处 https www rabbitmq com ssl html 链接的 RabbitMQ SSL 文档进行操作 根据 RabbitMQ SSL 文档 由于已知漏洞 不建议使用
  • 验证输入字符串是有效的十进制数

    任何人都可以提供一种算法来检查输入字符串是否是正确形状和形式的十进制数 正确形状和形式的规则 最多两位小数 出于所有实际目的 最大数字是 99 999 999 99 整数部分可以使用空格 逗号或点作为组分隔符 小数部分可以使用逗号或点作为分
  • 使用 fgetcsv 将 Excel csv 导出到 php 文件

    我正在使用 Excel 2010 Professional Plus 创建 Excel 文件 稍后我尝试将其导出为 UTF 8 csv 文件 我通过将其另存为 CSV 符号分隔 抱歉 我不知道那里的确切措辞 但我没有英文版本 我担心它的翻译
  • 从 podspec 或 info.plist 检索 pod 版本到代码中

    我创建了自己的 pod 其中包含 podspec 文件 其中包含 s version 0 4 7 我希望以编程方式将其写入代码中 因此每当应用程序运行时 它都会将 pod 版本发送到服务器 另一个获取 pod 版本的地方是下面的 plist
  • Kafka Streams 2.1.1 类转换,同时刷新定时聚合以存储

    我正在尝试使用 kafka 流执行窗口聚合 并仅在某个会话窗口关闭后才发出结果 为了实现这一点 我使用了抑制功能 问题是我找不到一种方法来使这个简单的测试工作 因为当它尝试保持状态时 我得到一个类转换异常 因为它尝试将 Windowed 转