使用 Kafka Streams DSL 进行两步窗口聚合

2023-11-29

假设我有一个流“stream-1”,每秒由 1 个数据点组成,我想计算一个派生流“stream-5”,其中包含使用 5 秒的跳跃窗口和另一个流“stream-10”的总和它基于包含使用 10 秒跳跃窗口的总和的“stream-5”。需要分别对每个键进行聚合,我希望能够在不同的进程中运行每个步骤。如果stream-5和stream-10包含相同密钥/时间戳的更新,这本身不是问题(所以我不一定需要如何发送时间窗口 KTable 的最终 kafka-streams 聚合结果?)只要最后的值是正确的。

有没有一种(简单的)方法可以使用高级 Kafka Streams DSL 来解决这个问题?到目前为止,我还没有找到一种优雅的方法来处理由于聚合而在stream-5 上产生的中间更新。

我知道中间更新可以通过某种方式控制cache.max.bytes.buffering and commit.interval.ms设置,但我认为任何设置都不能保证在所有情况下都不会产生中间值。另外,我可以尝试使用密钥的时间戳部分在读取时将“stream-5”转换为 KTable,但似乎 KTable 不支持像 KStreams 那样的窗口操作。

这是我到目前为止所拥有的,由于 Stream-5 上的中间聚合值而失败

Reducer<DataPoint> sum = new Reducer<DataPoint>() {                                                                           
    @Override                                                                                                                 
    public DataPoint apply(DataPoint x, DataPoint y) {                                                                        
        return new DataPoint(x.timestamp, x.value + y.value);                                                                 
    }                                                                                                                         
 };                                                                                                                           

 KeyValueMapper<Windowed<String>, DataPoint, String> strip = new 
           KeyValueMapper<Windowed<String>, DataPoint, String>() {      
      @Override                                                                                                               
      public String apply(Windowed<String> wKey, DataPoint arg1) {                                                            
          return wKey.key();                                                                                                  
      }                                                                                                                       
 };                                                                                                                           

KStream<String, DataPoint> s1 = builder.stream("stream-1");                                                                      

s1.groupByKey()                                                                                                               
       .reduce(sum, TimeWindows.of(5000).advanceBy(5000))                                                                     
       .toStream()                                                                                                            
       .selectKey(strip)                                                                                                      
       .to("stream-5");                                                                                                          

KStream<String, DataPoint> s5 = builder.stream("stream-5");                                                                      

s5.groupByKey()                                                                                                               
       .reduce(sum, TimeWindows.of(10000).advanceBy(10000))                                                                   
       .toStream()                                                                                                            
       .selectKey(strip)                                                                                                      
       .to("stream-10");      

现在如果stream-1包含输入(键只是KEY)

KEY {"timestamp":0,"value":1.0}
KEY {"timestamp":1000,"value":1.0}
KEY {"timestamp":2000,"value":1.0}
KEY {"timestamp":3000,"value":1.0}
KEY {"timestamp":4000,"value":1.0}
KEY {"timestamp":5000,"value":1.0}
KEY {"timestamp":6000,"value":1.0}
KEY {"timestamp":7000,"value":1.0}
KEY {"timestamp":8000,"value":1.0}
KEY {"timestamp":9000,"value":1.0}

Stream-5 包含正确的(最终)值:

KEY {"timestamp":0,"value":1.0}
KEY {"timestamp":0,"value":2.0}
KEY {"timestamp":0,"value":3.0}
KEY {"timestamp":0,"value":4.0}
KEY {"timestamp":0,"value":5.0}
KEY {"timestamp":5000,"value":1.0}
KEY {"timestamp":5000,"value":2.0}
KEY {"timestamp":5000,"value":3.0}
KEY {"timestamp":5000,"value":4.0}
KEY {"timestamp":5000,"value":5.0}

但stream-10是错误的(最终值应该是10.0),因为它还考虑了stream-5上的中间值:

KEY {"timestamp":0,"value":1.0}
KEY {"timestamp":0,"value":3.0}
KEY {"timestamp":0,"value":6.0}
KEY {"timestamp":0,"value":10.0}
KEY {"timestamp":0,"value":15.0}
KEY {"timestamp":0,"value":21.0}
KEY {"timestamp":0,"value":28.0}
KEY {"timestamp":0,"value":36.0}
KEY {"timestamp":0,"value":45.0}
KEY {"timestamp":0,"value":55.0}

问题在于,所有聚合的结果都是 KTable,这意味着为其输出主题生成的记录代表了一个变更日志。但是,当您随后将它们作为流加载时,下游聚合将重复计数。

相反,您需要将中间主题加载为表,而不是流。但是,您将无法对它们使用窗口聚合,因为它们仅在流上可用。

您可以使用以下模式来完成表而不是流的窗口聚合:

https://cwiki.apache.org/confluence/display/KAFKA/Windowed+aggregations+over+successively+increasing+timed+windows

如果您想在单独的进程中运行每个步骤,您可以调整它,只需记住使用 builder.table() 而不是 builder.stream() 加载中间表。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 Kafka Streams DSL 进行两步窗口聚合 的相关文章

  • 为什么卡夫卡这么快[关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 如果我有相同的硬件 请使用 Kafka 或我们当前的解决方案 ServiceMix Camel 有什么区别吗 Kafka 能处理比它
  • Apache Kafka Streams 将 KTable 物化到主题似乎很慢

    我正在使用 kafka 流 并试图将 KTable 具体化为一个主题 它有效 但似乎每 30 秒左右完成一次 Kafka Stream 如何 何时决定将 KTable 的当前状态具体化为主题 有没有什么办法可以缩短这个时间 让其更加 实时
  • 连接到 Apache Kafka 多节点集群中的 Zookeeper

    我按照以下说明设置了多节点 kafka 集群 现在 如何连接到zookeeper 是否可以从 JAVA 中的生产者 消费者端仅连接到一个 ZooKeeper 或者是否有一种方法可以连接所有 ZooKeeper 节点 设置多节点 Apache
  • 从 Apache Kafka 中的主题删除消息

    所以我是 Apache Kafka 的新手 我正在尝试创建一个简单的应用程序 以便我可以更好地理解 API 我知道这个问题在这里被问了很多 但是如何清除存储在主题上的消息 记录 我看到的大多数答案都说要更改消息保留时间或删除并重新创建主题
  • 使用 offsets_for_times 从时间戳消费

    尝试使用 confluence kafka AvroConsumer 来消费给定时间戳的消息 if flag creating a list topic partitons to search list map lambda p Topic
  • 使用 Spring Boot 进行 Kafka 流

    我想在我的 Spring Boot 项目中使用 Kafka Streams 实时处理 所以我需要 Kafka Streams 配置或者我想使用 KStreams 或 KTable 但我在互联网上找不到示例 我做了生产者和消费者 现在我想实时
  • 如何使用 Kafka 发送大消息(超过 15MB)?

    我发送字符串消息到Kafka V 0 8使用 Java Producer API 如果消息大小约为 15 MB 我会得到MessageSizeTooLargeException 我尝试过设置message max bytes到 40 MB
  • 我的 Kafka 流应用程序刚刚退出,代码为 0,什么也不做

    为了尝试 Kafka 流 我这样做了 public static void main String args final StreamsBuilder builder new StreamsBuilder final Properties
  • 使用 Spring Embedded Kafka 测试 @KafkaListener

    我正在尝试为我正在使用 Spring Boot 2 x 开发的 Kafka 侦听器编写单元测试 作为一个单元测试 我不想启动一个完整的 Kafka 服务器作为 Zookeeper 的实例 所以 我决定使用 Spring Embedded K
  • 卡夫卡流:RocksDB TTL

    据我了解 默认 TTL 设置为无穷大 非正数 但是 如果我们需要在存储中保留数据最多 2 天 我们可以使用 RocksDBConfigSetter 接口实现 即 options setWalTtlSeconds 172800 进行覆盖吗 或
  • Spring Kafka - 为任何主题的分区消耗最后 N 条消息

    我正在尝试读取请求的卡夫卡消息数 对于非事务性消息 我们将从 endoffset N 对于 M 个分区 开始轮询并收集当前偏移量小于每个分区的结束偏移量的消息 对于幂等 事务消息 我们必须考虑事务标记 重复消息 这意味着偏移量将不连续 在这
  • Kafka Java Consumer 已关闭

    我刚刚开始使用卡夫卡 我面临着消费者的一个小问题 我用Java写了一个消费者 我收到此异常 IllegalStateException 此消费者已关闭 我在以下行中遇到异常 ConsumerRecords
  • 如何有效地将数据从 Kafka 移动到 Impala 表?

    以下是当前流程的步骤 Flafka http blog cloudera com blog 2014 11 flafka apache flume meets apache kafka for event processing 将日志写入
  • 如何避免连续“重置偏移量”和“寻找最新偏移量”?

    我正在尝试遵循本指南 https spark apache org docs latest structed streaming kafka integration html https spark apache org docs late
  • Kafka中如何同时实现分布式处理和高可用?

    我有一个由 n 个分区组成的主题 为了进行分布式处理 我创建了两个在不同机器上运行的进程 他们使用相同的 groupd id 订阅主题并分配 n 2 个线程 每个线程处理单个流 每个进程 n 2 个分区 这样我就可以实现负载分配 但现在如果
  • Kafka Streams 反序列化处理程序

    我正在尝试在反序列化中使用 LogAndContinueExceptionHandler 当发生错误时 通过成功记录错误并继续 它可以正常工作 但是 假设我的传入消息有连续的错误流 我停止并重新启动 kafka 流应用程序 然后我看到失败并
  • 删除主题级别配置

    为了删除主题中的所有数据 我将其retention ms配置设置为1000 bin kafka topics sh zookeeper KAFKAZKHOSTS alter topic
  • 如何删除 Apache Kafka 中的主题? [复制]

    这个问题在这里已经有答案了 我需要删除 Kafka 0 8 2 2 3 中的一个主题 我使用以下命令删除主题 bin kafka topics sh zookeeper localhost 2181 delete topic DummyTo
  • Kafka 0.10 Java 客户端超时异常:包含 1 条记录的批次已过期

    我有一个单节点 多 3 个代理 Zookeeper Kafka 设置 我正在使用 Kafka 0 10 Java 客户端 我编写了以下简单的远程 在与 Kafka 不同的服务器上 生产者 在代码中我用 MYIP 替换了我的公共 IP 地址
  • Windows 上的 Apache Kafka 错误 - 无法找到或加载主类 QuorumPeerMain

    我刚刚从 Apache 网站下载了 Kafka 2 8 0 我正在尝试使用网站上给出的说明进行设置 但是当我尝试启动 Zookeper 服务器时 出现以下错误 错误 无法找到或加载主类 org apache zookeeper server

随机推荐

  • 调用 ffmpeg.c 的 main 两次导致应用程序崩溃

    使用 FFmpeg 4 0 2 并调用它ffmpeg c s main函数两次导致 Android 应用程序崩溃 使用 FFmpeg 共享库和 JNI A libc Fatal signal 11 SIGSEGV code 1 fault
  • 使用通用参数作为端口数组长度

    我想做的事 entity FIRfilter is generic NTAPS integer port h in array 0 to NTAPS 1 of std logic vector 15 downto 0 end FIRfitl
  • 基于数据库数组PHP自动检查复选框

    在我的页面的 用户设置 选项卡中 我希望用户确定特定用户发布的帖子类型 表格如下
  • Spark独立模式和本地模式有什么区别?

    Spark独立模式和本地模式有什么区别 Spark Standalone是一个可以在集群上工作的资源管理器 它只是内置的资源管理器 而不是像纱线这样的外部资源管理器 Spark本地运行无需任何资源管理器 一切都在单个jvm中运行 您可以决定
  • Java 同步方法...不同步

    对于我当前的 java 练习 我必须从 2 个不同的 Gmail 帐户获取邮件 我通过创建 Gmail 类的新实例来完成此操作 gmail 类扩展了线程 其中有一个同步方法 readMail 用于获取邮件并打印它 这个 readMail 方
  • 使用 NLog 将记录器名称写入 Excel 文件

    感谢 Rolf 在这个问题中的评论 NLog 在 C 中具有严重性和类别 我能够将日志消息的类别 例如 热 或 数据库 或 机械 记录到文本文件中 我只需将名称传递给 GetLogger 方法即可完成此操作 public MainWindo
  • Mongoimport 带有字符串 _id 和 upsert 的 csv 文件

    我正在尝试使用 mongoimport 来更新插入 id 中带有字符串值的数据 由于 id 看起来像整数 即使它们在引号中 因此 mongoimport 将它们视为整数并创建新记录 而不是更新插入现有记录 我正在运行的命令 mongoimp
  • Android GPU 分析 - OpenGL 动态壁纸速度很慢

    我正在使用 OpenGL ES 3 0 开发动态壁纸 我已经根据优秀教程进行了设置http www learnopengles com how to use opengl es 2 in an android live wallpaper
  • 如何在 NestJS 中处理 RpcException

    我正在尝试构建一个包含多个微服务的 NestJS 后端和一个作为与微服务通信的网关的 REST API 对于网关和微服务之间的通信 我使用 gRPC 简单的通信已经可以工作 但现在我想在微服务中实现错误处理 NestJS 文档指出 这可以通
  • RecyclerView 查看项目

    我想在 RecyclerView 中显示 2 列 但它们显示在 1 列中 如下图所示 如何在两列中显示我的视图 我在我的代码中尝试了两列 rcv pro setLayoutManager new GridLayoutManager this
  • 如何在无需用户交互且仅通过客户端 ID 和密码的情况下验证我的 Quickbook Intuit api 访问?

    我正在开发一个项目 其中后台 crons 创建发票 我想将它们添加到我在后端创建的 Quickbook 帐户中 所以问题是我想仅使用客户端 ID 和秘密参与来访问 api 如何在无需用户交互且仅通过客户端 ID 和密码的情况下验证我的 Qu
  • 有没有办法在使用 ES6 简写方法表示法的方法中使用词法 `this` ?

    关于SO的第一个问题 我希望我没有重复任何内容 我看过other 问题并认为我的不同足以值得询问 基本上 有没有办法让this它位于使用速记符号编写的方法的方法主体中 或者是词法的 或者是绑定到特定值的 这样做的动机来自于我在实现时想要使用
  • 如何指定 JSON 对象应采用哪一个 oneOf 项?

    使用Python和jsonschema我正在尝试验证分配ObjA or ObjB等等beta test json alpha beta ObjA 在我的架构中 testschema json beta is oneOf多个项目 每个项目定义
  • Selenium-IDE:如何验证/断言页面刷新

    我的页面上有一个链接 单击该链接会刷新此页面 如何使用 Selenium IDE 验证页面是否确实已刷新 重新加载 我通过断言页面上最初存在的元素在刷新后不存在于页面上来解决这个问题 然后等到页面完全刷新 并断言该元素再次存在 刷新并等待
  • React router dom 中的链接不会加载页面,仅 url 浏览器导航会更改

    React router dom v5 和 React 16 我的加载应用程序组件包含 ReactDOM render
  • 如何通过 Android 应用程序编辑日历事件

    我如何通过 Android 应用程序编辑日历中的日历事件 任何人都知道如何在日历应用程序中打开议程活动 从日历中读取数据后 试试这个 将单次事件添加到日历 要将条目添加到特定日历 我们需要使用 ContentValues 配置要插入的日历条
  • unicodecsv 读取器从 unicode 字符串无法正常工作?

    我在将 unicode CSV 字符串读入 python unicodescv 时遇到问题 gt gt gt import unicodecsv StringIO gt gt gt f StringIO StringIO u gt gt g
  • 在sqlite3数据库中插入1000000行

    我想向数据库中插入 10 00 000 行 但是插入的时间太长了 例如现在我正在尝试 2055 行 需要 3 分钟才能将这些数据上传到数据库中 对于 2055 个条目来说 这个时间太多了 以下是我将数据插入数据库的方法 public voi
  • numpy var() 和 pandas var() 之间的区别

    最近遇到的一件事让我注意到numpy var and pandas DataFrame var or pandas Series var 给出不同的值 我想知道它们之间有什么区别吗 这是我的数据集 Country GDP Area Cont
  • 使用 Kafka Streams DSL 进行两步窗口聚合

    假设我有一个流 stream 1 每秒由 1 个数据点组成 我想计算一个派生流 stream 5 其中包含使用 5 秒的跳跃窗口和另一个流 stream 10 的总和它基于包含使用 10 秒跳跃窗口的总和的 stream 5 需要分别对每个