Flink从入门到真香(17、使用flink table api 输出到文件和kafka)

2023-11-02

对于流式查询,需要声明如何在表和外部连接器之间进行转换
与外部系统交换的消息类型,由更新模式(update model)指定,下面3种,能使用那种模式取决于输出的目标,比如如果输出到文件你就没法用更新和撤回模式,因为不知道,只能追加,但是如果换成mysql就都可以用

  1. 追加模式(Append)--文件系统只支持追加模式
    表只做插入操作,和外部连接器只交换插入(insert)消息
  2. 撤回模式(Retract)--先删除再插入,实现更新操作
    表和外部连接器交换添加(Add)和撤回(Retract)消息
    插入操作(insert)编码为add消息;删除(delete)编码为retract消息;更新(update)编码为上一条的retract和下一条的add消息
  3. 更新插入模式(upsert)
    更新和插入都被编码为upsert消息;删除编码为delete消息

栗子1-从一个文件读出来,做一波操作写到另一个文件

/**
 *
 * @author mafei
 * @date 2020/11/22
 */

package com.mafei.apitest.tabletest

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}

object FileOutputTest {
  def main(args: Array[String]): Unit = {
    //1 、创建环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val tableEnv = StreamTableEnvironment.create(env)
    //2、读取文件
    val filePath = "/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt"
    tableEnv.connect(new FileSystem().path(filePath))
      .withFormat(new Csv()) //因为txt里头是以,分割的跟csv一样,所以可以用oldCsv
      .withSchema(new Schema() //这个表结构要跟你txt中的内容对的上
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temper", DataTypes.DOUBLE())
      ).createTemporaryTable("inputTable")

    val sensorTable = tableEnv.from("inputTable")

    //做简单转换
    val simpleTramsformTable = sensorTable
      .select("id,temper")
      .filter("id='sensor1'")

    //聚合转换

    val aggTable = sensorTable
      .groupBy('id)
      .select('id, 'id.count as 'count)

    //直接打印输出效果:
    simpleTramsformTable.toAppendStream[(String, Double)].print("simpleTramsformTable: ")

    //聚合的结果就不能用toAppendStream   因为他实现的是后面再来一条数据,表中就会增加一条,但是聚合的不是,是要更新之前的结果
    aggTable.toRetractStream[(String, Long)].print("aggTable")
    /**
     * 输出的效果:
     * aggTable> (true,(sensor1,1))
     * simpleTramsformTable: > (sensor1,1.0)
     * aggTable> (true,(sensor2,1))
     * aggTable> (true,(sensor3,1))
     * aggTable> (true,(sensor4,1))
     * aggTable> (false,(sensor4,1))  //false代表重新计算了
     * aggTable> (true,(sensor4,2))
     * aggTable> (false,(sensor4,2))
     * aggTable> (true,(sensor4,3))
     */

    // 输出到文件中
    val outputPath = "/opt/java2020_study/maven/flink1/src/main/resources/output.txt"

    tableEnv.connect(new FileSystem().path(outputPath))
        .withFormat(new Csv())
        .withSchema(
          new Schema()
            .field("id", DataTypes.STRING())
            .field("temper", DataTypes.DOUBLE())
        )
        .createTemporaryTable("outputTable")
    simpleTramsformTable.insertInto("outputTable")
    env.execute("file ouput")
  }
}

代码结构及运行效果

第二个栗子, 从kakfa的一个topic读出来,写到另一个topic里头

/**
 *
 * @author mafei
 * @date 2020/11/23
 */

package com.mafei.apitest.tabletest

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{Csv, Kafka, Schema}

object KafkaOutputTest {
  def main(args: Array[String]): Unit = {
    //1 、创建环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val tableEnv = StreamTableEnvironment.create(env)

    //2、从kafka中读取数据
    tableEnv.connect(
      new Kafka()
        .version("0.11")
        .topic("sourceTopic")
        .startFromLatest()
        .property("zookeeper.connect", "localhost:2181")
        .property("bootstrap.servers", "localhost:9092")
    ).withFormat(new Csv())
      .withSchema(new Schema() // 这个表结构要跟你kafka中的内容对的上
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temperature", DataTypes.DOUBLE())
      )
      .createTemporaryTable("kafkaInputTable")

    val sensorTable = tableEnv.from("kafkaInputTable")

    //做简单转换
    val simpleTramsformTable = sensorTable
      .select("id,temperature")
      .filter("id='sensor1'")

    tableEnv.connect(
      new Kafka()
        .version("0.11")
        .topic("sinkTopic")
        .startFromLatest()
        .property("zookeeper.connect", "localhost:2181")
        .property("bootstrap.servers", "localhost:9092")
    ).withFormat(new Csv())
      .withSchema(new Schema() //这个表结构要跟你kafka中的内容对的上
        .field("id", DataTypes.STRING())
        .field("temper", DataTypes.DOUBLE())
      )
      .createTemporaryTable("kafkaOutputTable")

    simpleTramsformTable.insertInto("kafkaOutputTable")
    env.execute("kafka sink test by table api")
  }
}

这时候就可以起2个窗口,一个窗口往"sourceTopic" 这个topic里面写,Flink程序会从这个topic里面读出来写到"sinkTopic" 这个topic里面,再起一个consumer的命令行去消费这个topic就可以看到效果了

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flink从入门到真香(17、使用flink table api 输出到文件和kafka) 的相关文章

  • Java - 将节点添加到列表的末尾?

    这是我所拥有的 public class Node Object data Node next Node Object data Node next this data data this next next public Object g
  • 使用 Android 发送 HTTP Post 请求

    我一直在尝试从 SO 和其他网站上的大量示例中学习 但我无法弄清楚为什么我编写的示例不起作用 我正在构建一个小型概念验证应用程序 它可以识别语音并将其 文本 作为 POST 请求发送到 node js 服务器 我已确认语音识别有效 并且服务
  • 在 HTTPResponse Android 中跟踪重定向

    我需要遵循 HTTPost 给我的重定向 当我发出 HTTP post 并尝试读取响应时 我得到重定向页面 html 我怎样才能解决这个问题 代码 public void parseDoc final HttpParams params n
  • 在 PHP 字符串中格式化 MySQL 代码

    是否有任何程序 IDE 可以在 PHP 字符串中格式化 MySQL 代码 例如 我使用 PHPStorm IDE 但它无法做到这一点 它对 PHP 和 MYSQL 执行此操作 但不适用于 php 字符串内的 MYSQL 我已准备好使用新的
  • INSERT..RETURNING 在 JOOQ 中不起作用

    我有一个 MariaDB 数据库 我正在尝试在表中插入一行users 它有一个生成的id我想在插入后得到它 我见过this http www jooq org doc 3 8 manual sql building sql statemen
  • 如何从shell脚本自动登录MySQL?

    我有一个 MySQL 服务器 其中有一个用户和密码 我想在 shell 脚本中执行一些 SQL 查询而不指定密码 如下所示 config sh MYSQL ROOT root MYSQL PASS password mysql sh sou
  • Spring Data JPA 应用排序、分页以及 where 子句

    我目前正在使用 Spring JPA 并利用此处所述的排序和分页 如何通过Spring data JPA通过排序和可分页查询数据 https stackoverflow com questions 10527124 how to query
  • 我可以使用 HSQLDB 进行 junit 测试克隆 mySQL 数据库吗

    我正在开发一个 spring webflow 项目 我想我可以使用 HSQLDB 而不是 mysql 进行 junit 测试吗 如何将我的 mysql 数据库克隆到 HSQLDB 如果您使用 spring 3 1 或更高版本 您可以使用 s
  • 路径中 File.separator 和斜杠之间的区别

    使用有什么区别File separator和一个正常的 在 Java 路径字符串中 与双反斜杠相反 平台独立性似乎不是原因 因为两个版本都可以在 Windows 和 Unix 下运行 public class SlashTest Test
  • Spring @RequestMapping 带有可选参数

    我的控制器在请求映射中存在可选参数的问题 请查看下面的控制器 GetMapping produces MediaType APPLICATION JSON VALUE public ResponseEntity
  • 无法解析插件 Java Spring

    我正在使用 IntelliJ IDEA 并且我尝试通过 maven 安装依赖项 但它给了我这些错误 Cannot resolve plugin org apache maven plugins maven clean plugin 3 0
  • 从 127.0.0.1 到 2130706433,然后再返回

    使用标准 Java 库 从 IPV4 地址的点分字符串表示形式获取的最快方法是什么 127 0 0 1 到等效的整数表示 2130706433 相应地 反转所述操作的最快方法是什么 从整数开始2130706433到字符串表示形式 127 0
  • 使用Caliper时如何指定命令行?

    我发现 Google 的微型基准测试项目 Caliper 非常有趣 但文档仍然 除了一些示例 完全不存在 我有两种不同的情况 需要影响 JVM Caliper 启动的命令行 我需要设置一些固定 最好在几个固定值之间交替 D 参数 我需要指定
  • 总是使用 Final?

    我读过 将某些东西做成最终的 然后在循环中使用它会带来更好的性能 但这对一切都有好处吗 我有很多地方没有循环 但我将 Final 添加到局部变量中 它会使速度变慢还是仍然很好 还有一些地方我有一个全局变量final 例如android Pa
  • 加密 JBoss 配置中的敏感信息

    JBoss 中的标准数据源配置要求数据库用户的用户名和密码位于 xxx ds xml 文件中 如果我将数据源定义为 c3p0 mbean 我会遇到同样的问题 是否有标准方法来加密用户和密码 保存密钥的好地方是什么 这当然也与 tomcat
  • Java Integer CompareTo() - 为什么使用比较与减法?

    我发现java lang Integer实施compareTo方法如下 public int compareTo Integer anotherInteger int thisVal this value int anotherVal an
  • Java列表的线程安全

    我有一个列表 它将在线程安全上下文或非线程安全上下文中使用 究竟会是哪一个 无法提前确定 在这种特殊情况下 每当列表进入非线程安全上下文时 我都会使用它来包装它 Collections synchronizedList 但如果不进入非线程安
  • 在 MySQL 中存储表情符号的编码问题:如何使用 Prisma ORM 在 NodeJS 中定义字符排序规则?

    亲爱的 Nodejs 专家和数据库专家 我们在 MySQL 数据库中存储表情符号和其他特殊字符时遇到问题 我们使用 Prisma 得到一个错误 这是我们使用的 ORM 参数无法从排序规则 utf8 general ci 转换为 utf8mb
  • 有没有办法为Java的字符集名称添加别名

    我收到一个异常 埋藏在第 3 方库中 消息如下 java io UnsupportedEncodingException BIG 5 我认为发生这种情况是因为 Java 没有定义这个名称java nio charset Charset Ch
  • java.lang.IllegalStateException:驱动程序可执行文件的路径必须由 webdriver.chrome.driver 系统属性设置 - Similiar 不回答

    尝试学习 Selenium 我打开了类似的问题 但似乎没有任何帮助 我的代码 package seleniumPractice import org openqa selenium WebDriver import org openqa s

随机推荐

  • encoder-decoder

    Encoder decoder框架 Encoder decoder框架最抽象的一种表示 即由一个句子生成另一个句子的通过处理模型 对于句子对
  • vulhub打靶第二周

    第二周 信息收集 主机发现 arping 和arp scan 都可以使用 arping 在大多Linux 发行版都默认包含 但是arping无法扫一个网段 可以用shell脚本补足这个缺陷 for i in seq 1 254 do sud
  • nginx 握手失败SSL_do_handshake() failed (SSL: error:1408A0C1:SSL routines:SSL3_GET_CLIENT_HELLO:no share...

    SSL do handshake failed SSL error 1408A0C1 SSL routines SSL3 GET CLIENT HELLO no shared cipher 因为nginx不支持客户端的算法套件 1 可更新n
  • Python 数据采集-爬取学校官网新闻标题与链接(进阶)

    Python 爬虫爬取学校官网新闻标题与链接 进阶 前言 一 拼接路径 二 存储 三 读取翻页数据 四 完整代码展示 五 小结 前言 本文基于学校的课程内容进行总结 所爬取的数据均为学习使用 请勿用于其他用途 准备工作 爬取地址 https
  • mysql bitmap redis_Redis 中 BitMap 的使用场景

    BitMap 原本的含义是用一个比特位来映射某个元素的状态 由于一个比特位只能表示 0 和 1 两种状态 所以 BitMap 能映射的状态有限 但是使用比特位的优势是能大量的节省内存空间 在 Redis 中 可以把 Bitmaps 想象成一
  • C语言文件操作之fgets()

    来说一说fgets 函数 原型 char fgets char s int n FILE stream 参数 s 字符型指针 指向存储读入数据的缓冲区的地址 n 从流中读入n 1个字符 stream 指向读取的流 返回值 1 当n lt 0
  • 2023华为OD机试真题Python实现【食堂供餐/二分法】

    题目内容 某公司员工食堂以盒饭方式供餐 为将员工取餐排队时间降低为0 食堂的供餐速度必须要足够快 现在需要根据以往员工取餐的统计信息 计算出一个刚好能达成排队时间为0的最低供餐速度 即 食堂在每个单位时间内必须至少做出多少份盒饭才能满足要求
  • 使用easyExcel导入大批量数据

    常用来导入excel的工具有poi 但笔者实测中发现 poi导入的excel数据少于5000条时是没有任何问题的 当导入excel里的数据大于5000条时 内存会被占满 从而导致解析错误 导入失败 这种情况俗称 OOM Out Of Mem
  • 震撼!国产自研多环境开发软件 CEC-IDE 问世!遥遥领先!!

    程序员的成长之路 互联网 程序员 技术 资料共享 关注 阅读本文大概需要 2 8 分钟 来自 网络 侵删 震撼到了 厉害 继国产自研浏览器 国产自研操作系统 国产自研手机系统后的全新力作 国产自研 IDE 它就是 CEC IDE 一款由数字
  • Scanvenger游戏制作笔记(一)Unity3D状态机转换

    Scanvenger游戏制作笔记 一 Unity3D状态机转换 前言 一 打开Controller 二 选择 Parameters 创建新的trigger 三 选择状态转换线 四 返回到原状态 选择返回的剪头 系列链接 前言 本文章是我学习
  • 使用yum 源安装nginx

    执行以下命令 添加Nginx到yum源 sudo rpm Uvh http nginx org packages centos 7 noarch RPMS nginx release centos 7 0 el7 ngx noarch rp
  • 树模型-决策树

    树模型 1 决策树 ID3 C4 5 CART 2 随机森林RF 3 Adaboost 4 GBDT 5 XGboost 6 孤立森林 异常检测 一 决策树 决策树是一种基本的分类和回归方法 用于分类主要借助每一个叶子节点对应一种属性判定
  • torchtext 教程 超详细

    torchtext 教程超详细 https www jianshu com p da3a5d5ed2ba
  • 数字电路实验(02)小规模组合逻辑电路实验1:交通灯状态

    数字电路实验 02 小规模组合逻辑电路实验1 交通灯状态 2020 5 11 一 实验要求 1 1 实验目的 1 认识解决实际组合逻辑问题的一般方法和过程 2 熟悉基本逻辑门的使用 1 2 实验器材 1 2输入与门 2 3输入与门 3 4输
  • Java用类实现结构体的功能

    我们都知道C C 里面的结构体在储存数据的时候很方便 但是在Java中没有Struct 但是我们可以用类来实现Struct的功能 与Struct声明功能一样的类的定义 public class platform private int x
  • C++类模板特化全总结

    基础模板一 template
  • CTR预估的几种方式

    1 CTR预估 CTR预估是计算广告中最核心的算法之一 那么CTR预估是指什么呢 简单来说 CTR预估是对每次广告的点击情况做出预测 预测用户是点击还是不点击 具体定义可以参考 CTR CTR预估和很多因素相关 比如历史点击率 广告位置 时
  • 解决unity5.6之前打包的apk在android8.0真机上运行黑屏的bug

    我的项目使用unity5 4 1开发 在Android8 0的时候启动会出现黑屏 同时发现日志中有这一句日志Unable to query for permission Fragment null must be a public stat
  • Spring事务当中propagation=“REQUIRED“和PROPAGATION=“REQUIRES_NEW“的区别

    3 propagation REQUIRED 和PROPAGATION REQUIRES NEW 的区别 官方 PROPAGATION REQUIRED 支持当前事务 如果当前没有事务 就新建一个事务 这是最常见的选择 PROPAGATIO
  • Flink从入门到真香(17、使用flink table api 输出到文件和kafka)

    对于流式查询 需要声明如何在表和外部连接器之间进行转换与外部系统交换的消息类型 由更新模式 update model 指定 下面3种 能使用那种模式取决于输出的目标 比如如果输出到文件你就没法用更新和撤回模式 因为不知道 只能追加 但是如果