Flink intervalJoin 使用 -转载

2023-11-01

1.前言

Flink中基于DataStream的join,只能实现在同一个窗口的两个数据流进行join,但是在实际中常常会存在数据乱序或者延时的情况,导致两个流的数据进度不一致,就会出现数据跨窗口的情况,那么数据就无法在同一个窗口内join。
Flink基于KeyedStream提供的interval join机制,intervaljoin 连接两个keyedStream, 按照相同的key在一个相对数据时间的时间段内进行连接。

2.代码示例

将订单流与订单品流通过订单id进行关联,获得订单流中的会员id。
其中ds1就是订单品流,ds2就是订单流,分别对ds1和ds2通过订单id进行keyBy操作,得到两个KeyedStream,再进行intervalJoin操作;
between方法传递的两个参数lowerBound和upperBound,用来控制右边的流可以与哪个时间范围内的左边的流进行关联,即:
leftElement.timestamp + lowerBound <= rightElement.timestamp <= leftElement.timestamp + upperBound
相当于左边的流可以晚到lowerBound(lowerBound为负的话)时间,也可以早到upperBound(upperBound为正的话)时间。

 

DataStream<OrderItemBean> ds = ds1.keyBy(jo -> jo.getString("fk_tgou_order_id"))
                .intervalJoin(ds2.keyBy(jo -> jo.getString("id")))
                .between(Time.milliseconds(-5), Time.milliseconds(5))
                .process(new ProcessJoinFunction<JSONObject, JSONObject, OrderItemBean>() {

                    @Override
                    public void processElement(JSONObject joItem, JSONObject joOrder, Context context, Collector<OrderItemBean> collector) throws Exception {
                        String order_id = joItem.getString("fk_tgou_order_id");
                        String item_id = joItem.getString("activity_to_product_id");
                        String create_time = df.format(joItem.getLong("create_time"));
                        String member_id = joOrder.getString("fk_member_id");
                        Double price = joItem.getDouble("price");
                        Integer quantity = joItem.getInteger("quantity");
                        collector.collect(new OrderItemBean(order_id, item_id, create_time, member_id, price, quantity));
                    }
                });
ds.map(JSON::toJSONString).addSink(new FlinkKafkaProducer010<String>("berkeley-order-item", schema, produceConfig));

3.Interval Join源码

<1> 使用Interval Join时,必须要指定的时间类型为EventTime

 

image.png

<2>两个KeyedStream在进行intervalJoin并调用between方法后,跟着使用process方法;
process方法传递一个自定义的 ProcessJoinFunction 作为参数,ProcessJoinFunction的三个参数就是左边流的元素类型,右边流的元素类型,输出流的元素类型。

image.png

 

image.png

<3>intervalJoin,底层是将两个KeyedStream进行connect操作,得到ConnectedStreams,这样的两个数据流之间就可以实现状态共享,对于intervalJoin来说就是两个流相同key的数据可以相互访问。
ConnectedStreams的keyby????

<4> 在ConnectedStreams之上执行的操作就是IntervalJoinOperator

 

image.png

 

这里有两个参数控制是否包括上下界,默认都是包括的。

a.initializeState()方法
这里面初始化了两个状态对象,

image.png

 

分别用来存储两个流的数据,其中Long对应数据的时间戳,List<BufferEntry<T1>>对应相同时间戳的数据

b.processElement1和processElement2方法
方法描述的是,当两个流达到之后,比如左边的流有数据到达之后,就去右边的流去查找对应上下界范围内的数据。这两个方法调用的都是processElement方法。

 

private <THIS, OTHER> void processElement(
            final StreamRecord<THIS> record,
            final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer,
            final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer,
            final long relativeLowerBound,
            final long relativeUpperBound,
            final boolean isLeft) throws Exception {
                
        final THIS ourValue = record.getValue();
        final long ourTimestamp = record.getTimestamp();

        if (ourTimestamp == Long.MIN_VALUE) {
            throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +
                    "interval stream joins need to have timestamps meaningful timestamps.");
        }

        if (isLate(ourTimestamp)) {
            return;
        }

        addToBuffer(ourBuffer, ourValue, ourTimestamp);

        for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {
            final long timestamp  = bucket.getKey();

            if (timestamp < ourTimestamp + relativeLowerBound ||
                    timestamp > ourTimestamp + relativeUpperBound) {
                continue;
            }

            for (BufferEntry<OTHER> entry: bucket.getValue()) {
                if (isLeft) {
                    collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
                } else {
                    collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
                }
            }
        }

        long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
        if (isLeft) {
            internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
        } else {
            internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
        }
    }

(1)获取记录的值和时间戳,判断是否延时,当到达的记录的时间戳小于水位线时,说明该数据延时,不去处理,不去关联另一条流的数据。

 

image.png

 

    private boolean isLate(long timestamp) {
        long currentWatermark = internalTimerService.currentWatermark();
        return currentWatermark != Long.MIN_VALUE && timestamp < currentWatermark;
    }

(2)将数据添加到对应自己流的MapState缓存状态中,key为数据的时间。
addToBuffer(ourBuffer, ourValue, ourTimestamp);

 

private static <T> void addToBuffer(
            final MapState<Long, List<IntervalJoinOperator.BufferEntry<T>>> buffer,
            final T value,
            final long timestamp) throws Exception {
        List<BufferEntry<T>> elemsInBucket = buffer.get(timestamp);
        if (elemsInBucket == null) {
            elemsInBucket = new ArrayList<>();
        }
        elemsInBucket.add(new BufferEntry<>(value, false));
        buffer.put(timestamp, elemsInBucket);
    }

(3)去遍历另一条流的MapState,如果ourTimestamp + relativeLowerBound <=timestamp<= ourTimestamp + relativeUpperBound ,则将数据输出给ProcessJoinFunction调用,ourTimestamp表示流入的数据时间,timestamp表示对应join的数据时间

 

        for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {
            final long timestamp  = bucket.getKey();

            if (timestamp < ourTimestamp + relativeLowerBound ||
                    timestamp > ourTimestamp + relativeUpperBound) {
                continue;
            }

            for (BufferEntry<OTHER> entry: bucket.getValue()) {
                if (isLeft) {
                    collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
                } else {
                    collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
                }
            }
        }

对应的collect方法:

 

   private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception {
        final long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);

        collector.setAbsoluteTimestamp(resultTimestamp);
        context.updateTimestamps(leftTimestamp, rightTimestamp, resultTimestamp);

        userFunction.processElement(left, right, context, collector);
    }

设置结果的Timestamp为两边流中最大的,之后执行processElement方法

 

image.png

 

image.png

(4)注册定时清理时间

 

        long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
        if (isLeft) {
            internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
        } else {
            internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
        }

定时的清理时间,就是当下流入的数据的时间+relativeUpperBound,当watermark大于该时间就需要清理。

 

public void onEventTime(InternalTimer<K, String> timer) throws Exception {

        long timerTimestamp = timer.getTimestamp();
        String namespace = timer.getNamespace();

        logger.trace("onEventTime @ {}", timerTimestamp);

        switch (namespace) {
            case CLEANUP_NAMESPACE_LEFT: {
                long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;
                logger.trace("Removing from left buffer @ {}", timestamp);
                leftBuffer.remove(timestamp);
                break;
            }
            case CLEANUP_NAMESPACE_RIGHT: {
                long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
                logger.trace("Removing from right buffer @ {}", timestamp);
                rightBuffer.remove(timestamp);
                break;
            }
            default:
                throw new RuntimeException("Invalid namespace " + namespace);
        }
    }

清理时间逻辑:
假设目前流到达的数据的时间戳为10s,between传进去的时间分别为1s,5s,
upperBound为5s,lowerBound为1s
根据 左边流时间戳+1s<=右边时间戳<=左边流时间戳+5s右边时间戳-5s<=左边流时间戳<=右边时间戳-1s
a。如果为左边流数据到达,调用processElement1方法
此时relativeUpperBound为5,relativeLowerBound为1,relativeUpperBound>0,所以定时清理时间为10+5即15s
当时间达到15s时,清除左边流数据,即看右边流在15s时,需要查找的左边流时间范围
10s<=左边流时间戳<=14s,所以watermark>15s时可清除10s的数据。

image.png

 

b。如果为右边流数据到达,调用processElement2方法
此时relativeUpperBound为-1,relativeLowerBound为-5,relativeUpperBound<0,所以定时清理时间为10s
当时间达到10s时,清除右边流数据,即看左边流在10s时,需要查找的右边流时间范围
11s<=右边流时间戳<=15s,所以可以清除10s的数据。

 

image.png



作者:LZhan
链接:https://www.jianshu.com/p/d457a6dff349
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flink intervalJoin 使用 -转载 的相关文章

随机推荐

  • nginx转发后如何获取真实的ip地址

    前言 最近做一个团队的打卡系统 需要通过连接实验室WiFi来判是否人在实验室 网上千篇一律的获取主机ip的方法由于我使用了nginx反向代理 导致获取到的ip地址为127 0 0 1 这肯定是不符合我们验证标准的 还有就是失去了校验的意义了
  • 人工智能笔记

    第一章 绪论 1956年正式提出人工智能 artificial intelligence AI 这个术语并把它作为一门新兴科学的名称 20世纪三大科学技术成就 空间技术 原子能技术 人工智能 智能是知识与智力的总和 知识是一切智能行为的基础
  • Flutter 在MAC环境下jenkins+fastlane+gitlab实现自动打包部署(看这一篇就够了,小而精)

    实现办公局域网下的jenkins服务 Flutter配置 文档 jenkins安装 通过homebrew安装 1 安装homebrew bin bash c curl fsSL https raw githubusercontent com
  • 谷歌播客Google PodCasts解析脚本

    解析某个频道 全部的结果 import json import requests feed url https podcasts google com feed aHR0cHM6Ly93d3cueGltYWxheWEuY29tL2FsYnV
  • 我的LLVM学习笔记——OLLVM混淆研究之FLA篇

    因为要做代码保护 所以抽时间研究了下OLLVM中的三种保护方案 BCF Bogus Control Flow 中文名虚假控制流 FLA Control Flow Flattening 中文名控制流平坦化 SUB Instructions S
  • vue使用wangEditor

    vue版本2 0 editor5 1 23版本 editor for vue 1 0 2版本 api文档入口 效果图 点击查看如何封装 安装步骤入口 npm install wangeditor editor save npm instal
  • 多路选择器MUX总结-IC学习笔记(八)

    多路复用器是一种组合电路 它从许多输入信号中选择一个作为输出 本文先介绍两个MUX的简单应用 主要关于如何将verilog与物理实现对应 第二当MUX作为时钟切换电路时如何避免毛刺 glitch 文章目录 1 1 MUX code与物理实现
  • grafana配置MySQL持久化存储并配置HTTPS

    Grafana 配置 MySQL 数据持久化存储 一 mysql8 0 30 安装 1 1 解压并初始化 MySQL8 0 30 查询是否存在 MariaDB 和 MySQL 存在需要删除后进行安装 rpm qa grep MariaDB
  • 目标检测VOC标注格式中,将斜框标注转化为水平框

    目标检测VOC格式数据集obb标注向hbb标注的转换 polygon 2 bndbox polygon obb 和bndbox hbb 介绍 polygon obb bndbox hbb polygon2bndbox转换原理 polygon
  • unturned服务器修改空投频率,unturned 服务器设置

    unturned 服务器设置 内容精选 换一换 区块链服务状态为 弹性IP异常 排查项 弹性公网IP已 解绑 或被释放 在BCS控制台 服务管理页面中的目标服务卡片中 单击 更多 gt 更新访问地址 查看弹性公网IP 登录网络控制台 查找目
  • ajax success function(data)后的data数据无法使用Uncaught TypeError: Cannot read property ‘xxx‘ of undefined

    问题描述 前端小白 在不了解ajax机制的情况下误使用函数返回ajax中需要时间完成的ajax函数 导致return后的data无法在后面的script代码块中正常使用 function getData ajax type get url
  • procfs使用及字符设备

    以下内容由chatgpt给出 以下是一个使用procfs接口创建设备节点的示例代码 include
  • Android Kotlin SharedFlow

    SharedFlow 会从其中收集值得所有使用方法中发出数据 简而言之就是 像普通的流只可以一方发送 一方接受 而这个流可以一方发送 多方接受 下面上代码演示 SharedFlowFragment package com example a
  • 【C#】-属性(Property)和字段(Field)的区别

    导读 近期学习过程中发现了一些问题 我的学习只是学习 敲代码就是敲代码 没有加入思考 也不问为什么就直接去敲人家写好的例子去敲 把知识都学死了 逐渐散失了思考能力 所以学习的兴趣大打折扣 正如那句话 学而不思则罔 思而不学则殆 在设计模式中
  • c语言 学生信息管理系统设计,大一C语言结课设计之《学生信息管理系统》

    第一次写这么长的程序 代码仅供参考 有问题请留言 学生信息管理系统 IDE Dev Cpp 4 9 9 2 2014 6 15 include include include include include using std sort u
  • 【Redis】回顾Redis知识点之事务机制

    回顾Redis知识点之事务机制 Redis事务机制 为什么 Redis 不支持回滚 roll back 假如事务执行一半的时候Redis宕机怎么办 为什么需要内存回收 上一篇回顾下Redis基础知识点中有简单介绍Redis与Memcache
  • python 一行代码 将小数变成百分数

    数据表 将人数占比和金额占比设置成百分数 先将数据变成保留4位小数点的数 方法一 最简单 result 人数占比 round 4 result 金额占比 round 4 方法二 lambda函数 把数据变成4位小数点的数 写lambda函数
  • 云服务器和传统IDC物理机有什么区别?

    为什么选腾讯云服务器不选传统IDC物理机 云服务器弹性计算使用灵活 上云是趋势 并且性价比高 支持一键升级配置或降级配置 一键部署搭建应用程序环境 一键调整公网带宽 一键镜像复制到另一台服务器 并且支持退款 可以实现分分钟创建多台云服务器实
  • Springboot中使用mabatis_plus拓展包多数据源配置

    1 需求 使用Springboot进行项目开发时 需要访问多个数据库 每个数据库都有各自的作用 需要将数据分开建表存储 1 多数据源配置 两个库业务互不相干 a方法使用a库的数据 b方法使用b库的数据 2 动态数据源配置 两个库业务有关联
  • Flink intervalJoin 使用 -转载

    1 前言 Flink中基于DataStream的join 只能实现在同一个窗口的两个数据流进行join 但是在实际中常常会存在数据乱序或者延时的情况 导致两个流的数据进度不一致 就会出现数据跨窗口的情况 那么数据就无法在同一个窗口内join