FlinkCDC第四部分-同步mysql到mysql,ctrl就完事~(flink版本1.17.1)

2023-11-06

 本文介绍了不同源单表-单表同步,不同源多表-单表同步。

注:此版本支持火焰图

Flink版本:1.17.1

环境:Linux CentOS 7.0、jdk1.8

基础文件:

flink-1.17.1-bin-scala_2.12.tgz

flink-connector-jdbc-3.0.0-1.16.jar、(maven仓库目录:corg.apache.flink/flink-connector-jdbc/3.0.0-1.16)

flink-sql-connector-mysql-cdc-2.3.0.jar、(maven仓库目录:com.ververica/flink-sql-connector-mysql-cdc/2.3.0)
安装Flink步骤详见文章第二篇

支持的mysql版本: 

一、 数据源ip为***.51的源表,同步数据到数据源ip为***.50的目标表中,需要以下几个步骤:

1. 启动flink服务:

[root@localhost bin]#  ./start-cluster.sh

2. 停止flink服务:

[root@localhost bin]#  ./stop-cluster.sh

3. 启动FinkSQL:

[root@localhost bin]# ./sql-client.sh

4. 编写FlinkSql,创建临时表和job:

FlinkSql与mysql字段的类型映射

 把写好的Sql粘贴到FlinkSql客户端命令行中,分号'  ;  '是语句结束标识符,按回车创建:

 创建来源表结构:

来源表链接类型为'connector' = 'mysql-cdc'

Flink SQL> CREATE TABLE source_alarminfo51 (
>   id STRING NOT NULL,
>   AlarmTypeID STRING,
>   `Time` timestamp,
>   PRIMARY KEY (`id`) NOT ENFORCED
>  ) WITH (
>     'connector' = 'mysql-cdc',
>     'hostname' = '***',
>     'port' = '3306',
>     'username' = '***',
>     'password' = '***',
>     'database-name' = 'alarm',
>     'server-time-zone' = 'Asia/Shanghai',
>     'table-name' = 'alarminfo'
>  );

[INFO] Execute statement succeed.

 创建目标表结构(目标表结构可比来源表字段多,可使用视图指定字段默认值):

目标表链接类型为'connector' = 'jdbc',注意url需要跟后面以下属性值

Flink SQL> CREATE TABLE target_alarminfo50 (
>   id STRING NOT NULL,
>   AlarmTypeID STRING,
>   `Time` timestamp
>   PRIMARY KEY (`id`) NOT ENFORCED
>  ) WITH (
>     'connector' = 'jdbc',
>     'url' = 'jdbc:mysql://***:3306/alarm?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&serverTimezone=Asia/Shanghai&useSSL=true&dontTrackOpenResources=true&defaultFetchSize=10000&useCursorFetch=true',
>     'username' = '***',
>     'password' = '****',
>     'table-name' = 'alarminfo',
>     'driver' = 'com.mysql.cj.jdbc.Driver'
>  );

[INFO] Execute statement succeed.

 最后创建同步关系:

INSERT INTO target_alarminfo50 SELECT * FROM source_alarminfo51;

如下:

创建完表结构可使用下列语句查看和删除:

查看表:show tables;

删除表:drop table if exists  target_alarminfo; 

flink-UI页面效果:

打开火焰图:

编辑flink-conf.yaml:最后面添加 

rest.flamegraph.enabled: true

配置后重启flink服务,重新创建任务。

火焰图效果:

数据同步效果:

源表:

目标表数据:首次数据全量,后面数据变更增量 

 注:

在分析火焰图时,可以关注以下几点:
函数的执行时间:纵向的轴显示了函数的嵌套层级,越往下表示越深层的函数调用。横向轴表示时间,通过不同颜色的方块来表示函数的执行时间。
热点函数:寻找占据执行时间大部分的函数,这些函数可能是需要优化的关键点。
函数之间的关系:观察函数之间的调用关系,查看是否有不必要的函数调用或循环。
I/O 操作:关注是否有大量的数据读取、写入或网络通信,这可能是性能瓶颈的来源。
根据火焰图的分析结果,您可以进一步定位和排查潜在的性能问题,并在代码、配置或资源分配方面进行优化。
请注意,为了准确分析火焰图,建议在负载较高的情况下生成火焰图,并保持足够的监视时间。此外,Flink 的火焰图功能在生产环境中可能会造成一定的开销,因此建议在测试或开发环境中使用。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

FlinkCDC第四部分-同步mysql到mysql,ctrl就完事~(flink版本1.17.1) 的相关文章

  • POINT 列上的 MySQL INSERT/UPDATE

    我正在尝试用我国家的地理位置填充我的数据库 我的一张表有 4 个字段 ID PK 纬度 经度和地理点 EDIT SCDBs Punto Geografico SET lat 18 469692 SET lon 63 93212 SET g
  • 仅当值发生更改时如何插入数据库?

    我需要更新 替换 MySQL 数据库中的字段 但前提是它们已更改 该表包含 ID 文本字段和更改日期 用户根据更改日期通过 ID 查询数据 即 如果该日期早于用户上次查询数据的时间 则他不想要它 仅当文本字段与具有相同 ID 的现有文本字段
  • 从另一台计算机访问 MYSQL

    我想开发一个java桌面应用程序 我想在其中设置服务器 这意味着我在这里使用mysql db 该数据库将仅存储在一台电脑上 其余所有用户都可以访问该数据库 所以 我听说了mysql远程连接 其中尝试了一些事情 这些措施如下 我的电脑已连接w
  • 如何在 MySQL 中求和时间?

    正如您在图片中看到的 我有一份停机报告 显示了所选工厂在选定日期的停机时间 现在我想添加所有的值 Time Duration 列并将其显示在附近的单独显示中 TOTAL TIME DURATION 例如 在图像中 所选日期为 2015 年
  • MySQL 和 Hibernate 之间的主键自增由谁负责?

    MySQL CREATE TABLE role id role INT 11 unsigned NOT NULL AUTO INCREMENT PRIMARY KEY id role AUTO INCREMENT 1 休眠 Entity p
  • 合并两个 MYSQL SELECT 查询[重复]

    这个问题在这里已经有答案了 可能的重复 如何将两个 Post Category 表 MYSQL SELECT 查询合并为一个 https stackoverflow com questions 12972130 how to combine
  • Google Cloud SQL 在重新启动时卡住

    我的云 sql 实例长时间处于重新启动状态 在操作窗格中 重新启动的状态显示为待处理 并且还发生了导出 其状态仍为Running 有没有办法可以强制重新启动或取消重新启动或从常规备份中恢复数据 不 没有办法 如果您向 Google 支付高级
  • mysql转储到derby

    我正在使用 derby 在 eclipse 中进行开发 是否可以从 MySQL 转储表并以某种方式将其用于 derby 我知道 ddl 和 dml 对于两个 dbms 来说是不同的 但我正在寻找一种除了转储 导出之外的合适方法 我可以找到两
  • libmysqlclient.a 和 libmysqlclient_r.a 有什么区别?

    我应该使用哪个来链接 mysqlclient 库 它们之间有什么区别 我似乎找不到答案 谢谢 较新版本的 MySQL 客户端发行版不包含 r 版本 有些可能有从 libmyqslclient r a 到 libmyqslclient a 的
  • 无法在 Mac 上启动 MySQL

    使用 Brew 安装后 我无法运行 MySQL 我使用的是 OS X El Capitan 版本 10 11 3 和 MySQL Server 版本 5 7 11 当我启动服务器时 我收到 启动 MySQL 错误 服务器退出而不更新 PID
  • 错误代码:1305。函数或过程不存在

    因此 我在 MySQL 中创建一个函数 然后尝试向用户授予使用该函数的权限 但我无法这样做 这就是我正在做的 DELIMITER USE rxhelp36 scbn DROP FUNCTION IF EXISTS businessDayDi
  • MySQL 错误 1172 - 结果包含多行

    在存储过程中运行查询时 我从 MySQL 收到此错误 错误代码 1172 结果包含多行 我理解错误 我正在做一个SELECT INTO var list 因此查询需要返回单行 当我使用LIMIT 1 or SELECT DISTINCT 错
  • MySQL 排序顺序 - 排序规则?

    我在对 MySQL 中的 char 字段进行排序时遇到困难 问题是重音字符与非重音字符混淆 例如 Abc bd Acc 我认为这可能与整理有关 所以我将表格的排序规则更改为utf8 ut8 bin 看完之后这个帖子 https stacko
  • mysql 中的二进制、十六进制和八进制值

    我对在 mysql 数据库中使用二进制 十六进制和八进制系统非常感兴趣 首先 请给我一个建议 为什么我们在存储信息时需要它们 因为信息太多 或者为什么 另外 哪种类型的值必须存储在标记系统中 另外这里还有像 这是例子 gt SELECT 5
  • 如何从批量数据中的mysql列中删除所有非数字字符

    我想从列中删除所有非数字字符 我的数据库中有大量数据 目前我正在使用以下链接中描述的方法 http venerableagents wordpress com 2011 01 29 mysql numeric functions http
  • 如何在MYSQL中将整个字符串小写并保持第一个大写[重复]

    这个问题在这里已经有答案了 我的表栏目 我预期的输出会在列中发生变化 Smith Allen Doyle Dennis Baker Waker 这是我尝试过的 但不起作用 UPDATE TABLE employee SET last nam
  • PHP MySQL 使用选项/选择 HTML 表单标签进行多重搜索查询

    我正在尝试使用两个搜索字段设置基本的 MySQL LIKE 搜索 我不想拥有它 所以它有多个可选搜索字段 例如if isset POST city isset POST name 我不知道如何用 HTML 来做到这一点
  • SQL查询查找表的主键?

    我怎样才能找到哪一列首要的关键使用查询来创建表 这是重复的question https stackoverflow com questions 893874 mysql determine tables primary key dynami
  • MySQL 追加字符串

    How can I append a string to the end of an existing table value Let s say I have the table below And let s say that Mari
  • 通过触发器应用表的列权限

    现在 我有一个名为 Members 的表 其中包含内容 分为联系人数据 银行数据 现在 管理员应该能够创建 更新 删除用户 这些用户保存在另一个表中 该表只能访问管理员 用户应该获得自己的 mysql 用户帐户 管理员还应该能够设置权限 例

随机推荐