Apache Flink 使用DataStream API进行数据处理

2023-11-03

问题导读

1.流处理和批处理分别入口是什么?
2.对于本地和远程运行程序,都可以使用哪个函数?
3.Flink数据源分为哪两类?
4.Flink DataStream和DataSet source都是基于什么格式?
5.Flink中kafka source是否为自定义?
执行环境
为了开始编写Flink程序,我们首先根据自己的需要,可以获得现有的或创建一个执行环境(executionenvironment)。 Flink支持:

  • 获取一个已经有的environment
  • 创建一个本地environment
  • 创建一个远程environment


通常,只需要使用getExecutionEnvironment()。 它会根据你的环境来选择。 如果你在IDE中的本地环境中执行,那么它将启动本地执行环境。 否则,如果正在执行JAR,则Flink集群管理器将以分布式方式执行该程序。

如果要自己创建本地或远程环境,则还可以选择使用createLocalEnvironment()和createRemoteEnvironment(String host,int port,String和.jar文件)等方法来执行此操作。
那么这个具体怎么使用,下面给大家分别给出批处理程序和流处理程序。

流处理程序:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WindowWordCount {
  def main(args: Array[String]) {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("localhost", 9999)

    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)

    counts.print()

    env.execute("Window Stream WordCount")
  }
}

批处理程序:【补充参考

import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]) {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.fromElements(
      "Who's there?",
      "I think I hear them. Stand, ho! Who's there?")

    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    counts.print()
  }
}

从上面我们看出批处理和流处理他们获取实例都是通过getExecutionEnvironment方法,而StreamExecutionEnvironment代表的是流处理。ExecutionEnvironment代表的是批处理。

数据源
Sources是Flink从中获取数据的地方。  Flink支持许多预先实现的数据源功能。 它还支持编写自定义数据源函数,因此可以轻松编程任何不受支持的函数。 首先让我们尝试理解内置的源函数。

由于以前版本中,Flink只有流处理,因此下面是介绍的Flink1.7以前的版本,大体了解即可

Flink以前版本

DataStream API
基于Socket
DataStream API支持从套接字读取数据。 只需指定要从中读取数据的主机和端口,它就可以完成工作:

socketTextStream(hostName, port);

还可以选择指定分隔符:

socketTextStream(hostName,port,delimiter)

还可以指定API尝试获取数据的最大次数:

socketTextStream(hostName,port,delimiter,maxRetry)

基于文件
还可以选择使用Flink中基于文件的源函数从文件源流式传输数据。 可以使用readTextFile(String path)从路径中指定的文件中流式传输数据。 默认情况下,它将读取TextInputFormat并逐行读取字符串。
如果文件格式不是文本,则可以使用以下函数指定相同的格式:

readFile(FileInputFormat<Out> inputFormat, String path)

Flink还支持,使用readFileStream()读取文件流

readFileStream(String filePath, long intervalMillis, FileMonitoringFunction.WatchType watchType)

只需指定文件路径,轮询文件路径的轮询间隔以及监视类型。 监控类型包括三种类型:
 

  • 当系统应仅处理新文件时使用FileMonitoringFunction.WatchType.ONLY_NEW_FILES
  • 当系统仅追加文件内容时使用FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED
  • 当系统不仅要重新处理文件的追加内容而且还要重新处理文件中的先前内容时,将使用FileMonitoringFunction.WatchType.REPROCESS_WITH_APPENDED


如果文件不是文本文件,那么我们可以选择使用以下函数,它允许我们定义文件输入格式:

readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)

在内部,它将读取文件任务划分为两个子任务。 一个子任务仅根据给定的WatchType监视文件路径。 第二个子任务并行执行读取文件。监控文件路径的子任务是不是并行子任务。 它的工作是根据轮询间隔扫描文件路径并报告要处理的文件,拆分文件,并将拆分分配给相应的下游线程:

Flink1.7版本
DataStream API
source依然是程序从中读取输入的位置。 可以使用StreamExecutionEnvironment.addSource(sourceFunction)将源附加到程序。 Flink附带了许多预先实现的源函数,但可以通过-非并行Source实现SourceFunction,或者通过实现ParallelSourceFunction接口或为并行源扩展RichParallelSourceFunction来编写自己的自定义源。

可以从StreamExecutionEnvironment访问几个预定义的流源:
这里我们可以跟以前的版本比较增加Collection-based,一些函数页发生了变化。

基于文件
readTextFile(path) - 读取文本文件,即逐行读取,并将它们作为字符串返回。
readFile(fileInputFormat,path) - 按指定的文件输入格式指定读取(一次)文件。
readFile(fileInputFormat,path,watchType,interval,pathFilter) - 这是前两个内部调用的方法。 它根据给定的fileInputFormat读取路径中的文件。 根据提供的watchType,此source可以定期监控(每隔ms)新数据的路径(FileProcessingMode.PROCESS_CONTINUOUSLY),或者处理当前在路径中的数据并退出(FileProcessingMode.PROCESS_ONCE)。 使用pathFilter,用户可以进一步排除处理文件。这里篇幅有限更多信息可查看链接

基于Socket
socketTextStream  - 从socket读取。 元素可以用分隔符分隔。详细可查看上文

基于Collection
fromCollection(Seq) - 从Java Java.util.Collection创建数据流。 集合中的所有元素必须属于同一类型。

fromCollection(Iterator) - 从迭代器创建数据流。 该类指定迭代器返回的元素的数据类型。

fromElements(elements:_ *) - 从给定的对象序列创建数据流。 所有对象必须属于同一类型。

fromParallelCollection(SplittableIterator) - 并行的从迭代器创建数据流。 该类指定迭代器返回的元素的数据类型。

generateSequence(from,to) - 并行生成给定间隔中的数字序列。


自定义
addSource  - 附加新的source函数。 例如,要从Apache Kafka读取,可以使用addSource(new FlinkKafkaConsumer08 <>(...))。 请参阅连接器以获取更多内容。

由于Flink1.7最新版本,这里也把DataSet API
补充上:
DataSet API


DataSet API 跟DataStream API 既有相同也有区别。

基于文件
readTextFile(path)/ TextInputFormat  - 按行读取文件并将它们作为字符串返回。

readTextFileWithValue(path)/ TextValueInputFormat  - 按行读取文件并将它们作为StringValues返回。 StringValues是可变字符串。

readCsvFile(path)/ CsvInputFormat  - 解析逗号(或其他字符)分隔字段的文件。 返回元组,案例类对象或POJO的DataSet。 支持基本java类型及其Value对应作为字段类型。

readFileOfPrimitives(path,delimiter)/ PrimitiveInputFormat  - 使用给定的分隔符解析新行(或其他char序列)分隔的原始数据类型(如String或Integer)的文件。

readSequenceFile(Key,Value,path)/ SequenceFileInputFormat  - 创建JobConf并从指定路径读取文件,类型为SequenceFileInputFormat,Key class和Value类,并将它们返回为Tuple2 <Key,Value>。

基于Collection
fromCollection(Seq) - 从Seq创建数据集。 集合中的所有元素必须属于同一类型。

fromCollection(Iterator) - 从迭代器创建数据集。 该类指定迭代器返回的元素的数据类型。

fromElements(elements:_ *) - 根据给定的对象序列创建数据集。 所有对象必须属于同一类型。

fromParallelCollection(SplittableIterator) - 并行地从迭代器创建数据集。 该类指定迭代器返回的元素的数据类型。

generateSequence(from,to) - 并行生成给定间隔中的数字序列。

通用
readFile(inputFormat,path)/ FileInputFormat  - 接受文件输入格式。

createInput(inputFormat)/ InputFormat  - 接受通用输入格式。

例子:

val env  = ExecutionEnvironment.getExecutionEnvironment

// read text file from local files system
val localLines = env.readTextFile("file:///path/to/my/textfile")

// read text file from a HDFS running at nnHost:nnPort
val hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile")

// read a CSV file with three fields
val csvInput = env.readCsvFile[(Int, String, Double)]("hdfs:///the/CSV/file")

// read a CSV file with five fields, taking only two of them
val csvInput = env.readCsvFile[(String, Double)](
  "hdfs:///the/CSV/file",
  includedFields = Array(0, 3)) // take the first and the fourth field

// CSV input can also be used with Case Classes
case class MyCaseClass(str: String, dbl: Double)
val csvInput = env.readCsvFile[MyCaseClass](
  "hdfs:///the/CSV/file",
  includedFields = Array(0, 3)) // take the first and the fourth field

// read a CSV file with three fields into a POJO (Person) with corresponding fields
val csvInput = env.readCsvFile[Person](
  "hdfs:///the/CSV/file",
  pojoFields = Array("name", "age", "zipcode"))

// create a set from some given elements
val values = env.fromElements("Foo", "bar", "foobar", "fubar")

// generate a number sequence
val numbers = env.generateSequence(1, 10000000)

// read a file from the specified path of type SequenceFileInputFormat
val tuples = env.readSequenceFile(classOf[IntWritable], classOf[Text],
"hdfs://nnHost:nnPort/path/to/file")

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Apache Flink 使用DataStream API进行数据处理 的相关文章

  • Flink实战之实时风控规则引擎

    问题导读 1 怎样构建一个风控业务架构 2 风控规则模型有哪些 3 怎样实现Flink CEP 动态更新 一 项目背景 目前钱大妈基于云原生大数据组件 DataWorks MaxCompute Flink Hologres 构建了离线和实时
  • Flink之IntervalJoin介绍

    InterValJoin算子 间隔流 一条流去join另一条流去过去一段时间内的数据 该算子将keyedStream与keyedStream转化为DataStream 再给定的时间边界内 默认包含边界 相当于一个窗口 按指定的key对俩个K
  • Flink 1.17教程:聚合算子(Aggregation)之按键分区(keyBy)

    聚合算子 Aggregation 计算的结果不仅依赖当前数据 还跟之前的数据有关 相当于要把所有数据聚在一起进行汇总合并 这就是所谓的 聚合 Aggregation 类似于MapReduce中的reduce操作 按键分区 keyBy 对于F
  • 使用arthas在线诊断flink的那些事

    最近在使用arthas诊断工具 诊断java服务的一些问题 突然想到能不能使用arthas诊断flink的jobManager和taskManager呢 答案是可以的 采用javaagent 在flink启动jobmanager和taskM
  • Apache Flink不止于计算,数仓架构或兴起新一轮变革

    2021 年初 在 InfoQ 编辑部策划的全年技术趋势展望中 我们提到大数据领域将加速拥抱 融合 或 一体化 演进的新方向 本质是为了降低大数据分析的技术复杂度和成本 同时满足对性能和易用性的更高要求 如今 我们看到流行的流处理引擎 Ap
  • Macbook Pro 鼠标卡顿问题

    Macbook Pro 鼠标卡顿问题 目前无解 只能改善 该问题最早能追溯到 2015年 https jingyan baidu com article ff42efa93632c5c19e220208 html 原因 据说是无线频段冲突
  • 微众银行DSS部署单机-普通版

    DSS 普通版部署 我的服务器 我的配置 vim conf config sh vim conf db sh QA 我的服务器 centos 7 0 8C16G 100G机械硬盘 我的配置 bashrc文件内容 JDK export JAV
  • Flink---1、概述、快速上手

    1 Flink概述 1 1 Flink是什么 Flink的官网主页地址 https flink apache org Flink的核心目标是 数据流上有状态的计算 Stateful Computations over Data Stream
  • Flink State 和 Fault Tolerance详解

    有状态操作或者操作算子在处理DataStream的元素或者事件的时候需要存储计算的中间状态 这就使得状态在整个Flink的精细化计算中有着非常重要的地位 记录数据从某一个过去时间点到当前时间的状态信息 以每分钟 小时 天汇总事件时 状态将保
  • Flink常用算子总结

    Streaming 算子 Map 将元素处理转换 再输出 map算子对一个DataStream中的每个元素使用用户自定义的Mapper函数进行处理 每个输入元素对应一个输出元素 最终整个数据流被转换成一个新的DataStream 输出的数据
  • Flink checkPoint和SavePoint

    savepoint和checkpoint都是flink为容错提供的强大功能特性 能够自动或手动保存job的运行状态 两者区别 checkpoint 应用定时触发 用户保存状态 会过期 内部应用失败重启的时候启用 但是手动cancel时 会删
  • flink 1.4版本flink table方式消费kafka写入hive方式踩坑

    最近在搞flink 搞了一个当前比较新的版本试了一下 当时运行了很长时间 hdfs里面查询有文件 但是hive里面查询这个表为空 后面用了很多种方式 一些是说自己去刷新hive表 如下 第一种方式刷新 alter table t kafka
  • flink学习42:tableAPI的join、union、排序、插入操作

    连接 内连接 外连接 集合操作 union 获取交集 获取差集 in 操作 排序操作 插入操作
  • Apache Flink Checkpoint 应用实践

    Checkpoint 与 state 的关系 Checkpoint 是从 source 触发到下游所有节点完成的一次全局操作 下图可以有一个对 Checkpoint 的直观感受 红框里面可以看到一共触发了 569K 次 Checkpoint
  • 【基础】Flink -- ProcessFunction

    Flink ProcessFunction 处理函数概述 处理函数 基本处理函数 ProcessFunction 按键分区处理函数 KeyedProcessFunction 定时器与定时服务 基于处理时间的分区处理函数 基于事件时间的分区处
  • flink学习之state

    state作用 保留当前key的历史状态 state用法 ListState
  • Flink_05_状态(个人总结)

    声明 1 本文为我的个人复习总结 并非那种从零基础开始普及知识 内容详细全面 言辞官方的文章 2 由于是个人总结 所以用最精简的话语来写文章 3 若有错误不当之处 请指出 状态 状态就是一块内存 一个变量 如果要访问历史窗口 或批次 的数据
  • Apache Flink(十五):Flink任务提交模式

    个人主页 IT贫道 大数据OLAP体系技术栈 Apache Doris Clickhouse 技术 CSDN博客 私聊博主 加入大数据技术讨论群聊 获取更多大数据资料 博主个人B栈地址 豹哥教你大数据的个人空间 豹哥教你大数据个人主页 哔哩
  • 【flink番外篇】5、flink的window(介绍、分类、函数及Tumbling、Sliding、session窗口应用)介绍及示例(1)- 窗口介绍、分类、函数

    Flink 系列文章 一 Flink 专栏 Flink 专栏 系统介绍某一知识点 并辅以具体的示例进行说明 1 Flink 部署系列 本部分介绍Flink的部署 配置相关基础内容 2 Flink基础系列 本部分介绍Flink 的基础部分 比
  • 【flink番外篇】9、Flink Table API 支持的操作示例(1)-完整版

    Flink 系列文章 一 Flink 专栏 Flink 专栏 系统介绍某一知识点 并辅以具体的示例进行说明 1 Flink 部署系列 本部分介绍Flink的部署 配置相关基础内容 2 Flink基础系列 本部分介绍Flink 的基础部分 比

随机推荐

  • notepad: 怎么在notepad里面,将字符串替换成换行

    用Notepad 可以 利用查找和替换功能 选择正则表达式 查找目标框里输入你想要替换的字符串 替换为框里输入 r 点击替换即可 这是个 很强大的功能 利用还它可以批量替换任何字符串 比如你链接是123 123 123 123这样不换行的
  • python 实现 熵值法 确定指标权重

    步骤 设指标共p个 评价对象共g个 则构成评价值 得分 矩阵如下 xj i 表示评价对象j在指标i上的评价值 j 1 2 g i 1 2 p 指标i对应的熵值为ei的计算公式如下 根据熵值来计算指标i的权重wi 计算公式如下 程序 4个指标
  • pm2 进程管理工具,相关命令

    1 pm2需要全局安装 npm install g pm2 2 进入项目根目录 2 1 启动进程 应用 pm2 start bin www 或 pm2 start app js 2 2 重命名进程 应用 pm2 start app js n
  • minicom安装、配置和使用

    在开发过程中 我们经常需要通过串口连接Android开发板的底层系统 Linux QNX 等 minicom是一个常用串口连接终端软件 在命令行终端下通过文本界面进行操作使用 安装 sudo apt install minicom 配置 首
  • BufferedReader与FileReader及FileInputStream

    BufferedReader 是缓冲字符输入流 它继承于Reader BufferedReader 的作用是为其他字符输入流添加一些缓冲功能 BufferedReader的作用 从字符输入流中读取文本 缓冲各个字符 从而实现字符 数组和行的
  • C# Ocr离线式识别,文字提取,(附源码下载)

    源代码下载 效果图 文字内容提取后填充 JObject obj ocr GeneralBasic bt ops this richTextBox1 Text string str if obj Count gt 0 JArray jo JA
  • LeetCode - 移除元素

    一 题目描述 给定一个数组 nums 和一个值 val 你需要原地移除所有数值等于 val 的元素 返回移除后数组的新长度 不要使用额外的数组空间 你必须在原地修改输入数组并在使用 O 1 额外空间的条件下完成 元素的顺序可以改变 你不需要
  • GBase8a MPP Cluster 安装部署——操作系统配置建议

    编写目的 本文档面向GBase 8a产品的售后人员 用户使用人员 以及广大GBase 8a感兴趣的技术人员 以便用于指导其更好的完成GBase8a MPP Cluster 安装部署工作 对硬件配置 网络环境 操作系统及软件配置等系统实施过程
  • vue 接口数据返回之后再渲染页面_Vue怎么让数据请求成功以后再渲染页面?

    需求如下 进入页面有一个检测按钮 点击即可向后端请求数据 进入页面如果不点击检测 则显示如下 点击检测 如果返回的是正常的状态则显示 如果返回的状态是异常 则显示 目前有个BUG 就是点击检测的时候 先从变为 然后马上变为 因为逻辑里面我是
  • IJCAI2023 Summary Reject公布

    点击文末公众号卡片 找对地方 轻松参会 北京时间2023年2月25日上午6点四十左右 cmt上状态已变 分为awaiting list 和reject 此前不少人预测2月24日晚上八点或凌晨两点左右出 截至2023年2月25日 7 16 a
  • 调用TransactionAspectSupport.currentTransactionStatus().setRollbackOnly()时遇到的一些问题

    之前在其他地方写的 一直要求手机验证 之前能跳过 麻烦点就麻烦点了 今天编辑文章的时候直接不能改了 无奈 如果手动调用 TransactionAspectSupport currentTransactionStatus setRollbac
  • mybatis获取插入数据时自动生成的主键id

    mapper文件 void insert Map
  • LaTeX

    简介 首先要介绍一下我用的Visio文件转为 eps的办法 vsd 文件 利用Visio打开 然后另存为 选择存为 pdf文件 pdf文件 利用Inkscape打开 然后另存为 选择存为 eps格式 之前一直用visio2010版 然后按照
  • 【React+TS】从零开始搭建react+typescript+router+redux+less+px2rem自适应+axios反向代理+别名@+Antd-mobile

    一 通过create react app脚手架创建项目 npx create react app testproject template typescript 在vscode中打开项目 可以看到顺利生成了react项目且组件的后缀为tsx
  • Java web项目创建笔记23 之《spring整合xxl-job》

    xxl job是一款功能强大的分布式任务调度系统 部署方法按照官网写的说明即可 https www xuxueli com xxl job 1 下载release版本代码 https github com xuxueli xxl job r
  • 先电Openstack云平台搭建【超级详细】【附带镜像】

    前言 大二上学期学习Openstack 苦于百度与CSDN上没有对应版本的教程 学的十分艰难 在此 将我的Openstack云平台搭建过程写出 留给新手学习 准备工作 VMware Workstation Pro 虚拟机 我使用版本 15
  • C++模板,模板具体化,特例化

    1 模板重载原则 函数同名 重载 时 调用优先级通常为 普通函数 gt 显式具体化 template specilazation gt 显式实例化 gt 一般模版函数 但更一般而言 有两条规则 1 gt 如果各自函数形参和调用处的实参 并非
  • Java锁的基本用法

    文章目录 Java锁的基本用法 synchronized和lock synchronized 首先在没有加锁的情况下 加锁的情况 Lock 首先在没有加锁的情况下 加锁的情况下 线程的通信 synchronized 通过wait和notif
  • Js 代替eval的方法 字符串转对象

    js中常用eval 函数将一个字符串当作一个JavaScript表达式一样去执行 但在安全漏洞上是存在隐患的 现找到eval函数的替代方法 let a custId 9860131056 custName custAdd const res
  • Apache Flink 使用DataStream API进行数据处理

    问题导读1 流处理和批处理分别入口是什么 2 对于本地和远程运行程序 都可以使用哪个函数 3 Flink数据源分为哪两类 4 Flink DataStream和DataSet source都是基于什么格式 5 Flink中kafka sou