低级处理函数ProcessFunction

2023-10-27

原文链接:https://zhuanlan.zhihu.com/p/130708277

1. ProcessFunction定义

ProcessFunction 函数是低阶流处理算子,可以访问流应用程序所有(非循环)基本构建块:

  • 事件 (数据流元素)
  • 状态 (容错和一致性,仅用于keyed stream)
  • 定时器 (事件时间和处理时间,仅用于keyed stream)

ProcessFunction 可以被认为是一种提供了对 KeyedState 和定时器访问的 FlatMapFunction。每在输入流中接收到一个事件,就会调用来此函数来处理。

对于容错的状态,ProcessFunction 可以通过 RuntimeContext 访问 KeyedState,类似于其他有状态函数访问 KeyedState。

定时器可以对处理时间和事件时间的变化做一些处理。每次调用 processElement() 都可以获得一个 Context 对象,通过该对象可以访问元素的事件时间戳以及 TimerService。TimerService 可以为尚未发生的事件时间/处理时间实例注册回调。当定时器到达某个时刻时,会调用 onTimer() 方法。在调用期间,所有状态再次限定为定时器创建的键,允许定时器操作 KeyedState。

如果要访问 KeyedState 和定时器,那必须在 KeyedStream 上使用 ProcessFunction。

2. 内置ProcessFunction

  • ProcessFunction: 用于DataStream
  • KeyedProcessFunction: 用于KeyedStream,keyBy之后的流处理
  • CoProcessFunction: 用于connect连接的流
  • ProcessJoinFunction: 用于join流操作
  • BroadcastProcessFunction: 用于广播
  • KeyedBroadcastProcessFunction: keyBy之后的广播
  • ProcessWindowFunction: 窗口增量聚合
  • ProcessAllWindowFunction: 全窗口聚合

其中ProcessFunction看作是一个具有key state和定时器(timer)访问权的FlatMapFunction。对于在输入流中接收到的每一个事件,此函数就会被调用以处理该事件。

如果想要在流处理过程中访问keyed state和定时器,就必须在一个keyed stream上应用ProcessFunction函数,代码如下:

stream.keyBy(...).process(new MyProcessFunction())

3. 使用实例

作为ProcessFunction的扩展(即子类),KeyedProcessFunction在其onTimer(…)方法中提供对计时器key的访问。其模板代码如下所示:

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {
    K key = ctx.getCurrentKey();
    // ...
}

在下面的示例中,KeyedProcessFunction维护每个key的计数,并在每过一分钟(以事件时间)而未更新该key时,发出一个key/count对:

  1. 把计数、key和最后修改时间戳(last-modification-timestamp)存储在一个ValueState中, ValueState的作用域是通过key隐式确定的。
  2. 对于每个记录,KeyedProcessFunction递增计数器并设置最后修改时间戳。
  3. 该函数还安排了一个一分钟后的回调(以事件时间)。
  4. 在每次回调时,它根据存储的计数的最后修改时间检查回调的事件时间时间戳,并在它们匹配时发出key/count(即,在该分钟内没有进一步的更新)。

示例: 维护数据流中每个key的计数,并在每过一分钟(以事件时间)而未更新该key时,发出一个key/count对。

1)首先导入必须所依赖包

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.util.Collector;

2)定义存储状态数据的数据结构(数据模型)

/**
 * 存储在状态中的数据类型
 */
public class CountWithTimestamp {

	public String key;           // 存储key
	public long count;           // 存储计数值
	public long lastModified;    // 最后一次修改时间
}

3)自定义ProcessFunction,继承自KeyedProcessFunction:

public class CountWithTimeoutFunction
		extends KeyedProcessFunction<Tuple, Tuple2<String, String>, Tuple2<String, Long>> {

	/** 由这个处理函数负责维护的状态 */
	private ValueState<CountWithTimestamp> state;

	// 首先获得由这个处理函数(process function)维护的状态
        // 通过 RuntimeContext 访问Flink的keyed state
	@Override
	public void open(Configuration parameters) throws Exception {
		state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
	}

	// 对于在输入流中接收到的每一个事件,此函数就会被调用以处理该事件
	// 对于每个记录,KeyedProcessFunction递增计数器并设置最后修改时间戳
	@Override
	public void processElement(
			Tuple2<String, String> value,
			Context ctx,
			Collector<Tuple2<String, Long>> out) throws Exception {

		// 获取当前的计数
		CountWithTimestamp current = state.value();
		if (current == null) {
			current = new CountWithTimestamp();
			current.key = value.f0;
		}

		// 更新状态计数值
		current.count++;

		// 设置该状态的时间戳为记录的分配的事件时间时间时间戳
                if (ctx != null) {
                	current.lastModified = ctx.timestamp();
                }

                // 将状态写回
		state.update(current);

		// 从当前事件时间开始安排下一个计时器60秒
		ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
		}

	// 如果一分钟内没有进一步的更新,则发出 key/count对
	@Override
	public void onTimer(
			long timestamp,
			OnTimerContext ctx,
			Collector<Tuple2<String, Long>> out) throws Exception {

		// 获取调度此计时器的key的状态
		CountWithTimestamp result = state.value();

		// 检查这是一个过时的计时器还是最新的计时器
		if (timestamp == result.lastModified + 60000) {
			// 超时时发出状态
			out.collect(new Tuple2<String, Long>(result.key, result.count));
		}
	}
}

4)在流处理的主方法中应用自定义的处理函数

public class StreamingJob {
    public static void main(String[] args) throws Exception {
	// 设置流执行环境
	final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

	// 默认情况下,Flink将使用处理时间。要改变这个,可以设置时间特征:
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 源数据流
        DataStream<Tuple2<String, String>> stream = env
                .fromElements("good good study","day day up","you see see you")
                .flatMap(new FlatMapFunction<String, Tuple2<String,String>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, String>> collector) throws Exception {
                        for(String word : line.split("\\W+")){
                            collector.collect(new Tuple2<>(word,"1"));
                        }
                    }
                });

	// 因为模拟数据没有时间戳,所以用此方法添加时间戳和水印
        DataStream<Tuple2<String, String>> withTimestampsAndWatermarks =
                stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, String>>() {
                    @Override
                    public long extractAscendingTimestamp(Tuple2<String, String> element) {
                        return System.currentTimeMillis();
                    }
                });

	// 在keyed stream上应用该处理函数
	DataStream<Tuple2<String, Long>> result = withTimestampsAndWatermarks.keyBy(0).process(new CountWithTimeoutFunction());

	// 输出查看
        result.print();

	// 执行流程序
	env.execute("Process Function");
    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

低级处理函数ProcessFunction 的相关文章

  • 什么是熔断\降级

    熔断与降级 熔断降级是一种分布式系统的保护机制 用于应对服务不稳定或不可用的情况 熔断是指当某个服务的调用失败次数或异常比例达到一定阈值时 自动切断对该服务的调用 让请求快速失败 避免影响其他服务而导致雪崩效应 熔断后 一段时间内不再调用该

随机推荐

  • Java Frame Panel JFrame JPanel

    Frame 和 JFrame 默认的是BorderLayout布局 而 Panel 和 JPanel 默认的是 FlowLayout布局
  • 使用jxl解析Excel出现的问题

    jxl read biff BiffException Unable to recognize OLE stream at jxl read biff CompoundFile CompoundFile java 116 at jxl re
  • 【Unity】如何将3D模型呈现在2D平面上

    步骤 一 将2D平面所在Canvas的Render Mode改为Screen Space Camera 改成World Space也行 二 将Main Camera拖动到Render Camera处 三 调整3D模型的大小 2D平面和Mai
  • 基于Numpy构建全连接前馈神经网络进行手写数字识别

    文章目录 一 问题描述 二 设计简要描述 三 程序清单 四 结果分析 五 调试报告 六 实验小结 一 问题描述 不使用任何机器学习框架 仅仅通过Numpy库构建一个最简单的全连接前馈神经网络 并用该网络识别mnist提供的手写数字体 二 设
  • 2023年Java面试题_Redis

    Index Redis 基础 1 基本数据结构 1 1 String 字符串 1 1 1 底层结构 1 1 2 相关指令 1 2 List 列表 1 2 1 底层结构 1 2 2 相关指令 1 3 Hash 哈希 k v 1 3 1 底层结
  • 记一次流量攻击的处理方式

    我本人只是做程序开发的 只会一些基础的linux命令和处理 所以在网上找到了不少方案并且尝试 最终限制了本次的流量攻击 现总结起来 供各位参考 1 首先 我们需要统计一下ip连接数 找到请求过多的ip 将其进行封禁 查看代码如下 netst
  • 人工神经网络算法的学习率有什么作用

    神经网络的结构 例如2输入3隐节点1输出 建好后 一般就要求神经网络里的权值和阈值 现在一般求解权值和阈值 都是采用梯度下降之类的搜索算法 梯度下降法 牛顿法 列文伯格 马跨特法 狗腿法等等 这些算法会先初始化一个解 在这个解的基础上 确定
  • [JAVAee]SpringBoot日志文件

    目录 日志的作用 SpringBoot中的日志 框架说明 日志对象的获取 日志的分类 日志的级别设置 日志的打印 日志的持久化 日志的作用 日志可以帮助我们发现程序的问题并进行定位 日志还可以记录用户的登录信息 分析用户的意图 日志能记录程
  • Vue详情页面el-row el-col做出word样式效果和打印(element-ui)

    场景 业务给了个word文档 然后说要前端可以看到样式如文档 并且可以打印出来 记录一下 element ui word表格样式详情页面 vue页面打印 更细的内容可以查看下面两篇文章原文 样式参考文章 elementUI自定义查看详情组件
  • ModuleNotFoundError: No module named ‘tensorflow‘错误

    环境 win10 64位 情境 eclipse运行python文件 错误 ModuleNotFoundError No module named tensorflow 分析 没有安装tensorflow包 解决方法 pip install
  • QT QProcess执行终端命令并实时输出回显

    https blog csdn net weixin 43690347 article details 84146821 utm medium distribute pc aggpage search result none task bl
  • 【ISP】低亮度图片增强方法(1)

    本文介绍改进INDANE算法的低照度图像增强改进算法 AINDANE算法 Adaptive and integrated neighborhood dependent approach for nonlinear enhancement o
  • 如何用RDP来连接计算机上的WSL2(Ubuntu)图形界面(要求安装Gnome桌面)

    您可以使用 Remote Desktop Protocol RDP 连接 Windows Subsystem for Linux WSL 中的 Ubuntu 系统的图形界面 需要安装 Gnome 桌面 在 Ubuntu 系统中安装并启动 V
  • TCP/IP四层模型简述

    1 TCP IP协议是由七层模型简化成四层而来 七层有底向上分别是 物理层 数据链路层 网络层 传输层 会话层 表示层 应用层 简化后的四层分别是 主机到网络层 比特 网络层 数据帧 传输层 数据包 应用层 数据段 每一层对于上一层来讲是透
  • axure读取服务器文件,axure 云服务器

    axure 云服务器 内容精选 换一换 监控是保持弹性云服务器可靠性 可用性和性能的重要部分 通过监控 用户可以观察弹性云服务器资源 为使用户更好地掌握自己的弹性云服务器运行状态 公有云平台提供了云监控 您可以使用该服务监控您的弹性云服务器
  • (pytorch进阶之路)Masked AutoEncoder论文及实现

    文章目录 1 导读 2 论文地址 3 代码实现思路 3 1 预处理阶段 3 2 Encoder 3 3 Decoder 3 4 fine tuning 3 5 linear probing 3 6 evaluation 4 代码地址 5 如
  • 闪烁回路的例子 三菱PLC ST语言 梯形图

    闪烁回路的例子 使可编程控制器运行 通过初始脉冲 M8002 驱动状态S3 在状态S3中输出Y000 1秒钟以后转移到状态S20 在状态S20中输出Y001 1 5秒钟以后返回状态S3 ST SET M8002 S3 STL TRUE S3
  • Docker 中国官方镜像加速

    通过 Docker 官方镜像加速 中国区用户能够快速访问最流行的 Docker 镜像 该镜像托管于中国大陆 本地用户现在将会享受到更快的下载速度和更强的稳定性 从而能够更敏捷地开发和交付 Docker 化应用 Docker 中国官方镜像加速
  • vue3+ts+element-plus 列表查询条件/筛选条件组件二次封装(条件查询组件新增继承第三方组件事件)

    2023 06 08 条件查询组件新增继承第三方组件事件 效果如下 一 需求 对于后台管理系统项目必不可少的就是 增删改查 而 查 就会根据表格的列数来显示多少个查询 筛选条件 为了方便因此封装了查询条件 筛选条件 组件 二 组件功能 1
  • 低级处理函数ProcessFunction

    原文链接 https zhuanlan zhihu com p 130708277 1 ProcessFunction定义 ProcessFunction 函数是低阶流处理算子 可以访问流应用程序所有 非循环 基本构建块 事件 数据流元素