【Flink入门】Flink自定义Source读取MySQL数据

2023-11-11

在前一篇博客中 【Flink入门】Flink读取Kafka数据Demo 已经简单介绍了Flink读取Kafka数据并通过Demo进行实践,这篇博客简单介绍Flink通过自定义Source读取MySQL数据并通过Demo进行演练。

首先我们来简单了解下SourceFunction 接口,它是所有 stream source 的根接口,它继承自一个标记接口(空接口)Function。

在IDEA中打开SourceFunction,按下图右击鼠标,选择Diagrams–>show Diagrams
在这里插入图片描述

SourceFunction 定义了两个接口方法:
在这里插入图片描述

1、run : 启动一个 source,即对接一个外部数据源然后 emit 元素形成 stream(大部分情况下会通过在该方法里运行一个 while 循环的形式来产生 stream)。
2、cancel : 取消一个 source,也即将 run 中的循环 emit 元素的行为终止。
正常情况下,一个 SourceFunction 实现这两个接口方法就可以了。其实这两个接口方法也固定了一种实现模板。

接下来通过Demo实现
首先 pom.xml 中添加 MySQL 依赖:

<dependency>
     <groupId>mysql</groupId>
     <artifactId>mysql-connector-java</artifactId>
     <version>5.1.27</version>
</dependency>

MySQL数据库建表

drop table if exists user_order_count;
create table user_order_count (
user_id varchar(25) NOT NULL,
count int(11),
primary key (user_id)
) engine=innodb default charset=utf8 collate=utf8_bin;

导入模拟数据

insert into user_order_count values ('16935394', 6), ('16374609', 4), ('16570065', 4), ('4611433', 3), ('17308713', 3);

新建对应的实体类:UserOrderCount

package com.fuyun.flink.model;

public class UserOrderCount {
    public String userId;
    public int count;

    public UserOrderCount() {
    }

    public UserOrderCount(String userId, int count){
        this.userId = userId;
        this.count = count;
    }
    @Override
    public String toString() {
        return "UserOrderCount{" +
                "userId=" + userId +
                ", count=" + count +
                '}';
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }
}

新建 Source 类 SourceFromMySQL.java,该类继承 RichSourceFunction ,实现里面的 open、close、run、cancel 方法:

package com.fuyun.flink.souce;

import com.fuyun.flink.model.UserOrderCount;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class SourceFromMySQL extends RichSourceFunction<UserOrderCount> {
    PreparedStatement ps;
    private Connection connection;

    /**
     * open() 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接。
     *
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = getConnection();
        String sql = "select * from user_order_count;"; // 编写具体逻辑代码
        ps = this.connection.prepareStatement(sql);
    }

    /**
     * 程序执行完毕就可以进行,关闭连接和释放资源的动作了
     *
     * @throws Exception
     */
    @Override
    public void close() throws Exception {
        super.close();
        if (connection != null) { //关闭连接和释放资源
            connection.close();
        }
        if (ps != null) {
            ps.close();
        }
    }

    @Override
    public void run(SourceContext<UserOrderCount> ctx) throws Exception {
        ResultSet resultSet = ps.executeQuery(); // 执行SQL语句返回结果集
        while (resultSet.next()) {
            UserOrderCount userOrderCount = new UserOrderCount(
                    resultSet.getString("user_id").trim(),
                    resultSet.getInt("count"));
            ctx.collect(userOrderCount);
        }
    }

    @Override
    public void cancel() {
    }

    private static Connection getConnection() {
        Connection con = null;
        try {
            Class.forName("com.mysql.jdbc.Driver");
            con = DriverManager.getConnection("jdbc:mysql://bigdata-training.fuyun.com:3306/test?useUnicode=true&characterEncoding=UTF-8", "root", "123456");
        } catch (Exception e) {
            System.out.println("-----------mysql get connection has exception , msg = "+ e.getMessage());
        }
        return con;
    }
}

Flink主程序

package com.fuyun.flink

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import com.fuyun.flink.souce.SourceFromMySQL

object SourceMain {
  def main(args: Array[String]): Unit = {
    // 创建流处理环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment()

    env.addSource(new SourceFromMySQL).print

    env.execute("Flink add data sourc")
  }
}

运行结果:
在这里插入图片描述

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

【Flink入门】Flink自定义Source读取MySQL数据 的相关文章

  • 经典SQL题目-求第N高的薪水的解法汇总及知识点复习

    这几天在看Leetcode的时候逐步开始留意SQL题目 不做不知道 一做才感觉自己的SQL太弱了 因此将一道经典题目 求第N高的薪水的解法进行汇总 MySQL 相关解法的原文链接已标注在文末 题目的链接为 第N高的薪水 一 题干 第N高的薪
  • [云原生专题-45]:Kubesphere云治理-基于Kubernetes 构建的企业级容器平台简介与总体架构

    作者主页 文火冰糖的硅基工坊 文火冰糖 王文兵 的博客 文火冰糖的硅基工坊 CSDN博客 本文网址 https blog csdn net HiWangWenBing article details 122905834 目录 前言 第1章
  • pip换源+更改默认安装位置

    本文档创建于2023年3月9日 本文记录了pip换源和更改默认安装位置的操作 主要用于pip的一些配置 方便下载和文件管理 pip换源 使用pip安装库时 如果用默认的库经常会遇到连接不上或下载慢的问题 更换为国内的库下载会更快 临时换源
  • 使用TCP方式拉取Canal数据

    1 Canal对接Kafka联调 1 1 配置修改 canal properties 修改 zk canal zkServers 10 51 50 219 2181 instance properties 开启配置项 canal mq dy
  • 1056 组合数的和

    给定 N 个非 0 的个位数字 用其中任意 2 个数字都可以组合成 1 个 2 位的数字 要求所有可能组合出来的 2 位数字的和 例如给定 2 5 8 则可以组合出 25 28 52 58 82 85 它们的和为330 输入格式 输入在一行
  • Springboot3 + SpringSecurity + JWT + OpenApi3 实现认证授权

    Springboot3 SpringSecurity JWT OpenApi3 实现双token 目前全网最新的 Spring Security JWT 实现双 Token 的案例 收藏就对了 欢迎各位看友学习参考 此项目由作者个人创作 可
  • 即使失业,也要把第二个一万小时坚持下去

    这个月打的我有点懵逼 不知所措了 所以 在此写贴 即使失业 也要把第二个一万小时坚持下去 每天8小时学习 反正已经非工资收入九千了 基本上可以活下去了

随机推荐

  • Karma 自动化测试框架搭建文档

    一 前言 此文档为前端自动化单元测试框架 Karma 的搭建以及使用文档 二 准备环境 先列出我们此次搭建测试框架 Karma 必须的环境和包 1 node js node 引擎 2 npm node 包管理器 3 cnpm 可选 淘宝镜像
  • 数列分段(贪心入门)

    问题 对于给定的一个长度为 n 的正整数数列 ai 现要将其分成连续的若干段 并且每段和不超过 m 可以等于 m 问最少能将其分成多少段使得满足要求 算法复杂度为O n 思路 对于已给出数列 从前往后扫描一遍 在扫描过程中 不断记录当前最大
  • win10maven环境变量配置(简洁版):

    准备工作 下载了maven 可以官网下载 也可以通过其他途径获取 没安装之前 在命令行输入mvn v是这样的 解决方案 1 此电脑 属性 高级 环境变量 系统变量 2 新建变量 变量名 MAVEN HOME 值 本地maven的文件夹路径
  • 如何在Geany中添加python的中文注释

    在Geany中编译Python中直接添加中文注释会出现如下错误 只需要在程序的开始位置添加一句 coding utf 8
  • 全网最全Python兼职接单方式,赶快收藏!

    前言 近年来 Python凭借其简洁易入门的特点受到越来越多人群的青睐 当然这不仅仅是针对程序员来说 对于一些学生 职场人士也是如此 Python为什么会大受欢迎 1 Python还被大家称为 胶水语言 它适用于网站 桌面应用开发 自动化脚
  • Unity_DoTween_Path路径动画的使用

    using System Collections using System Collections Generic using System Linq using DG Tweening using UnityEngine public c
  • ResNet学习笔记

    目录 1 背景 2 BN Batch Normalization 层 3 residual结构 残差结构 1 背景 在 ResNet 之前 所有的神经网络都是通过卷积层和池化层的叠加组成的 人们认为卷积层和池化层的层数越多 获取到的图片特征
  • oracle 启动时出现ORA-01157: cannot identify/lock data和ORA-01110: data file 错误

    SQL gt shutdown ORA 01109 database not open Database dismounted ORACLE instance shut down SQL gt startup ORACLE instance
  • 微隔离(MSG)

    微隔离 MSG 参考文章 用 微隔离 实现零信任 什么是微隔离 当下哪家微隔离最靠谱 参考视频 不仅是防火墙 用微隔离实现零信任 定义 微隔离 Micro Segmentation 微隔离是一种网络安全技术 其核心的能力要求是聚焦在东西向流
  • NER标注----使用BILSTM模型训练招投标实体标注模型

    NER标注 BILSTM模型训练招投标实体标注模型 TOC NER标注 BILSTM模型训练招投标实体标注模型 前言 一 NER标注简介 二 从头开始训练一个NER标注器 二 使用步骤 1 引入库 2 数据处理 3 模型训练 前言 上文中讲
  • Python3 迭代器与生成器

    迭代器 迭代是Python最强大的功能之一 是访问集合元素的一种方式 迭代器是一个可以记住遍历的位置的对象 迭代器对象从集合的第一个元素开始访问 直到所有的元素被访问完结束 迭代器只能往前不会后退 迭代器有两个基本的方法 iter 和 ne
  • android 前后台保活 实现定位数据定时上传并展示轨迹 (下)

    上一篇地址 https blog csdn net qq 40803752 article details 86304508 上2篇写完了 保活 这一篇写进入业务逻辑 大概5分钟定一次位置 上传到服务器 并且展示 定位的话 我这里使用的百度
  • Qt5.9.0下载与安装(windows版本)

    1 下载 Qt5 9 0开源版本官网下载 选择图中2 3GB的安装包 即可进行下载 2 安装 双击安装包 弹出qt5 9 0的安装界面 点击下一步 这里的账户如果没有 可以不填 直接点Next 点击下一步 选择安装目录 勾选下面的勾选框 点
  • linux移除ntp,[笔记]Linux NTP命令

    推荐阅读 etc ntp conf 文件是ESX Linux NTP的主要配置文件 启动 停止 重启NTP 用下面的命令 root bigboy tmp service ntpd start root bigboy tmp service
  • 爬虫笔记2--爬取2345网站历史天气

    爬虫笔记2 爬取2345网站历史天气 最近需要获取某些地区的历史气象信息 墨迹天气无法获取历史数据 就在网上看了下 发现2345网站有相对完善的历史气象信息 就爬了下来并保存到MySql数据中 1 功能 本代码主要功能为 爬取2345天气历
  • vue中store模块化

    在进行书写store时 我们会分模块来管理我们的各个部分 我们会创建如下图目录 注意 每个模块中namespaced true是不可或缺的 export default namespaced true state mutations act
  • 【pytorch】迁移学习

    在很多场合中 没有必要从头开始训练整个卷积网络 随机初始化参数 因为没有足够丰富的数据集 而且训练也是非常耗时 耗资源的过程 通常 采用pretrain a ConvNet的方式 然后用ConvNet作为初始化或特征提取器 有两种迁移学习
  • ValueError: check_hostname requires server_hostname

    在pip install过程中出现该问题 具体错误如下 ERROR Exception Traceback most recent call last File C Users kevin AppData Roaming Python Py
  • ipsec.conf

    Name ipsec conf IPsec configuration and connections Description The optional ipsec conf file specifies most configuratio
  • 【Flink入门】Flink自定义Source读取MySQL数据

    在前一篇博客中 Flink入门 Flink读取Kafka数据Demo 已经简单介绍了Flink读取Kafka数据并通过Demo进行实践 这篇博客简单介绍Flink通过自定义Source读取MySQL数据并通过Demo进行演练 首先我们来简单