Flink_03_Window(个人总结)

2023-11-09

    声明: 1. 本文为我的个人复习总结, 并那种从零基础开始普及知识 内容详细全面, 言辞官方的文章
              2. 由于是个人总结, 所以用最精简的话语来写文章
              3. 若有错误不当之处, 请指出
keyBy不仅是为了分组, 同时还是为了能把数据分布到不同分区进行并行计算

所以开窗前最好先.keyBy

如果没keyBy, 那么调用的就是windowAll

TimeWindow:

  1. 滚动窗口

    窗口大小 = 步长

    .timeWindow(Time.seconds(15))
        
    // 或是 在时间语义为事件时间时
    .window(TumblingEventTimeWindows.of(Time.seconds(15)))
    
  2. 滑动窗口

    窗口大小 ≥ 步长, 会有窗口重叠

    .timeWindow(Time.seconds(15),Time.seconds(5))
        
    // 或是 在时间语义为事件时间时
    .window(SlidingEventTimeWindows.of(Time.seconds(15),Time.seconds(5)))
    
  3. 会话窗口

    超过一段时间(session时间范围内)没有接收到新数据就会生成新的窗口

    // 或是 在时间语义为事件时间时  
    .window(EventTimeSessionWindows.withGap(Time.seconds(30)))
    

并不是以最小数据的到来时间作为窗口的起点, 而是由一个计算公式:
TimeWindow类中:

 public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
     // offset默认为0, windowSize%windowSize=0, 
     // 故等价为timetamp-timestamp%windowSize, 即以是windowSize的整数倍 & 最接近而且小于等于 最小数据的时间 作为窗口起点
	// offset一般是用来调时区的, 可由SlidingEventTimeWindows of(Time size, Time slide, Time offset)来设置
     return timestamp - (timestamp - offset + windowSize) % windowSize;
 }

CountWindow:

  1. 滚动窗口

    .countWindow(15)
    
  2. 滑动窗口

    .countWindow(15,5)
    

增量聚合函数:

每来一条数据, 就进行计算(提前计算, 预聚合)

  1. .reduce(new ReduceFunction( )…), 只需要实现reduce方法即可

  2. .aggregate(AggregateFunction<T, ACC, R> function), AggregateFunction麻烦些, 要自己实现好多方法

单独用的时候, 延时低, 但是计算次数太多伤性能

对于调用窗口函数进行聚合时, 最好先调用.aggregate 进行预聚合, 如:

  • .aggregate(AggregateFunction<IN, ACC, OUT>aggFunction,ProcessWindowFunction<IN, OUT, KEY, W> windowFunction)

  • .aggregate(AggregateFunction<IN, ACC, OUT>aggFunction,WindowFunction<IN, OUT, KEY, W>windowFunction)

    第一个参数的输出, 是第二个函数的输入

ProcessWindowFunction的 process方法中Iterable<IN> elements参数, 迭代器里只有一个元素

全窗口函数:

数据都到齐了后, 再进行计算

  1. .apply(new WindowFunction( ){ }) // 方法参数里有当前窗口
  2. .process.(new ProcessWindowFunction( ){ }) // 方法参数里有ctx上下文, 更全些

其他函数:

  1. .trigger( ) 触发器: 定义 window 什么时候关闭, 关闭后触发计算并输出结果

  2. .evitor( ) 移除器: 定义移除某些数据的逻辑

  3. .allowedLateness( ) 允许处理迟到的数据

  4. .sideOutputLateData( ) 将迟到的数据放入侧输出流

  5. .getSideOutput( ) 获取侧输出流

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flink_03_Window(个人总结) 的相关文章

随机推荐

  • 转骰子

    转骰子 题目描述 骰子是一个立方体 每个面一个数字 初始为左 1 右 2 前 3 观察者方向 后 4 上 5 下 6 用 123456 表示这个状态 放置到平面上 可以向左翻转 用 L表示向左翻转 1 次 可以向右翻转 用 R 表示 向右翻
  • 利用Python采集某站上的所有陪玩妹子信息!听说现在做陪玩的都非常漂亮?

    前言 我想应该很多男生都是会打游戏的吧 女生就手机游戏 王者 刺激战场这些应该多些 男生应该就是英雄联盟 穿越火线 绝地求生等等一系列的游戏吧 那么你知道陪玩是什么时候就冒出来的新职业吗 知道的话可以告诉我一下哦 今天咱们的主要目的还是玩爬
  • Docker容器网络更改

    1 添加自定义网络 默认是桥接模式 docker network create 网络名称 2 解除容器绑定的网络 docker network disconnect 网络名称 容器名称 3 为容器重新指定网络 docker network
  • 《数据结构、算法与应用C++语言描述》使用C++语言实现二维数组稀疏矩阵

    数据结构 算法与应用C 语言描述 使用C 语言实现二维数组稀疏矩阵 稀疏矩阵定义 一个mxn 的矩阵 如果大多数元素都是0 则称为稀疏矩阵 spare matrix 一个矩阵如果不是稀疏的 就称为稠密矩阵 dense matrix 在稀疏矩
  • 如何设计一个分布式系统去分析3亿条数据?

    V xin ruyuanhadeng获得600 页原创精品文章汇总PDF 目录 从一个新闻门户网站案例引入 推算一下你需要分析多少条数据 黄金搭档 分布式存储 分布式计算 这篇文章聊一个话题 什么是分布式计算系统 一 从一个新闻门户网站案例
  • ROS学习之利用xacro/URDF模型搭建及rviz和gazebo仿真

    建议好好研究一下P3DX中的代码 非常有借鉴意义 xacro非常重要的作用是利用类似宏的方式 利用参数化来快速搭建模型 A ROS Gazebo Pioneer 3DX model created by Rafael Berkvens mo
  • qu32调音台说明书_Qu-32 数字调音台

    技术参数 触摸屏 7英寸 800x480彩色触摸屏 推子 100mm 电动推子 32个麦克风 线路输入 平衡XLR 19dBu最大输入电平 TRS带10dB定值衰减 总谐波失真 噪声 均一增益0dB 0 0005 89dBu 20 20kH
  • STM32F103C8T6 驱动 oled 4针篇

    1 配置STM32编译环境 大部分买的STM32F103C8T6属于国产的 会导致程序烧录报错 如果出现问题参考另一篇STM32F103C8T6程序烧录方法 流浪法师解剖鱼的博客 CSDN博客 2 烧录程序 环境配置好后 编写程序烧录就可以
  • 把int变量赋值给char数组 C语言

    char p 10 int i for i 0 i lt 10 i p i char 0 i
  • ubuntu setup nvidia development environment on a single machine(not virtual machine)

    ubuntu 18 04 gtx1080ti setup nvidia driver using software update building in ubuntu refer to address1 i choose nvidia dr
  • 阿里巴巴为什么能抗住90秒100亿?看完这篇你就明白了!

    作者 huashiou链接 https segmentfault com a 1190000018626163 1 概述 本文以淘宝作为例子 介绍从一百个并发到千万级并发情况下服务端的架构的演进过程 同时列举出每个演进阶段会遇到的相关技术
  • librdkafka介绍文档

    ntroduction to librdkafka the Apache Kafka C C client library librdkafka 是一个C实现的高性能 Apache Kafka 客户端 为生产环境提供了一个可靠和高性能的客户
  • 算法题记录【华为od】货币换算单位

    题目描述 思路分析 就是直接处理 比较麻烦的的是将字符串处理成数组 我用的是正则匹配 代码解析 t input 4 100CNY101fen 100HKD102cents 100JPY103sen 100EUR104eurocents 10
  • rabbitmq(四)、消息丢失问题

    丢失消息的三种情况 生产者弄丢了数据 RabbitMQ 弄丢了数据 消费端弄丢了数据 一 生产者弄丢了数据 生产者将数据发送到 RabbitMQ 的时候 可能数据就在半路给搞丢了 因为网络问题啥的 都有可能 方法一 此时可以选择用 Rabb
  • linux cp无法创建一般文件夹,cp: 无法创建普通文件 : 文件已存在

    背景 碰到一个偶现的编译出错问题 如图 报错的信息是 cp 无法创建普通文件 xxx 文件已存在 排查原因 看了下 Makefile 这句非常简单 就是 cp xxx xxx 而已 本身没什么问题 那再结合上下文出现的打印 一个异常之处就是
  • Jupyter快捷键-查看并设置

    1 快捷键 Jupyter 笔记本有两种不同的键盘输入模式 编辑模式允许您将代码或文本输入到一个单元格中 并通过一个绿色边框的单元格来表示 命令模式将键盘与笔记本级命令绑定在一起 并通过一个灰框 左边距蓝色的单元格显示 命令行模式 按 Es
  • 《魔童降世》影评——从封神演义谈到宿命

    今日看完魔童降世中的哪吒 不得不称赞这次改编很精妙 也不得不说编剧的三观很正 哪吒在我们大多数人的心中可能是纯真无害的小孩子 生来便拥有法宝乾坤圈和混天绫 得遇名师太乙真人 修得神通三头六臂 坚持正义 帮助武王伐纣 灭石叽 最后肉身 莲花
  • ImportError: libopenblas.so.0: cannot open shared object file: No such file or directory

    安装OpenBLAS的步骤 1 下载最新的openblas git clone https github com xianyi OpenBLAS git 没有安装git 先安装git CentOS安装git yum install git
  • MyBatisPlus的@TableId注解来实现自增序列id自动插入的功能

    写法 TableId value 数据库主键字段 type IdType 六种类型之一 例如 TableId value user id type IdType AUTO 1 ASSIGN ID 雪花算法 如果不设置 type 值 默认则使
  • Flink_03_Window(个人总结)

    声明 1 本文为我的个人复习总结 并非那种从零基础开始普及知识 内容详细全面 言辞官方的文章 2 由于是个人总结 所以用最精简的话语来写文章 3 若有错误不当之处 请指出 keyBy不仅是为了分组 同时还是为了能把数据分布到不同分区进行并行