Flink 窗口

2023-11-04

介绍:流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而 window 是一种切割无限数据为有限块进行处理的手段,其分为两种类型:1、时间窗口,2:计数窗口

一、时间窗口

时间窗口根据窗口实现原理的不同分成三类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)和会话窗口(Session Window)

1.1、滚动窗口(Tumbling Windows)

介绍:将数据依据固定的窗口长度(时间)对数据进行切片
特点:时间对齐,窗口长度固定,没有重叠

package com.xx.window;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/**
 * @author aqi
 * @since 2023/8/30 15:46
 */
@Slf4j
public class WindowReduceDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 接收socket数据(windows使用:nc -lp 7777,linux使用:nc -lk 7777进行数据推送)
        SingleOutputStreamOperator<Demo> sensorDS = env
                .socketTextStream("127.0.0.1", 7777)
                .map(new DemoMapFunction());

        // 滚动窗口(固定窗口长度为:10秒,每隔10s统计一次)
        WindowedStream<Demo, String, TimeWindow> sensorWS = sensorDS
                .keyBy(Demo::getId)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)));

        // 聚合(也可以使用别的算子进行聚合)
        SingleOutputStreamOperator<Demo> reduce = sensorWS.reduce(
                (value1, value2) -> new Demo(value1.getId(), value1.getValue() + value2.getValue())
        );
        // 打印计算结果
        reduce.print();
        // 触发计算
        env.execute();
    }
}

@Data
@AllArgsConstructor
@NoArgsConstructor
class Demo {

    private String id;

    private Long value;
}

class DemoMapFunction implements MapFunction<String, Demo> {

    @Override
    public Demo map(String value) {
        String[] datas = value.split(",");
        return new Demo(datas[0], Long.valueOf(datas[1]));
    }
}


1.2、滑动窗口(Sliding Windows)

介绍:滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成
特点:时间对齐,窗口长度固定,有重叠

package com.xx.window;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/**
 * @author aqi
 * @since 2023/8/30 15:46
 */
@Slf4j
public class WindowReduceDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 接收socket数据(windows使用:nc -lp 7777,linux使用:nc -lk 7777进行数据推送)
        SingleOutputStreamOperator<Demo> sensorDS = env
                .socketTextStream("127.0.0.1", 7777)
                .map(new DemoMapFunction());

        // 滚动窗口(固定窗口长度为:10秒,每隔10s统计一次)
//        WindowedStream<Demo, String, TimeWindow> sensorWS = sensorDS
//                .keyBy(Demo::getId)
//                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)));

        // 滑动窗口(每5秒钟统计一次,过去的10秒钟内的数据)
        WindowedStream<Demo, String, TimeWindow> sensorWS = sensorDS
                .keyBy(Demo::getId)
                .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(2)));

        // 聚合(也可以使用别的算子进行聚合)
        SingleOutputStreamOperator<Demo> reduce = sensorWS.reduce(
                (value1, value2) -> new Demo(value1.getId(), value1.getValue() + value2.getValue())
        );
        // 打印计算结果
        reduce.print();
        // 触发计算
        env.execute();
    }
}

@Data
@AllArgsConstructor
@NoArgsConstructor
class Demo {

    private String id;

    private Long value;
}

class DemoMapFunction implements MapFunction<String, Demo> {

    @Override
    public Demo map(String value) {
        String[] datas = value.split(",");
        return new Demo(datas[0], Long.valueOf(datas[1]));
    }
}


1.3、会话窗口(Session Windows)

介绍:由一系列事件组合一个指定时间长度的 timeout 间隙组成,也就是一段时间没有接收到新数据就会生成新的窗口
特点:时间无对齐

package com.xx.window;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/**
 * @author aqi
 * @since 2023/8/30 15:46
 */
@Slf4j
public class WindowReduceDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 接收socket数据(windows使用:nc -lp 7777,linux使用:nc -lk 7777进行数据推送)
        SingleOutputStreamOperator<Demo> sensorDS = env
                .socketTextStream("127.0.0.1", 7777)
                .map(new DemoMapFunction());

        // 滚动窗口(固定窗口长度为:10秒,每隔10s统计一次)
//        WindowedStream<Demo, String, TimeWindow> sensorWS = sensorDS
//                .keyBy(Demo::getId)
//                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)));


        // 滑动窗口(每5秒钟统计一次,过去的10秒钟内的数据)
//        WindowedStream<Demo, String, TimeWindow> sensorWS = sensorDS
//                .keyBy(Demo::getId)
//                .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(2)));


        // 会话窗口(超时间隔5s)
        WindowedStream<Demo, String, TimeWindow> sensorWS = sensorDS
                .keyBy(Demo::getId)
                .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)));


        // 聚合(也可以使用别的算子进行聚合)
        SingleOutputStreamOperator<Demo> reduce = sensorWS.reduce(
                (value1, value2) -> new Demo(value1.getId(), value1.getValue() + value2.getValue())
        );
        // 打印计算结果
        reduce.print();
        // 触发计算
        env.execute();
    }
}

@Data
@AllArgsConstructor
@NoArgsConstructor
class Demo {

    private String id;

    private Long value;
}

class DemoMapFunction implements MapFunction<String, Demo> {

    @Override
    public Demo map(String value) {
        String[] datas = value.split(",");
        return new Demo(datas[0], Long.valueOf(datas[1]));
    }
}


1.4、总结

滚动窗口:TumblingProcessingTimeWindows.of(Time.seconds(10))
滑动窗口:SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(2))
会话窗口:ProcessingTimeSessionWindows.withGap(Time.seconds(5))

二、计数窗口

和时间窗口类似,同样也分为三种,使用方法也基本相同

1.1、滚动窗口(Tumbling Windows)

窗口长度=5个元素

sensorKs.countWindow(5);

1.2、滑动窗口(Sliding Windows)

窗口长度=5个元素,滑动步长=2个元素

sensorKs.countWindow(5, 2);

1.3、会话窗口(Session Windows)

三、窗口触发方式

3.1、增量聚合

来一条数据,计算一条数据,窗口触发的时候输出计算结果
函数:reduce、aggregate等,除了process都是增量函数

3.2、全窗口函数

数据来了不计算,存储起来,窗口触发的时候,计算并输出结果,并且可以获取到窗口信息、上下文信息等,灵活性非常的强
函数:process

package com.xx.window;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * @author aqi
 * @since 2023/8/30 15:46
 */
@Slf4j
public class WindowReduceDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 接收socket数据(windows使用:nc -lp 7777,linux使用:nc -lk 7777进行数据推送)
        SingleOutputStreamOperator<Demo> sensorDS = env
                .socketTextStream("127.0.0.1", 7777)
                .map(new DemoMapFunction());

        // 滚动窗口(固定窗口长度为:10秒,每隔10s统计一次)
        WindowedStream<Demo, String, TimeWindow> sensorWS = sensorDS
                .keyBy(Demo::getId)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)));

        SingleOutputStreamOperator<String> process = sensorWS.process(new ProcessWindowFunction<Demo, String, String, TimeWindow>() {

            /**
             * 全窗口函数的计算逻辑,窗口触发时才会调用一次,统一计算窗口的所有数据
             * @param s 分组的key
             * @param context 上下文
             * @param elements 存的数据
             * @param out 采集器
             */
            @Override
            public void process(String s, ProcessWindowFunction<Demo, String, String, TimeWindow>.Context context, Iterable<Demo> elements, Collector<String> out) {
                long start = context.window().getStart();
                long end = context.window().getEnd();

                String startWindow = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss");
                String endWindow = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss");

                long count = elements.spliterator().estimateSize();

                out.collect("key=" + s + "的窗口[" + startWindow + "," + endWindow + "]包含:" + count + "条数据===>" + elements);
            }
        });

        // 打印计算结果
        process.print();
        // 触发计算
        env.execute();
    }
}

@Data
@AllArgsConstructor
@NoArgsConstructor
class Demo {

    private String id;

    private Long value;
}

class DemoMapFunction implements MapFunction<String, Demo> {

    @Override
    public Demo map(String value) {
        String[] datas = value.split(",");
        return new Demo(datas[0], Long.valueOf(datas[1]));
    }
}


3.3、增量函数和全窗口函数组合使用

package com.xx.window;

import com.xx.entity.WaterSensor;
import com.xx.functions.WaterSensorMapFunction;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * @author aqi
 * @since 2023/8/30 15:46
 */
public class WindowAggregateAndProcessDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("127.0.0.1", 7777)
                .map(new WaterSensorMapFunction());


        KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(WaterSensor::getId);

        WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));

        SingleOutputStreamOperator<String> result = sensorWS.aggregate(
                // 第一个参数:输入数据的类型,第二个参数:累加器的类型,存储的中间计算结果的类型,第三个参数:输出的类型
                new AggregateFunction<WaterSensor, Integer, String>() {
                    @Override
                    public Integer createAccumulator() {
                        System.out.println("初始化累加器");
                        return null;
                    }

                    @Override
                    public Integer add(WaterSensor value, Integer accumulator) {
                        if (accumulator == null) {
                            accumulator = 0;
                        }
                        Integer add = value.getVc() + accumulator;
                        System.out.println("调用add方法,累加结果:" + add);
                        return add;
                    }

                    @Override
                    public String getResult(Integer accumulator) {
                        System.out.println("获取最终结果");
                        return accumulator.toString();
                    }

                    @Override
                    public Integer merge(Integer a, Integer b) {
                        System.out.println("调用merge方法");
                        return null;
                    }
                }, new ProcessWindowFunction<String, String, String, TimeWindow>() {
                    @Override
                    public void process(String s, ProcessWindowFunction<String, String, String, TimeWindow>.Context context, Iterable<String> elements, Collector<String> out) throws Exception {
                        long start = context.window().getStart();
                        long end = context.window().getEnd();

                        String startWindow = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss");
                        String endWindow = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss");

                        long count = elements.spliterator().estimateSize();

                        out.collect("key=" + s + "的窗口[" + startWindow + "," + endWindow + "]包含:" + count + "条数据===>" + elements);
                    }
                });

        result.print();
        env.execute();
    }
}

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flink 窗口 的相关文章

随机推荐

  • vue+axios+vite实现跨域请求

    使用vue axios vite实现跨域请求 1 route js import createRouter createWebHistory from vue router const routerHistory createWebHist
  • VUEJS入坑日记.2 -DatePicker设置默认日期

    iview中DatePicker 的value属性和v modal不能同时使用
  • 从输入网址到显示网页的全过程分析

    作为一个软件开发者 你一定会对网络应用如何工作有一个完整的层次化的认知 同样这里也包括这些应用所用到的技术 像浏览器 HTTP HTML 网络服务器 需求处理等等 本文将更深入的研究当你输入一个网址的时候 后台到底发生了一件件什么样的事 1
  • python怎样让条形图x轴的字不要重叠

    在 matplotlib 中绘制条形图时 可以使用 rotation 参数来控制 x 轴标签的旋转角度 例如 如果你希望将 x 轴标签旋转 45 度 可以这样写 import matplotlib pyplot as plt 绘制条形图的代
  • 虚拟机类加载机制

    虚拟机类加载机制 深入理解Java虚拟机 第2版 类加载的时机 类从被加载到虚拟机内存中开始 到卸载出内存为止 它的整个生命周期包括 加载 验证 准备 解析 初始化 使用和卸载7个阶段 其中验证 准备 解析3个部分统称为连接 加载 验证 准
  • PAN解读 —— Efficient and Accurate Arbitrary-Shaped Text Detection with Pixel Aggregation Network

    文章目录 简述 网络结构 Backbone Segmentation Head FPEM FFM Output and Pixel Aggregation PA 损失函数 Aggregation Loss Discrimination Lo
  • CocoaPods问题排查步骤

    CocoaPods问题排查步骤 1 ruby源的问题排查 1 查看ruby的源 gem sources l 2 添加ruby的源 如果不是 https gems ruby china com 的源的话 添加这个源 gem sources a
  • 10分钟手把手教你运用Python实现简单的人脸识别

    今天 我们用Python实现高大上的人脸识别技术 Python里 简单的人脸识别有很多种方法可以实现 依赖于python胶水语言的特性 我们通过调用包可以快速准确的达成这一目的 这里介绍的是准确性比较高的一种 01 首先 梳理一下实现人脸识
  • uniApp入门(一)

    目录 一 项目准备 1 1 创建项目 1 2 创建页面 1 3 运行项目 1 4 pages json文件的页面配置与全局配置 1 5 rpx单位 二 内置组件 2 1 基础内容 2 2 视图容器 2 2 1 scrollView 2 2
  • 面向对象基础案例(2)

    个人简介 作者简介 大家好 我是W chuanqi 一个编程爱好者 个人主页 W chaunqi 支持我 点赞 收藏 留言 愿你我共勉 没有什么比勇气更温文尔雅 没有什么比怯懦更冷酷无情 文章目录 面型对象基础案例 2 1 打印水果类价格
  • shell中怎么比较两个字符串的大小?

    shell中 有两个字符串 2004 05 23 和 2005 03 01 怎么来比较他们的大小呢 方法一 date d echo 2005 03 01 tr s date d echo 2004 05 23 tr s 执行结果分别为 11
  • 离线OCR中英文图片识别

    webOcr WebOcr 基于Google Tessract4机器学习构建中英文离线Ocr项目 在其基础上提供了http调用的接口 便于你在其他的项目中调用 并且提供了Docker 便于部署 特性 中文识别 快速高识别率 模型训练 通过j
  • Git 讲解及常用的操作

    这个博主的Git常用操作 以及Git的讲解 让我这个初学者收获良多 因此转载记录 https blog csdn net pzm1993 article details 79980258 前言 目前来说 版本控制主要分为 集中式版本控制 C
  • matlab中散点图的线性拟合_matlab画散点图并拟合函数曲线MATLAB画散点图和二次函数...

    matlab画散点图并拟合函数曲线 MATLAB画散点图和二次函数 www zhiqu org 时间 2020 12 07 画法如下 在输入栏分别输入x y matlab的开始菜单start gt toolboxes gt cirve fi
  • jacob 插入水印方法整理

    Dispatch activeWindow this word getProperty ActiveWindow toDispatch 取得活动窗格对象 Dispatch activePan Dispatch get activeWindo
  • 1262、可被三整除的最大和(扩展、可被k整除的最大和)

    LeetCode 1262 可被三整除的最大和 给你一个整数数组 nums 请你找出并返回能被三整除的元素最大和 示例 1 输入 nums 3 6 5 1 8 输出 18 解释 选出数字 3 6 1 和 8 它们的和是 18 可被 3 整除
  • mybatis-plus注解实现数据的批量操作

    1 批量删除 Delete Integer deletePrinterBindAll String pri
  • vue怎么在style中使用data中定义的变量

    需求 动态修改三方组件的样式 思路 项目开发中使用的某某某三方ui组件 所以想要修改这个组件的样式只能通过css进行修改 那么想要动态修改 就要在style中使用data里的变量 实现
  • python伪造请求头x-forwarded-for的作用_python3-requests伪造x-forwarded-for以及解决

    通常 我们的服务器会有一级或者多级的反向代理 因此我们代码中拿remote addr会取到最后一级反向代理的ip 为了拿到真实ip 常常会去x Forward For中拿用户最后一级代理Ip 但是由于该值是header中的 直接去取可能取到
  • Flink 窗口

    介绍 流式计算是一种被设计用于处理无限数据集的数据处理引擎 而无限数据集是指一种不断增长的本质上无限的数据集 而 window 是一种切割无限数据为有限块进行处理的手段 其分为两种类型 1 时间窗口 2 计数窗口 一 时间窗口 时间窗口根据