flink核心之watermarker

2023-05-16

背景介绍:

现在的社会,人们产生越来越多的数据,而数据对每个人人都产生了巨大的影响。比如你去银行贷款,那么必然银行要对你做信用评估,会涉及到大数据画像等数据分析。比如美团外卖,则需要分析足够多的数据,给买卖各方做推荐引导,并同时结合大量的订单数据给骑手做配送陆续引导。

所有的这一切都离不开大数据计算,而flink作为其中的经典代表,有着极其重要的位置。在成千上万的服务器上跑着flink应用。

今天本文抛出一个flink应用里面极其重要的因子-watermark。

我们看下实际的应用场景中的例子:我们知道在实际的生产环境上,应用是在不同的物理机器上面运行着的,所以数据的产生和真正的消费是通过网络传输在不同的机器上流流转着的。有网络传输就可能会有一些延迟,毕竟网络不可靠么。

那必然就有一个问题,比如event1 产生的时间是第1S,而event2 产生的时间是第2S,因为网络延迟的原因, event2 反而先于event1到来消费,如果我们都是按照实时消费的模式来处理数据,那么必然event1的数据就被忽略了,因为event1的数据没按时到来。所以这种实时窗口的方式实际在生成中会有一些问题。

那么如何解决这样的问题呢?所以就引出了本文的内容:实际会定义一个window,允许 window规格内的数据也是正常需要处理的数据。

一种非实时的事件产生图示


­­­下面开始我们的今天的主题。

--------------------------------------------------------------------------------------------------------------------------

Watermark的定义

Watermark就是为了解决上述问题由Apache Flink提出的一种机制,本质上也是一种时间戳,由Flink Source或者自定义的Watermark生成器按照一定的方式,通常是Punctuated或者Periodic两种方式生成的,接收方收到Watermark Event就会根据流入事件去调整自己管理的EventTime clock Apache Flink 框架保证Watermark不会重复且严格递增,接受方收到某个Watermark时候,就知道不会再有任何小于该Watermark的时间戳的event了,所以Watermark简单理解就是某种协议,约定在接受到watermark的时候,已经处理完所有小于该watermark的事件了。

处理流程图如上图

系统里面涉及的几种时间类型

我们上文中说了解决event 乱序的问题,乱序本质就是数据的产生和消费时间没有严格顺序,在flink里面的涉及到的时间,有如下三种:

  1. ProcessingTime
    是数据流入到具体某个算子(即将进入运算)时候相应的系统时间。ProcessingTime 有最好的性能和最低的延迟。但在分布式计算环境中相同数据流多次运行有可能产生不同的计算结果,具有不确定性。
  2. IngestionTime
    IngestionTime是数据进入Flink框架的时间,Source Operator中去设置。结果可预算,因为只在source源处产生,IngestionTime的时间戳比较稳定,同一数据在流经不同窗口时,该时间是固定的,而对于ProcessingTime同一数据在流经不同窗口算子会有不同的处理时间戳。

EventTime
EventTime是事件流经设备的时候产生的时间。在进入Flink框架之前EventTime通常就生成到记录中,后续不改变, EventTime也可以从记录中提取出来。在实际计算中,大多会使用EventTime来进行数据计算的依据。

三种时间示意图

开篇描述的问题和本篇要介绍的Watermark所涉及的时间类型均是指EventTime类型

Watermark的接口定义

Apache Flink官方文档中有如下定义:

  • Periodic Watermarks - AssignerWithPeriodicWatermarks

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

/**
 * Returns the current watermark. This method is periodically called by the
 * system to retrieve the current watermark. The method may return {@code null} to
 * indicate that no new Watermark is available.
 *
 * <p>The returned watermark will be emitted only if it is non-null and itsTimestamp
 * is larger than that of the previously emitted watermark (to preserve the contract of
 * ascending watermarks). If the current watermark is still
 * identical to the previous one, no progress in EventTime has happened since
 * the previous call to this method. If a null value is returned, or theTimestamp
 * of the returned watermark is smaller than that of the last emitted one, then no
 * new watermark will be generated.
 *
 * <p>The interval in which this method is called and Watermarks are generated
 * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
 *
 * @see org.Apache.flink.streaming.api.watermark.Watermark
 * @see ExecutionConfig#getAutoWatermarkInterval()
 *
 * @return {@code Null}, if no watermark should be emitted, or the next watermark to emit.
 */
 @Nullable
 Watermark getCurrentWatermark();

  • Punctuated Watermarks - AssignerWithPunctuatedWatermarks

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

public interface AssignerWithPunctuatedWatermarks<T> extendsTimestampAssigner<T> {

/**
 * Asks this implementation if it wants to emit a watermark. This method is called right after
 * the {@link #extractTimestamp(Object, long)} method.
 *
 * <p>The returned watermark will be emitted only if it is non-null and itsTimestamp
 * is larger than that of the previously emitted watermark (to preserve the contract of
 * ascending watermarks). If a null value is returned, or theTimestamp of the returned
 * watermark is smaller than that of the last emitted one, then no new watermark will
 * be generated.
 *
 * <p>For an example how to use this method, see the documentation of
 * {@link AssignerWithPunctuatedWatermarks this class}.
 *
 * @return {@code Null}, if no watermark should be emitted, or the next watermark to emit.
 */
 @Nullable
Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp);
}

AssignerWithPunctuatedWatermarks 继承了TimestampAssigner接口 -TimestampAssigner

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

public interfaceTimestampAssigner<T> extends Function {

/**
 * Assigns aTimestamp to an element, in milliseconds since the Epoch.
 *
 * <p>The method is passed the previously assignedTimestamp of the element.
 * That previousTimestamp may have been assigned from a previous assigner,
 * by ingestionTime. If the element did not carry aTimestamp before, this value is
 * {@code Long.MIN_VALUE}.
 *
 * @param element The element that theTimestamp is wil be assigned to.
 * @param previousElementTimestamp The previous internalTimestamp of the element,
 *                                 or a negative value, if noTimestamp has been assigned, yet.
 * @return The newTimestamp.
 */
long extractTimestamp(T element, long previousElementTimestamp);
}

英文可能看着吃力,总结了讲就是Watermark可以在事件中提取EventTime,进而定义一定的计算逻辑产生Watermark的时间戳。

Watermark的产生方式

主流的Flink生成watermark的方式有两种:

  • Punctuated – 严格递增,即每个数据流中的EventTime都会产生一个Watermark。

这就有个问题,会产生大量的watermark,对接收方造成TPS的压力,所以这种方式一般用在实时性要求很高的应用系统中。。

  • Periodic - 周期性的产生一个Watermark(比如多长时间或者多少条数据)。实际使用过程中需要考虑到时间以及数据量两个维度,解决时延大的问题。

怎么用就看你的场景对实时性如何要求。

Watermark再看开篇问题

Watermark定义和Flinkperiodic方式的watermark结合起来看,因为eventTime  产生之后不能再变,而Watermark可以根据实际的系统情况自定义或者干脆跟eventTime 一致,实际上Watermark的产生是在Flink的源节点或有Watermark生成器计算的节点上产生的,Flink内部对单流或多流的场景有统一的Watermark处理。

上面的提到的延迟问题的核心在于EventTime在延迟接受到后该如何处理的问题。要解决这个问题再看一下EventTime window是如何触发的? EventTime window 计算条件是当Window计算的Timer时间戳 小于等于 当前系统的Watermak的时间戳时候进行计算。

  • 当Watermark的时间戳等于Event中携带的EventTime时候,上面场景(Watermark=EventTime)的计算结果如下:
  • 处理数据延迟场景,则Watermark生成策略为 Watermark = EventTime -5s, 如下:

通过上面前后两种方式对比我们知道,能处理延迟的数据核心在于window计算的时候延迟触发了(eventTime – 5s 就是延迟触发)。这个方式也有问题因为延迟计算,所以整个结果也是延迟的,这就是为什么window计算要求实时性高的时候,watermark 不能周期性太大。

有可能数据流不止一条

实际的应用系统中,会从多个source 表去输入数据,然后经过一定的规则映射,去分组处理数据,处理的数据在汇总的到相同的输出节点。这里带来一个问题,不同的source 源(还记得eventTime 是在source处产生的吗?)就必然涉及多个不同的watermark,虽然每个watermark是单调严格递增的,但是多个watermark之间就没这个约束了。这时候如果你去设计flink,如何解决问题?

Flink解决问题这问题说起来也很简单:每一个边上只能有一个递增的Watermark按照这个逻辑去设计,一但有多个EventTime流分组汇总的时候,就挑选eventTime最小的那个waterMark作为统一的汇总的watermark给下游,这样就保证了严格递增。

这个方法其实用数学公式可以很容易推导的:

时间A   W1 W2 W3 W4,   W5  w1…w5 分别表是watermark

时间A+5S  W11  W21  W31  W41   W51  W11..W51分别是在时间A+5s时候的watermark

假设  W3  w1…w5 中最小的watermark ,那么必然有     w1 > w3 , w2 > w3  w4 > w3  w5 > w3

W41 W11..w51中最小的watermark 那么必然有      w11 > w41 ,w21 > w41, w31 > w41, w51 > w41

那么因为对于同一个source watermark内部是严格递增的,则有如下关系:

w11 > w41 > w4 w4 又大于  w3 那么  w41 > w3  ,结论显而易见了。

不同source 源的watermark处理流程图

小结

本文从一个分布式计算场景经典的事件延迟处理问题,引申出了flink中是如何借助 watermark解决的,并拓展了在多个source源的场景下,如何利用多watermark解决递增问题。至此,你就掌握了flink的关键技术watermark机制。

其实在watermark使用过程中会遇到各种各样具体的问题工程问题,这个就需要各位实际上手才会有真实的解决思路与感觉。而且不同的大数据平台对于watermark的使用也有些许的差异。

感谢您的观看。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

flink核心之watermarker 的相关文章

  • 基于MFC的USB上位机开发(3)数据传输模块

    延伸阅读 基于MFC的USB上位机开发 1 概述 基于MFC的USB上位机开发 2 速度测试模块 基于MFC的USB上位机开发 3 数据传输模块 基于MFC的USB上位机开发 4 环路模块 基于MFC的USB上位机开发 5 下环路模块 目录

随机推荐

  • 虚拟现实的起源、发展、爆发与沉淀

    虚拟现实的三生三世 闲来无事翻篇外文 xff0c 本博主并非故意蹭热点 xff08 奸笑 xff09 xff0c 结尾我会细说为何是三生三世 xff0c 不是五生五世 xff1a 虚拟现实远早于这个概念被创造和形式化之前 在这篇描写虚拟现实
  • 基于卷积的密度统计(一)密度图的生成

    在基于卷积的人数统计中 xff0c 一种是最后回归整张图的人数 xff0c 即输入图像 xff0c 输出人群个数 xff0c 另外一种是回归人群分布密度图 xff0c 即输入图像 xff0c 得到的是密度分布 显然得到密度分布的人数统计方法
  • Python使用国内镜像

    国内镜像地址 xff1a 阿里云 xff1a https mirrors aliyun com pypi simple 清华 xff1a https pypi tuna tsinghua edu cn simple中国科技大学 https
  • [RK3568 Android11] 开发之蓝牙(AP6275S)

    目录 前言 一 设备树dts配置 二 蓝牙打不开 三 蓝牙打开成功 四 修改蓝牙设备名称
  • XFCE菜单列表

    vim etc xdg xfce4 panel xfce4 menu 19 rc use default menu 61 true menu file 61 etc xdg menus xfce applications menu icon
  • X的DISPLAY=:0.0

    ZZ http blog 163 com caizf1987 64 126 blog static 13212128020104611592660 在Linux Unix类操作系统上 DISPLAY用来设置将图形显示到何处 直接登陆图形界面
  • 国外服务器上玩游戏延迟很高,什么原因造成的?

    在国外服务器玩游戏为什么延迟很高 有的比较热门的国际游戏 xff0c 为了玩家通常都会将整个游戏的区服划分为亚服 欧服 美服 东南亚服 韩服等等 xff0c 这主要是为了玩家有个良好的游戏体验 xff0c 那为什么在外服 国外服务器 上玩游
  • 公司抽奖小程序(自定义名单,空格控制滚动、抽奖,可作弊,可满足千人团队, 带可执行程序下载及源代码)

    内含 抽奖小程序 及 名单生成工具 使用时将两个小程序放在 同一目录下 先用名单生成工具生成名单 打开工具 按照提示输入要创建的参与抽奖的人数 输入每个人的编号及姓名 每行一个编号 空格 姓名 打开程序 复制粘贴即可 先用excel或者tx
  • 大厂面试之JAVA核心技能:Slipped Conditions

    聊聊Splipped Condtion 定义一个更现实的例子解决Slipped Conditions问题 定义 所谓Slipped conditions xff0c 就是说 xff0c 从一个线程检查某一特定条件到该线程操作此条件期间 xf
  • 一文带你彻底搞懂Docker中的cgroup

    前言 进程在系统中使用CPU 内存 磁盘等计算资源或者存储资源还是比较随心所欲的 xff0c 我们希望对进程资源利用进行限制 xff0c 对进程资源的使用进行追踪 这就让cgroup的出现成为了可能 xff0c 它用来统一将进程进行分组 x
  • 一文彻底搞懂Docker中的namespace

    什么是namespace namespace是对全局系统资源的一种封装隔离 这样可以让不同namespace的进程拥有独立的全局系统资源 这样改变一个namespace的系统资源只会影响当前namespace中的进程 xff0c 对其它na
  • 带你玩转多进程编程(一)

    前言 之前用加法器的例子一文带你轻松掌握多种范式讲解了多种范式的封装差异得到了很多童鞋的阅读 这次我再通过对进程的封装来给大家继续加深讲解下关于这几种编程范式的差异吧 结构化设计 相对于pthread create 函数 xff0c for
  • 【网易面试题】如何实现一个线程安全的shared_ptr智能指针

    题目描述 网易一面遇到过这么一个题目 xff0c 面试官要求自己实现一个线程安全的shared ptr智能指针 题目分析 可能很多人只知道shared ptr是C 43 43 11模块库的头文件定义的一个智能指针 xff0c 即shared
  • Ubuntu命令行安装Teamviewer

    1 预备安装 更新32位架构 sudo dpkg add architecture i386 更新安装 sudo apt get update 2 使用deb安装 官网下载 xff1a https www teamviewer com en
  • 阿里云数据湖分析急招实习生

    团队介绍 OSS 对象存储的概念可能对很多同学相对比较陌生 xff0c 但其实和大家紧密相关 xff0c 因为整个阿里集团业务以及阿里云外部客户的视频 图片 JS和CSS等资源全部都存储在我们这里 xff0c 可以说国内每天使用互联网应用的
  • 源码剖析Redis中如何使用跳表的

    前言 阿里云今年春招校招面试题 xff0c 面试官问Redis在是如何使用跳表的 xff1f 让很多同学赶到很头疼 今天我们就来讲一讲吧 Sorted Set的结构 redis的数据类型中有序集合 xff08 sorted set xff0
  • 多进程生产者消费者框架设计

    前言 介绍了进程如何基于面向对象的封装 xff0c 本章我们基于封装好的Process类来实现一种无锁版的生产者和消费者框架 xff0c 用它实现了高性能文件拷贝功能 读这篇文章之前大家可以想一下如果是你 xff0c 你会怎么设计这样的框架
  • golang实现简单rpc调用

    RPC通信过程 RPC的通信过程网上介绍很多 xff0c 这里就不在单独介绍了 xff0c 具体过程如下 xff1a 1 Client以本地调用的方式发起调用 xff1b 2 Client stub收到调用后负责将被调用的方法名 参数等打包
  • 一文彻底搞懂leveldb架构

    leveldb leveldb是一个写性能十分优秀的存储引擎 xff0c 是典型的LSM tree的实现 LSM的核心思想是为了换取最大的写性能而放弃掉部分读性能 那么 xff0c 为什么leveldb写性能高 xff1f 简单来说它就是尽
  • flink核心之watermarker

    背景介绍 xff1a 现在的社会 xff0c 人们产生越来越多的数据 xff0c 而数据对每个人人都产生了巨大的影响 比如你去银行贷款 xff0c 那么必然银行要对你做信用评估 xff0c 会涉及到大数据画像等数据分析 比如美团外卖 xff