Apache Flink SQL 详解与实践

2023-10-31

问题导读

1.为何会有Flink SQL?
2.本文哪些地方涉及Flink 1.7?
4.如何定义源(sources )和接收器(sinks)?
5.Flink SQL本文介绍了哪些sql?
6.将数据格式化为正确的格式以便进一步处理?
7.如何监控Flink sql查询
8.使用Flink SQL中的视图的作用是什么?
9.本文使用Flink sql实现了什么案例?

虽然Flink SQL最初于2016年8月与Flink 1.1.0一起发布,但最近的Flink版本增加了相当多的功能,使Flink SQL更易于使用,无需会编写Java / Scala代码。 在这篇文章中,我们希望(重新)从这些变化所带来的新角度介绍Flink SQL,同时为经验丰富的用户提供一些额外的知识。

新添加的SQL命令行(SQL CLI)可以轻松快速浏览流中的数据或静态数据(例如,在数据库或HDFS中)。 它还可用于构建功能强大的数据转换管道或分析管道。 在这篇文章中,我们想要探索当前可用的功能,而后续文章将更详细地介绍特定功能,并介绍Flink 1.7即将推出的令人兴奋的新功能,例如使用MATCH_RECOGNIZE扩展的复杂event处理和改进 基于时间的enrichment(富集) join。
在我们深入研究一些实践实例之前,我们列出了Flink SQL的一些亮点:
 

  • Flink SQL是批处理和流处理的统一API:这允许使用相同的查询来处理历史数据和实时数据
  • 支持处理时间和事件时间语义
  • 支持使用嵌套的Avro和JSON数据
  • 用户定义的scalar,聚合和表值(table-valued)函数
  • 无需编码的SQL命令行(即没有Java / Scala编码)
  • 支持各种类型的流连接
  • 支持聚合,包括窗口和没有窗口


定义 Sources 和Sinks

使用Flink SQL的命令行客户端时,我们要做的第一件事就是定义源(sources )和接收器(sinks)。 否则,我们将无法读取或写入任何数据。 源和接收器在YAML配置文件中定义,以及其他配置设置。 YAML文件中的源和接收器配置类似于SQL DDL语句(Flink社区目前正在讨论对SQL DDL的支持)。 对于我们正在进行的示例,我们假设我们有一个Kafka主题(topic),其中存储了我们想要进一步处理和分析的出租车游乐设施的信息。 它的配置如下所示:

tables:
  - name: TaxiRides
    type: source
    update-mode: append
    schema:
    - name: rideId
      type: LONG
    - name: rowTime
      type: TIMESTAMP
      rowtime:
        timestamps:
          type: "from-field"
          from: "rideTime"
        watermarks:
          type: "periodic-bounded"
          delay: "60000"
    - name: isStart
      type: BOOLEAN
    - name: lon
      type: FLOAT
    - name: lat
      type: FLOAT
    - name: taxiId
      type: LONG
    - name: driverId
      type: LONG
    - name: psgCnt
      type: INT
    connector:
      property-version: 1
      type: kafka
      version: 0.11
      topic: TaxiRides
      startup-mode: earliest-offset
      properties:
      - key: zookeeper.connect
        value: zookeeper:2181
      - key: bootstrap.servers
        value: kafka:9092
      - key: group.id
        value: testGroup
    format:
      property-version: 1
      type: json
      schema: "ROW(rideId LONG, isStart BOOLEAN, 
rideTime TIMESTAMP, lon FLOAT, lat FLOAT, 
psgCnt INT, taxiId LONG, driverId LONG)"

在Flink SQL中,源,接收器以及介于两者之间的所有内容称为表。 在这里,我们基于包含JSON格式的事件的Kafka主题定义初始表。 我们定义Kafka配置设置,格式以及我们如何将其映射到模式,以及我们希望如何从数据中导出watermarks 。 除了JSON之外,Flink SQL还内置了对CSV和Avro格式的支持,并且还可以使用自定义格式对其进行扩展。 Flink SQL始终支持在JSON和Avro架构中处理嵌套数据。

Flink SQL的使用
现在我们讨论了源表的配置和格式,下面我们说说 Flink SQL的使用
从Flink SQL命令行客户端,我们可以列出我们定义的表:

Flink SQL> SHOW TABLES;
TaxiRides
TaxiRides_Avro

我们还可以检查任何表的schema :

Flink SQL> DESCRIBE TaxiRides;
root
|-- rideId: Long
|-- rowTime: TimeIndicatorTypeInfo(rowtime)
|-- isStart: Boolean
|-- lon: Float
|-- lat: Float
|-- taxiId: Long
|-- driverId: Long
|-- psgCnt: Integer

有了这个,让我们看看我们可以用我们的表做什么。
有关配置Flink SQL以及定义源,接收器及其格式的详细信息,请参阅文档https://ci.apache.org/projects/f ... l#environment-files)。

格式化数据

我们可能想要做的最简单的事情之一是将数据格式化为正确的格式以便进一步处理。 这可能包括:
 

  • 在schema之间转换,例如将JSON事件流转换为Avro编码
  • 用SQL语句中删除字段或将其投影
  • 过滤掉我们不感兴趣的整个事件(events )


让我们看一下从架构转换开始我们将如何做到这些。 当我们想要从Kafka读取数据时,将数据转换为不同的格式,并将数据写回不同的Kafka主题以进行下游处理,我们所要做的就是定义源表(如上所述)然后定义 作为接收器的表格具有不同的格式:

tables:
  - name: TaxiRides_Avro0
    type: sink
    update-mode: append
    schema:
    - name: rideId
      type: LONG
    - name: rowTime
      type: TIMESTAMP
    - name: isStart
      type: BOOLEAN
    - name: lon
      type: FLOAT
    - name: lat
      type: FLOAT
    - name: taxiId
      type: LONG
    - name: driverId
      type: LONG
    - name: psgCnt
      type: INT
    connector:
      property-version: 1
      type: kafka
      version: 0.11
      topic: TaxiRides_Avro
      properties:
      - key: zookeeper.connect
        value: zookeeper:2181
      - key: bootstrap.servers
        value: kafka:9092
      - key: group.id
        value: trainingGroup
    format:
      property-version: 1
      type: avro
      avro-schema: >
          {
            "type": "record",
            "name": "test",
            "fields" : [
              {"name": "rideId", "type": "long"},
              {"name": "rowTime", "type": {"type": "long", "logicalType": "timestamp-millis"}},
              {"name": "isStart", "type": "boolean"},
              {"name": "lon", "type": "float"},
              {"name": "lat", "type": "float"},
              {"name": "taxiId", "type": "long"},
              {"name": "driverId", "type": "long"},
              {"name": "psgCnt", "type": "int"}
            ]
          }

通过我们定义的源和接收器转换数据变得如此简单:

Flink SQL> INSERT INTO TaxiRides_Avro SELECT * FROM TaxiRides;
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Cluster ID: StandaloneClusterId
Job ID: ffa9109b9cad077ec83137f55ec6d1c5
Web interface: http://jobmanager:8081

我们的查询作为常设查询提交给Flink集群。可以通过访问http://localhost:8081来监视和控制来自Flink的WebUI的查询。
我们可以通过引入(projection)投影和(filtering)过滤来构建这个简单的模式。 如果我们只想在结果中包含某些字段,我们可以在SELECT查询中指定。 例如:

Flink SQL> INSERT INTO TaxiRides_Avro SELECT rideIdId, taxiId, driverId FROM TaxiRides;

这只会给我们events中的ID。 (请记住,需要调整接收器的格式才能使此查询起作用。)

基于此,我们可以做的另一件简单事情就是过滤掉整个事件。 考虑一下我们只对在某个城市发生的出租车乘坐感兴趣的情况。 事件具有lon和lat字段,分别给出事件发生的经度和纬度。 我们可以使用它们来确定事件是否发生在某个城市:

Flink SQL> SELECT * FROM TaxiRides WHERE isInNYC(lon, lat);

你可能会注意到,那就是isInNYC()。 这是我们在SQL客户端配置中定义的用户定义函数或UDF。 我们可以通过以下方式查看我们提供的用户功能:

Flink SQL> SHOW FUNCTIONS;
timeDiff
toCoords
isInNYC
toAreaId

就像在Flink SQL客户端配置文件中配置的其他内容一样:

functions:
- name: timeDiff
  from: class
  class: com.dataartisans.udfs.TimeDiff
- name: isInNYC
  from: class
  class: com.dataartisans.udfs.IsInNYC
- name: toAreaId
  from: class
  class: com.dataartisans.udfs.ToAreaId
- name: toCoords
  from: class
  class: com.dataartisans.udfs.ToCoords

UDF是实现特定接口并在客户端注册的Java类。 有不同类型的用户功能:(scalar )标量函数,表函数和聚合函数。 其中详细介绍了用户定义的函数,可以查看UDF文档

使用Flink SQL中的视图构建查询
一旦我们有足够复杂的SQL查询,它们就会变得有点难以理解。 我们可以通过在Flink SQL中定义视图来缓解这种情况。 这类似于在编程语言中定义变量以给出某个名称的方式,以便以后能够重用它。 假设我们想要在早期的例子的基础上进行构建,并创建一个在给定日期之后在某个城市发生的游乐设施的视图。 我们会这样做:

Flink SQL> CREATE VIEW TaxiRides_NYC AS SELECT * FROM TaxiRides
  WHERE isInNYC(lon, lat)
  AND rowTime >= TIMESTAMP '2013-01-01 00:00:00';
[INFO] View has been created.

我们可以通过以下方式找出视图:

Flink SQL> SHOW TABLES;
TaxiRides
TaxiRides_Avro
TaxiRides_NYC

需要注意的一点是,创建视图实际上并不实例化任何常设查询或产生任何输出或中间结果。 视图只是可以重用的查询的逻辑名称,并允许更好地构建查询。 这与其他一些类似SQL的流式系统不同,在这些系统中,每个中间查询都会创建数据并使用资源。

视图是Flink 1.7的即将推出的功能,但它已经实现并合并到主分支中( master branch),这就是为什么我们已经在这里提到它。 另外,它非常有用。

基于事件时间的窗口化聚合
作为最后一步,我们希望展示一个更复杂的查询,它将我们到目前为止所解释的内容汇集在一起。 考虑一种情况,我们希望监控正在发生的游乐设施,并且需要知道某个城市某个特定区域的游乐设施数量何时超过阈值(比如说5)。 这是这样做的查询:

SELECT
  toAreaId(lon, lat) AS area,
  TUMBLE_END(rowTime, INTERVAL '5' MINUTE) AS t,
  COUNT(*) AS c
FROM TaxiRides_NYC
WHERE isStart = TRUE
GROUP BY
  toAreaId(lon, lat),
  TUMBLE(rowTime, INTERVAL '5' MINUTE)
HAVING COUNT(*) >= 5;

在上面的示例中,我们执行以下操作:

  • 我们使用之前创建的视图,其中包含在特定日期之后发生的某个城市的事件,
  • 我们过滤掉那些不是“开始事件”的事件,
  • 我们使用另一个用户定义的函数将lon,lat对转换为区域id和group by,
  • 我们指定我们想要有五分钟的窗口,最后
  • 我们过滤掉那些计数小于5的窗口。


在现实世界的用例中,我们现在将其写入Elasticsearch接收器并使用它为仪表板或通知系统供电。留给大家思考。

总结
在这篇博文中,我们解释了如何在不编写Java代码的情况下使用Flink SQL实现简单的数据转换和数据Massaging作业。 我们还解释了如何使用视图来构建更复杂的查询并使其易于理解。 最后,我们开发了一个更复杂的查询,它结合了用户定义的函数,窗口聚合和事件时间支持。

在后续文章中,我们将提供有关如何开发和使用用户定义函数的更多内容,我们将深入了解Flink SQL的强大连接以及如何使用它们来丰富数据。 在Flink 1.7.0发布之后使用Flink SQL的数据丰富,复杂事件处理和模式检测引入强大的新增功能。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Apache Flink SQL 详解与实践 的相关文章

随机推荐

  • BLE MESH组网(一)简介和基本概念

    BLE MESH组网 一 BLE MESH简介 BLE MESH来源 BLE MESH用处 BLE MESH的通讯方式 管理洪水 市场内蓝牙设备支持 安全性 BLE MESH协议栈模型 BLE MESH基本概念 节点 元素 模型和状态 地址
  • vue+饿了么 点击当前元素之外收起弹框

    v clickoutside指令解决此问题 先引入 import Clickoutside from element ui src utils clickoutside export default directives Clickouts
  • linux系统部署jenkins详细教程

    一 Linux环境 1 下载war包 官网下载地址 https get jenkins io war stable 2 332 4 jenkins war 2 将war包上传至服务器 创建目录 home ubuntu jenkins 上传w
  • AndroidStudio的一些代码恢复功能

    我一个好兄弟 也是一个程序员 一天写代码的时候 他要删除一个apk文件 点击delete的时候 Androidstudio卡了一下 就出问题了 导致他的所有app下面的所有东西全没了 代码 jar包全没了 用过as的都知道 在as中删除文件
  • vulnhub-VULNOS: 2渗透测试靶场

    靶场下载地址VulnOS 2 VulnHub 靶场文件时virtualBox 虚拟机的靶场 vm无法导入 安装vbox之后直接双击靶场文件导入 之后设置网卡为默认桥接模式即可 开机 信息收集 使用nmap 192 168 2 0 24 进行
  • Dockerfile中的CMD和ENTRYPOINT有什么区别?

    本文翻译自 What is the difference between CMD and ENTRYPOINT in a Dockerfile In Dockerfiles there are two commands that look
  • Java正则工具类从地址中提取省市区

    Java正则工具类从地址中提取省市区 最近有个需求 从一串地址中提取出省市区 然后开始寻找解决方案 最终通过网上一些正则 再加上自己改动的 貌似弄成一个比较匹配的工具类 其中代码如下 有需要的可以参考下 其中一些自治区还有直辖市均已兼容 自
  • Linux基础命令

    Linux基础命令 1 ls和cd命令 2 修改文件权限 2 1初识权限 2 2修改权限 3 修改组别 4 查看日志 4 1tail 4 2head 4 3cat 4 4more 4 5less 4 6grep 5 编辑文本 6 创建软链
  • 日志类型汇总

    Slf4j slf4j 的全称是 Simple Loging Facade For Java 即它仅仅是一个为 Java 程序提供日志输出的统一接口 并不是一个具体的日志实现方案 就比如 JDBC 一样 只是一种规则而已 所以单独的 slf
  • python机器学习算法(赵志勇)学习笔记( Logistic Regression,LR模型)

    Logistic Regression 逻辑回归 分类算法是典型的监督学习 分类算法通过对训练样本的学习 得到从样本特征到样本的标签之间的映射关系 也被称为假设函数 之后可利用该假设函数对新数据进行分类 通过训练数据中的正负样本 学习样本特
  • vue遮罩加载动画(可以当作全屏弹窗)

    最终效果 加载动画部分 div span span span span span span span span span span div
  • 免费馅饼【暑期集训I题】【经典DP】

    这不是一道很废脑汁的题目 可以说和前面的数塔相同 只是题目讲的长了些而已 都说天上不会掉馅饼 但有一天gameboy正走在回家的小径上 忽然天上掉下大把大把的馅饼 说来gameboy的人品实在是太好了 这馅饼别处都不掉 就掉落在他身旁的10
  • android 卡片滑动详情页,在Mugeda中制作顺畅的左右滑动切换卡片效果的教程

    之前在做 刁角武汉 的时候对如何选择景点这个问题做了好几个方案 一个是画一张大地图 另一个是做垂直的列表选择 但我还是选择了左右滑动来切换景点 因为在多次尝试之后发现发现在手机屏上似乎不适合做可以上下左右滑动的大地图 而垂直列表在 Muge
  • QThread 事件循环

    对QThread的run函数描述如下 The run implementation is for a thread what the main entry point is for the application All code exec
  • matlab练习程序(最大中值滤波)

    clear clc width 3 xwidth width 1 2 imgn imread 1 bmp imshow imgn imgn double imgn m n size imgn imgn1 imgn z zeros 4 wid
  • java面经整理

    面试问题 一 Java基础 1 jdk1 7到jdk1 8HashMap发生了什么变化 底层 2 jdk1 7到jdk1 8虚拟机发生了什么变化 3 String StringBuilder StringBuffer 4 ArrayList
  • 为什么SELECT * 会导致查询效率低?

    无论在工作还是面试中 关于SQL中不要用 SELECT 都是大家听烂了的问题 虽说听烂了 但普遍理解还是在很浅的层面 并没有多少人去追根究底 探究其原理 废话不多说 本文带你深入了解一下 SELECT 效率低的原因及场景 一 效率低的原因
  • 用Python和selenium下载pdf文件

    今天要从国外的网站上下载一个学术会议的几百篇pdf文献 具体网址为https www onepetro org conferences SPE 17ADIP all start 0 rows 700 这个网站需要登录后手动一篇一篇的下载 非
  • 使用ssh直连docker容器的方法 :解决Connection refused报错

    以root權限進入到docker后重啓ssh服務即可重啓ssh服務 docker exec it u root 容器号 bin bash 重啓ssh服務 service ssh restart
  • Apache Flink SQL 详解与实践

    问题导读1 为何会有Flink SQL 2 本文哪些地方涉及Flink 1 7 4 如何定义源 sources 和接收器 sinks 5 Flink SQL本文介绍了哪些sql 6 将数据格式化为正确的格式以便进一步处理 7 如何监控Fli