Flink:是否有另一种方法来计算平均值和状态变量而不是使用 RichAggregateFunction?

2023-12-26

我不确定必须使用哪个流 Flink 转换来计算某个流的平均值并在 5 秒的窗口内更新状态(假设它是我的状态的整数数组)。 如果我使用RichFlatMapFunction我可以计算平均值并更新我的数组状态。但是,我必须打电话

streamSource
    .keyBy(0)
    .flatMap(new MyRichFlatMapFunction())
    .print()

我不能把它写在窗户上。 如果我使用

streamSource
    .keyBy(0)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .aggregate(new MyAggregateFunction())
    .print()

我无法保持数组状态ValueState.

我试图使用RichAggregateFunction我也遇到了这个线程的同样问题。使用 RichAggregateFunction 时出现 Flink 错误 https://stackoverflow.com/questions/47437207/flink-error-on-using-richaggregatefunction是否有另一种方法来计算平均值并跟踪 Flink 中的另一个状态?

在 Flink 中我该如何解决这个问题? 这是我试图做但实际上不起作用的方法>https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MultiSensorMultiStationsReadingMqtt2.java#L70 https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MultiSensorMultiStationsReadingMqtt2.java#L70

streamStations.filter(new SensorFilter("COUNT_TR"))
                .map(new TrainStationMapper())
                .keyBy(new MyKeySelector())
                .window(TumblingEventTimeWindows.of(Time.seconds(5)));
                // THIS AGGREGATE DOES NOT WORK
                // .aggregate(new AverageRichAggregator())
                // .print();

    public static class AverageRichAggregator extends
            RichAggregateFunction<Tuple3<Integer, Tuple5<Integer, String, Integer, String, Integer>, Double>, Tuple3<Double, Long, Integer>, Tuple2<String, Double>> {

        private static final long serialVersionUID = -40874489412082797L;
        private String functionName;
        private ValueState<CountMinSketch> countMinSketchState;

        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor<CountMinSketch> descriptor = new ValueStateDescriptor<>("countMinSketchState",
                    CountMinSketch.class);
            this.countMinSketchState = getRuntimeContext().getState(descriptor);
        }

        @Override
        public Tuple3<Double, Long, Integer> createAccumulator() {
            this.countMinSketchState.clear();
            return new Tuple3<>(0.0, 0L, 0);
        }

        @Override
        public Tuple3<Double, Long, Integer> add(
                Tuple3<Integer, Tuple5<Integer, String, Integer, String, Integer>, Double> value,
                Tuple3<Double, Long, Integer> accumulator) {
            try {
                if (value.f1.f1.equals("COUNT_PE")) {
                    // int count = (int) Math.round(value.f2);
                    // countMinSketch.updateSketchAsync("COUNT_PE");
                } else if (value.f1.f1.equals("COUNT_TI")) {
                    // int count = (int) Math.round(value.f2);
                    // countMinSketch.updateSketchAsync("COUNT_TI");
                } else if (value.f1.f1.equals("COUNT_TR")) {
                    // int count = (int) Math.round(value.f2);
                    // countMinSketch.updateSketchAsync("COUNT_TR");
                }
                CountMinSketch currentCountMinSketchState = this.countMinSketchState.value();
                currentCountMinSketchState.updateSketchAsync(value.f1.f1);
                this.countMinSketchState.update(currentCountMinSketchState);

            } catch (IOException e) {
                e.printStackTrace();
            }

            return new Tuple3<>(accumulator.f0 + value.f2, accumulator.f1 + 1L, value.f1.f4);
        }

        @Override
        public Tuple2<String, Double> getResult(Tuple3<Double, Long, Integer> accumulator) {
            String label = "";
            int frequency = 0;
            try {
                if (functionName.equals("COUNT_PE")) {
                    label = "PEOPLE average on train station";
                    // frequency = countMinSketch.getFrequencyFromSketch("COUNT_PE");

                } else if (functionName.equals("COUNT_TI")) {
                    label = "TICKETS average on train station";
                    // frequency = countMinSketch.getFrequencyFromSketch("COUNT_TI");

                } else if (functionName.equals("COUNT_TR")) {
                    label = "TRAIN average on train station";
                    // frequency = countMinSketch.getFrequencyFromSketch("COUNT_TR");
                }
                frequency = this.countMinSketchState.value().getFrequencyFromSketch(functionName);

            } catch (IOException e) {
                e.printStackTrace();
            }

            return new Tuple2<>(label + "[" + accumulator.f2 + "] reads[" + frequency + "]",
                    ((double) accumulator.f0) / accumulator.f1);
        }

        @Override
        public Tuple3<Double, Long, Integer> merge(Tuple3<Double, Long, Integer> a, Tuple3<Double, Long, Integer> b) {
            return new Tuple3<>(a.f0 + b.f0, a.f1 + b.f1, a.f2);
        }
    }

error:

Exception in thread "main" java.lang.UnsupportedOperationException: This aggregation function cannot be a RichFunction.
    at org.apache.flink.streaming.api.datastream.WindowedStream.aggregate(WindowedStream.java:692)
    at org.sense.flink.examples.stream.MultiSensorMultiStationsReadingMqtt2.<init>(MultiSensorMultiStationsReadingMqtt2.java:71)
    at org.sense.flink.App.main(App.java:141)

Thanks


不允许聚合器保留任意状态,以防聚合器可能与合并窗口一起使用 - 因为 Flink 不知道如何合并您的临时状态。

但是您可以将 AggregateFunction 与 ProcessWindowFunction 组合起来,如下所示:

input
 .keyBy(<key selector>)
 .timeWindow(<duration>)
 .aggregate(new MyAggregateFunction(), new MyProcessWindowFunction());

ProcessWindowFunction 的 process 方法将传递一个仅包含预聚合结果的迭代器,以及一个提供对全局和每个窗口状态的访问的上下文 https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/windows.html#using-per-window-state-in-processwindowfunction。希望这能以简单的方式提供您所需的内容。但是,如果您需要使用每个到达的记录更新自己的状态,那么您需要扩展聚合器管理的类型以适应这种情况。

以下是如何使用全局状态的粗略概述:

private static class MyWindowFunction extends ProcessWindowFunction<IN, OUT, KEY, TimeWindow> {
    private final static ValueStateDescriptor<Long> myGlobalState =
      new ValueStateDescriptor<>("stuff", LongSerializer.INSTANCE);

    @Override
    public void process(KEY key, Context context, Iterable<IN> values,  Collector<OUT> out) {
        ValueState<Long> goodStuff = context.globalState().getState(myGlobalState);
    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flink:是否有另一种方法来计算平均值和状态变量而不是使用 RichAggregateFunction? 的相关文章

  • Java中反射是如何实现的?

    Java 7 语言规范很早就指出 本规范没有详细描述反射 我只是想知道 反射在Java中是如何实现的 我不是问它是如何使用的 我知道可能没有我正在寻找的具体答案 但任何信息将不胜感激 我在 Stackoverflow 上发现了这个 关于 C
  • Java EE:如何获取我的应用程序的 URL?

    在 Java EE 中 如何动态检索应用程序的完整 URL 例如 如果 URL 是 localhost 8080 myapplication 我想要一个可以简单地将其作为字符串或其他形式返回给我的方法 我正在运行 GlassFish 作为应
  • 在 java 类和 android 活动之间传输时音频不清晰

    我有一个android活动 它连接到一个java类并以套接字的形式向它发送数据包 该类接收声音数据包并将它们扔到 PC 扬声器 该代码运行良好 但在 PC 扬声器中播放声音时会出现持续的抖动 中断 安卓活动 public class Sen
  • 给定两个 SSH2 密钥,我如何检查它们是否属于 Java 中的同一密钥对?

    我正在尝试找到一种方法来验证两个 SSH2 密钥 一个私有密钥和一个公共密钥 是否属于同一密钥对 我用过JSch http www jcraft com jsch 用于加载和解析私钥 更新 可以显示如何从私钥 SSH2 RSA 重新生成公钥
  • 在 HTTPResponse Android 中跟踪重定向

    我需要遵循 HTTPost 给我的重定向 当我发出 HTTP post 并尝试读取响应时 我得到重定向页面 html 我怎样才能解决这个问题 代码 public void parseDoc final HttpParams params n
  • Final字段的线程安全

    假设我有一个 JavaBeanUser这是从另一个线程更新的 如下所示 public class A private final User user public A User user this user user public void
  • Android:捕获的图像未显示在图库中(媒体扫描仪意图不起作用)

    我遇到以下问题 我正在开发一个应用程序 用户可以在其中拍照 附加到帖子中 并将图片保存到外部存储中 我希望这张照片也显示在图片库中 并且我正在使用媒体扫描仪意图 但它似乎不起作用 我在编写代码时遵循官方的Android开发人员指南 所以我不
  • 无法展开 RemoteViews - 错误通知

    最近 我收到越来越多的用户收到 RemoteServiceException 错误的报告 我每次给出的堆栈跟踪如下 android app RemoteServiceException Bad notification posted fro
  • Android MediaExtractor seek() 对 MP3 音频文件的准确性

    我在使用 Android 时无法在eek 上获得合理的准确度MediaExtractor 对于某些文件 例如this one http www archive org download emma solo librivox emma 01
  • JavaMail 只获取新邮件

    我想知道是否有一种方法可以在javamail中只获取新消息 例如 在初始加载时 获取收件箱中的所有消息并存储它们 然后 每当应用程序再次加载时 仅获取新消息 而不是再次重新加载它们 javamail 可以做到这一点吗 它是如何工作的 一些背
  • 磁模拟

    假设我在 n m 像素的 2D 表面上有 p 个节点 我希望这些节点相互吸引 使得它们相距越远吸引力就越强 但是 如果两个节点之间的距离 比如 d A B 小于某个阈值 比如 k 那么它们就会开始排斥 谁能让我开始编写一些关于如何随时间更新
  • JRE 系统库 [WebSphere v6.1 JRE](未绑定)

    将项目导入 Eclipse 后 我的构建路径中出现以下错误 JRE System Library WebSphere v6 1 JRE unbound 谁知道怎么修它 右键单击项目 特性 gt Java 构建路径 gt 图书馆 gt JRE
  • 总是使用 Final?

    我读过 将某些东西做成最终的 然后在循环中使用它会带来更好的性能 但这对一切都有好处吗 我有很多地方没有循环 但我将 Final 添加到局部变量中 它会使速度变慢还是仍然很好 还有一些地方我有一个全局变量final 例如android Pa
  • Java Integer CompareTo() - 为什么使用比较与减法?

    我发现java lang Integer实施compareTo方法如下 public int compareTo Integer anotherInteger int thisVal this value int anotherVal an
  • AWS 无法从 START_OBJECT 中反序列化 java.lang.String 实例

    我创建了一个 Lambda 函数 我想在 API 网关的帮助下通过 URL 访问它 我已经把一切都设置好了 我还创建了一个application jsonAPI Gateway 中的正文映射模板如下所示 input input params
  • 玩!框架:运行“h2-browser”可以运行,但网页不可用

    当我运行命令时activator h2 browser它会使用以下 url 打开浏览器 192 168 1 17 8082 但我得到 使用 Chrome 此网页无法使用 奇怪的是它以前确实有效 从那时起我唯一改变的是JAVA OPTS以启用
  • 有没有办法为Java的字符集名称添加别名

    我收到一个异常 埋藏在第 3 方库中 消息如下 java io UnsupportedEncodingException BIG 5 我认为发生这种情况是因为 Java 没有定义这个名称java nio charset Charset Ch
  • JGit 检查分支是否已签出

    我正在使用 JGit 开发一个项目 我设法删除了一个分支 但我还想检查该分支是否已签出 我发现了一个变量CheckoutCommand但它是私有的 private boolean isCheckoutIndex return startCo
  • 如何修复 JNLP 应用程序中的“缺少代码库、权限和应用程序名称清单属性”?

    随着最近的 Java 更新 许多人都遇到了缺少 Java Web Start 应用程序的问题Codebase Permissions and Application name体现属性 尽管有资源可以帮助您完成此任务 但我找不到任何资源综合的
  • 将 List 转换为 JSON

    Hi guys 有人可以帮助我 如何将我的 HQL 查询结果转换为带有对象列表的 JSON 并通过休息服务获取它 这是我的服务方法 它返回查询结果列表 Override public List

随机推荐

  • 使用 MPI 分散不同大小的矩阵块

    假设所有矩阵都按行优先顺序存储 说明该问题的一个示例是将 10x10 矩阵分布在 3x3 网格上 以便每个节点中的子矩阵的大小如下所示 3x3 3x3 3x4 3x3 3x3 3x4 4x3 4x3 4x4 我在 Stackoverflow
  • 改进单选按钮的使用以启用/禁用表单字段

    我有两个单选按钮和两个相应的表单字段 根据选择的单选按钮 一个表单字段将被禁用 而另一个表单字段将被启用 我的代码可以工作 但我认为它可以改进 现在我有两个独立的进程 检查页面加载时选择了哪个单选按钮并禁用相应的字段 另一个在页面加载后响应
  • 将下拉菜单与年份绑定

    我必须在 C 中绑定一个下拉框 其中包含从 2008 年到当前年份的年份 我怎样才能实现它 您可以使用以下命令构建整数序列System Linq Enumerable Range var startYear 2008 myDropDownL
  • Swift 3 - 如何提取正则表达式中捕获的组?

    我正在使用 Swift 3 并尝试访问捕获的组 let regexp ALREADY PAID NOT ALR PROVIDER MAY READY MAY BILL BILL YOU PAID n d d d check if some
  • 连接被拒绝 - connect(2) 用于“localhost”端口 25 Rails

    在培训期间 我正在开发一个网站 我们使用 Ruby on Rails 我们需要向用户发送邮件 所以我创建了一个邮件程序 我尝试过将 smtp 放在两者中development rb and environment rb config act
  • 如何在action类和jsp页面之间传递对象数据?

    我有一个名为 Code 的 Java 类 它具有与代码相关的所有值 例如codeId codeDescription等等及其 getter 和 setter 我正在成功检索一个操作类中的代码数据 我正在使用 struts 2 现在我想将这些
  • Go 中的 Unix FIFO?

    有没有办法用Go语言创建unix FIFO 没有Mkfifo nor Mknod in os包 尽管我预计命名的 FIFO 主要用于 posix 操作系统 事实上 有一个创建未命名的 FIFO 管道 的函数 但没有创建命名管道的函数 我是唯
  • 使用constraintEqualToAnchor()时设置自动布局约束后如何更改它们?

    我尝试使用以下命令设置具有自动布局约束的视图constraintEqualToAnchor override func viewDidLoad super viewDidLoad let myView UIView myView backg
  • 如果不存在则调用自由函数而不是方法

    假设您有一系列与类型无关的类 通过返回值的给定方法来实现共同概念 class A public int val const class B public int val const 假设您需要一个通用的自由函数T为未实现的任何类型返回常规值
  • 通过 ASPX 页面流式传输 PDF 数据

    我如何在我的网络服务器上流式传输 pdf 文件 并像谷歌文档一样在我自己的页面中显示它 该页面嵌入到页面中 None
  • swagger-codegen 客户端:如何在模型上包含 Jackson 注释

    我正在使用 swagger codegen 生成一个休息客户端 但遇到一个问题 我正在使用的服务返回一个具有继承的模型 API 模型如下所示 public class Person private List
  • Ansible 日期时间时区转换

    有没有办法在我的剧本中的 调试 语句中将ansible日期转换为不同的时区 我不想在剧本级别设置全球时区 我有这个 debug msg Y m d H M S strftime ansible date time epoch 这工作正常 但
  • 尝试的条件约束不是可索引操作

    我正在使用 DynamoDB 对于除 EQ 之外的所有 ComparisonOperators 的查询 API 它一直给出 尝试的条件约束不是可索引操作 错误 是什么原因 TableName My Table name IndexName
  • 使用 C# 程序为 Canon EOS Rebel XS 创建 USB 延时拍摄

    我想知道这是否可能 我想制作自己的软件来通过远程快门释放来控制快门的释放 从图中我看到它的电压为 3 3V 空载 阈值电压为 1 8V 我想知道如果我理解正确的话 我是否可以使用限制大约 5 25v 的 USB 电缆电压 这是否可能 或者我
  • R 根据前一行中的值删除行

    我是 R 新手 尝试根据前一行的值删除行 样本数据 Cust ID Date Value 500219 2016 04 11 12 00 00 0 500219 2016 04 12 16 00 00 0 500219 2016 04 14
  • /usr/bin/sudo 必须由 uid 0 拥有并设置了 setuid 位版本 .ubantu14.04 LTS

    当我为 npm 设置 EACCESS 并在终端中运行 chown 命令以更改所有者权限时 但现在我陷入了困境 sudo usr bin sudo 必须由 uid 0 拥有并设置了 setuid 位 我的版本是 ubuntu14 04 LTS
  • 将查询的所有结果放入 Prolog 中的列表中

    我想知道如何创建一个谓词 将从某个查询中获得的所有结果 因此我得到一个结果并按分号 直到得到 False 放入列表中 例如 如果我写foo X 1 2 3 在一些 Prolog 监听器中 假设结果是 X 11 X 22 False 我想将所
  • 覆盖css文件中的定义

    我有一个 css 文件 它定义了所有样式 tags 像这样 p 我怎样才能写一个在包含具有默认样式的样式表的页面中 没有简单的方法可以做到这一点 不过 有一些常见的技巧可以模拟这种行为 最好使用的方法取决于被覆盖区域的复杂程度以及您想要执行
  • 适用于 GAE 的 Weasyprint Dockerfile

    我正在尝试在 gae 上安装 weasyprint 我知道我们可以通过将运行时从 python 更改为 app yaml 中的自定义来将其传递到 Dockerfile 中来安装外部库 我在为 weasyprint 库创建 Dockerfil
  • Flink:是否有另一种方法来计算平均值和状态变量而不是使用 RichAggregateFunction?

    我不确定必须使用哪个流 Flink 转换来计算某个流的平均值并在 5 秒的窗口内更新状态 假设它是我的状态的整数数组 如果我使用RichFlatMapFunction我可以计算平均值并更新我的数组状态 但是 我必须打电话 streamSou