使用 Apache Beam 进行窗口化 - 修复了窗口似乎没有关闭的问题?

2023-11-23

我们正在尝试在 Apache Beam 管道上使用固定窗口(使用DirectRunner)。我们的流程如下:

  1. 从发布/订阅中提取数据
  2. 将 JSON 反序列化为 Java 对象
  3. 带有 5 秒固定窗口的窗口事件
  4. 使用自定义CombineFn,合并每个窗口Event变成一个List<Event>
  5. 为了测试方便,直接输出结果List<Event>

管道代码:

    pipeline
                // Read from pubsub topic to create unbounded PCollection
                .apply(PubsubIO
                    .<String>read()
                    .topic(options.getTopic())
                    .withCoder(StringUtf8Coder.of())
                )

                // Deserialize JSON into Event object
                .apply("ParseEvent", ParDo
                    .of(new ParseEventFn())
                )

                // Window events with a fixed window size of 5 seconds
                .apply("Window", Window
                    .<Event>into(FixedWindows
                        .of(Duration.standardSeconds(5))
                    )
                )

                // Group events by window
                .apply("CombineEvents", Combine
                    .globally(new CombineEventsFn())
                    .withoutDefaults()
                )

                // Log grouped events
                .apply("LogEvent", ParDo
                    .of(new LogEventFn())
                );

我们看到的结果是最后一步永远不会运行,因为我们没有得到任何日志记录。

另外,我们还添加了System.out.println("***")在我们自定义的每个方法中CombineFn类,以便跟踪它们何时运行,但似乎它们也没有运行。

这里的窗口设置不正确吗?我们遵循了一个例子https://beam.apache.org/documentation/programming-guide/#windowing这看起来相当简单,但显然缺少一些基本的东西。

如有任何见解,我们将不胜感激 - 提前致谢!


看起来主要问题确实是缺少触发器 - 窗口正在打开,但没有任何东西告诉它何时发出结果。我们想简单地根据处理时间(而不是事件时间)来设置窗口,因此执行了以下操作:

.apply("Window", Window
    .<Event>into(new GlobalWindows())
    .triggering(Repeatedly
        .forever(AfterProcessingTime
            .pastFirstElementInPane()
            .plusDelayOf(Duration.standardSeconds(5))
        )
    )
    .withAllowedLateness(Duration.ZERO).discardingFiredPanes()
)

本质上,这会创建一个全局窗口,在处理第一个元素后 5 秒触发该窗口发出事件。每次关闭窗口时,一旦收到元素,就会打开另一个窗口。当我们没有时,Beam 抱怨道withAllowedLateness片段 - 据我所知,这只是告诉它忽略任何最新的数据。

我的理解可能有点离题,但是上面的代码片段已经解决了我们的问题!

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 Apache Beam 进行窗口化 - 修复了窗口似乎没有关闭的问题? 的相关文章

  • 多线程环境下如何更好的使用ExecutorService?

    我需要创建一个库 其中包含同步和异步方法 executeSynchronous 等待直到有结果 返回结果 executeAsynchronous 立即返回一个 Future 如果需要 可以在其他事情完成后进行处理 我的图书馆的核心逻辑 客户
  • 如何在Java 8中实现Elvis运算符?

    我有一个经典的 Elvis 运算符 案例 其中我调用每个可能返回 null 的方法并将它们链接在一起 thing nullableMethod1 a nullableMethod2 b nullableMethod3 在 Java 8 中
  • 如何测试调用父类的受保护(不需要的)方法的方法?

    我陷入了一个非常奇怪的情况 我有一些需要测试的特定代码 这里是 public class A The real method of real class is so big that I just don t want to test it
  • 使用除 SINGLE_TABLE 之外的任何其他 Hibernate 继承策略时 JVM 崩溃

    好吧 这可能不太可能 但还是这样吧 在Java JRE 1 6 0 26 b03 中我有两个类 SuperControl及其子类SubControl 它们都需要是持久对象 我正在使用 Hibernate Annotations 来实现这一点
  • @OneToMany 与 @JoinTable 错误

    我试图理解 OneToMany with JoinTable 对于这样的场景 我正在使用 JPA 2 1 Hibernate 5 0 4 和 Oracle 11 XE 当我打电话时userDao save user 下面的代码 我有 jav
  • 如何识别 Java 中的不可变对象

    在我的代码中 我正在创建一个对象集合 这些对象将由各种线程以只有在对象不可变的情况下才安全的方式访问 当尝试将新对象插入到我的集合中时 我想测试它是否是不可变的 如果不是 我将抛出异常 我能做的一件事是检查一些众所周知的不可变类型 priv
  • 在 TestNG 中运行多个类

    我正在尝试自动化一个场景 其中我想登录一次应用程序 然后进行操作而无需再次重新登录 考虑一下 我有在特定类的 BeforeSuite 方法中登录应用程序的代码 public class TestNGClass1 public static
  • 是否可以从另一个方法传递 args[] 来调用 main 方法?

    我试图从另一个传递参数的方法调用类的主要方法 就像从命令行运行该类时一样 有没有办法做到这一点 您可以致电main方法就像您调用任何其他 静态 方法一样 MyClass main new String arg1 arg2 arg3 Exam
  • 正确使用 JDBC 连接池 (Glassfish)

    我需要在 Java Web 服务中作为会话 bean 实现数据库连接 但我不确定我这样做是否正确 我创建了一个类 public final class SQLUtils private static DataSource m ds null
  • 如何在 Struts 2 OGNL 中将参数传递给方法调用

    我想使用属性作为对象方法的参数
  • 如何将自定义日志处理程序添加到 Google App Engine?

    我正在尝试向我的 java 应用程序添加自定义日志处理程序 我已经实现了一个扩展 java util Logging Handler 类的 InnerLogger 类 在我的logging properties中声明为处理程序 handle
  • 从关卡堆栈中获取相对比例的数学

    为这个可怕的标题道歉 我花了 10 分钟试图用一句话来解释这一点 但失败了 虽然提示这个问题的应用程序是用Java Android 编写的 但我认为它非常通用并且适用于任何语言 欢迎使用伪代码 或简单的英语 回复 我不确定是否应该标记所有通
  • 如何制作无限的jscrollpane?

    我之前已经实现过拖动滚动 但是创建无限滚动窗格的最佳方法是什么 当然不会有任何滚动条 我将实现拖动滚动 我想做的是在无限表面上实现动态加载 EDIT 当然 它实际上不会是无限的 我想问如何伪造它 您可以执行以下操作 AdjustmentCl
  • Java元数据读写

    是否可以以通用方式 对于所有图像类型 在 Java 中读取和写入元数据 我找到了一些示例 但它们总是特定的 例如 JPEG 或 PNG 我需要一些足够通用的东西 而不是到处都有 if else 语句 我不想重写源代码 但这是一个很好的例子
  • 在 eclipse 之外将 Spring MVC 应用程序部署到 tomcat 的幕后会发生什么?

    我猜想使用像 eclipse 这样很棒的 IDE 的一个缺点是你会忽略应用程序幕后发生的事情 我是一名 Ruby 开发人员 所以不是一名 Java 老手 所以我一直在用 java 编写一个项目 并使用 spring 框架进行 IOC 和 M
  • 如何使用云打印打印Android活动显示

    我正在尝试将 Google 云打印实现到应用程序中 遵循集成指南 https developers google com cloud print docs android 我试图通过打印 google com 来保持基本 单击我创建的打印按
  • 战争库中的罐子爆炸

    我们可以将分解的 jar 文件放入 war web inf 库中吗 它在 JBOSS 4 2 中对我不起作用 我收到以下错误并且无法部署应用程序 Caused by javax management RuntimeOperationsExc
  • 在服务器内部调用 Web 服务

    我有一个网络服务 getEmployee 当传递 id 时 它会获取单个员工的员工详细信息 同一服务器上的另一个 Web 服务 getEmployeeList 当传递一个部门时 它会获取整个员工列表 这将获取部门的 ID 然后调用 getE
  • 从命令行运行 Maven 插件的语法是什么。

    我看到这里已经有人问过这个问题 如何从命令行执行maven插件 https stackoverflow com questions 12930656 how to execute maven plugin from command line
  • 如何从spark中的hbase表中获取所有数据

    我在 hbase 中有一个大表 名称为 UserAction 它具有三个列族 歌曲 专辑 歌手 我需要从 歌曲 列族中获取所有数据作为 JavaRDD 对象 我尝试了这段代码 但效率不高 有更好的解决方案来做到这一点吗 static Spa

随机推荐

  • 从 SharedPreferences 设置和获取 StringSet?

    我正在构建一个 Android 应用程序 我想在首选项中存储一组字符串 以便根据登录信息跟踪谁使用了该应用程序 我不想使用数据库 所以我知道我应该使用 SharedPreferences 来存储登录人员的列表 我希望能够重置此列表 以便将个
  • 使用中位数和分组依据以及谷歌表格进行查询

    我需要获得分组中位数 我已经对表单的数据进行了分组 From type Weight A person person 4 A person person 3 A person organization 11 A person person
  • 在 QToolTip 中使用图片或图像

    有没有办法在 QToolTip 中显示图片 图像 我想显示键盘按钮的小图像 以向用户解释他可以在该特定小部件上使用哪些按钮 快捷方式 您可以使用以下 html 代码轻松显示图像 QToolTip showText QCursor pos i
  • AngularJS 和谷歌云端点:需要演练

    我是 AngularJS 的新手 但我真的很喜欢 AngularJS 的工作方式 因此我想将其部署为我的 Google 云端点后端的客户端 然后我立即遇到两个问题 1 放在哪里我的回调 那么它能够在 ANGularJs 控制器中工作吗 2
  • 在字符串中包含常量而不连接

    PHP 中有没有一种方法可以在字符串中包含常量而无需连接 define MY CONSTANT 42 echo This is my constant MY CONSTANT No 对于字符串 PHP 无法区分字符串数据和常量标识符 这适用
  • 使用元素求幂加速嵌套 for 循环

    我正在编写一个大型代码 我发现自己需要加速其中的特定部分 我创建了一个MWE如下图所示 import numpy as np import time def random data N Generate some random data r
  • 如何以编程方式打开 Safari 扩展 ToolbarItem 弹出窗口

    我想以编程方式触发 Safari 扩展工具栏项目上的 单击 事件 以便在网页上发生某些情况后出现我的自定义弹出窗口 我正在使用新的 Xcode 扩展 IDE 并使用界面生成器构建了我的弹出窗口 目前 StackOverflow 上的所有答案
  • 使用 .bat 文件运行 php 脚本

    我需要每天晚上在我的服务器上运行一个 php 脚本 在 Linux 系统上我设置了一个 cron 作业 但我被困在 Windows 系统上 我知道我必须使用 Windows 任务计划程序设置一个任务 并且该任务需要运行一个 bat 文件 该
  • 向 DataTable 添加多行

    我知道有两种方法将带有数据的新行添加到DataTable string arr2 one two three dtDeptDtl Columns Add Dept Cd for int a 0 a lt arr2 Length a Data
  • 关于 C 中的 ## 预处理器

    Given define cat x y x y 电话cat a 1 回报a1 but cat cat 1 2 3 未定义 但是如果我也定义 define xcat x y cat x y 那么结果是xcat xcat 1 2 3 就是现在
  • RequireJS:根据环境加载不同的文件

    是否有根据当前项目环境 例如开发或生产 加载不同文件的功能 我的意思是 它可以帮助我透明地加载缩小或完整的文件 我读到有关多版本加载的内容 但多版本意味着我需要指定文件的版本 例如 我的模块中有 module js 文件 在这个文件中我需要
  • CSS 媒体查询 - 顺序很重要吗?

    现在我经常使用 CSS 媒体查询 我想知道最好按什么顺序使用它们 Method 1 media only screen and min width 800px content sidebar media only screen and ma
  • 获取文件的 QuickLook 预览图像

    有什么方法可以快速查看文件的预览图像吗 我正在寻找这样的东西 NSImage image QuickLookPreviewer quickLookPreviewForFile path See QLThumbnailRequest在文档中
  • Flutter 中的水平步进器

    我想创建一个水平步进器 我知道这很容易 但是这一次 步数应该很大 举个例子 这就是我在垂直领域所做的事情 import package flutter material dart void main gt runApp new MyApp
  • 在 C++ 中如何实现多个 COM 接口?

    我试图理解这个示例代码关于浏览器帮助程序对象 在内部 作者实现了一个公开多个接口 IObjectWithSite IDispatch 的类 他的 QueryInterface 函数执行以下操作 if riid IID IUnknown pp
  • 如何验证 jar 内 MANIFEST.MF 的顺序?

    我遇到了一个有趣的问题 这对我来说绝对是新的 正如我突然发现的 Jar 规范说 被包含在内 META INF and MANIFEST MF必须是第一个和第二个条目 jar包而不仅仅是存档中的目录和文件 我正在使用 Java 框架 非常注意
  • 如何在 C++/CLI 中使用 boost::bind 绑定托管类的成员

    我在本机 C 类中使用 boost signal 现在我正在 C CLI 中编写 NET 包装器 以便可以将本机 C 回调公开为 NET 事件 当我尝试使用 boost bind 获取托管类的成员函数的地址时 出现编译器错误 3374 指出
  • Python CSV 到 SQLite

    我正在 转换 一个大的 1 6GB CSV 文件并将CSV 的特定字段插入到SQLite 数据库中 基本上我的代码如下所示 import csv sqlite3 conn sqlite3 connect path to file db co
  • 使用 Apache POI 将部分单元格内容设置为下划线?

    我正在开发一个程序 其中我必须在 Excel 电子表格中设置单元格值 例如 这是一下划线 text 它可以是任何粗体 斜体或下划线 我正在使用 Apache POI 3 9 请尝试以下操作 public static void differ
  • 使用 Apache Beam 进行窗口化 - 修复了窗口似乎没有关闭的问题?

    我们正在尝试在 Apache Beam 管道上使用固定窗口 使用DirectRunner 我们的流程如下 从发布 订阅中提取数据 将 JSON 反序列化为 Java 对象 带有 5 秒固定窗口的窗口事件 使用自定义CombineFn 合并每