基于Canal与Flink实现数据实时增量同步(一)

2023-11-17

点击上方蓝色字体,关注我

canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL(也支持mariaDB)。

准备

配置MySQL的binlog

常见的binlog命令

# 是否启用binlog日志
show variables like 'log_bin';
# 查看binlog类型
show global variables like 'binlog_format';
# 查看详细的日志配置信息
show global variables like '%log%';
# mysql数据存储目录
show variables like '%dir%';
# 查看binlog的目录
show global variables like "%log_bin%";
# 查看当前服务器使用的biglog文件及大小
show binary logs;
# 查看最新一个binlog日志文件名称和Position
show master status;

对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

授权

授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

部署canal

安装canal

  • 下载:https://github.com/alibaba/canal/releases

  • 解压缩

[kms@kms-1 softwares]$ tar -xzvf canal.deployer-1.1.4.tar.gz  -C /opt/modules/canal/
  • 目录结构

drwxr-xr-x 2 root root 4096 Mar  5 14:19 bin
drwxr-xr-x 5 root root 4096 Mar  5 13:54 conf
drwxr-xr-x 2 root root 4096 Mar  5 13:04 lib
drwxrwxrwx 4 root root 4096 Mar  5 14:19 logs

配置修改

  • 修改conf/example/instance.properties,修改内容如下:

## mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息
canal.instance.master.address = kms-1.apache.com:3306
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal  
canal.instance.dbPassword = canal
# mq config,kafka topic名称
canal.mq.topic=test
  • 修改conf/canal.properties,修改内容如下:

# 配置zookeeper地址
canal.zkServers =kms-2:2181,kms-3:2181,kms-4:2181
# 可选项: tcp(默认), kafka, RocketMQ,
canal.serverMode = kafka
# 配置kafka地址
canal.mq.servers = kms-2:9092,kms-3:9092,kms-4:9092

启动canal

sh bin/startup.sh

关闭canal

sh bin/stop.sh

部署Canal Admin(可选)

canal-admin设计上是为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI操作界面,方便更多用户快速和安全的操作。

要求

canal-admin的限定依赖:

  • MySQL,用于存储配置和节点等相关数据

  • canal版本,要求>=1.1.4 (需要依赖canal-server提供面向admin的动态运维管理接口)

安装canal-admin

  • 下载

    https://github.com/alibaba/canal/releases

  • 解压缩

[kms@kms-1 softwares]$ tar -xzvf canal.admin-1.1.4.tar.gz  -C /opt/modules/canal-admin/
  • 目录结构

drwxrwxr-x 2 kms kms 4096 Mar  6 11:25 bin
drwxrwxr-x 3 kms kms 4096 Mar  6 11:25 conf
drwxrwxr-x 2 kms kms 4096 Mar  6 11:25 lib
drwxrwxr-x 2 kms kms 4096 Sep  2  2019 logs
  • 配置修改

vi conf/application.yml
server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  address: kms-1:3306
  database: canal_manager
  username: canal
  password: canal
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  adminPasswd: admin
  • 初始化原数据库

mysql -uroot -p
# 导入初始化SQL
#注:(1)初始化SQL脚本里会默认创建canal_manager的数据库,建议使用root等有超级权限的账号进行初始化
#    (2)canal_manager.sql默认会在conf目录下
> mysql> source /opt/modules/canal-admin/conf/canal_manager.sql
  • 启动canal-admin

sh bin/startup.sh
  • 访问

可以通过 http://kms-1:8089/ 访问,默认密码:admin/123456

  • canal-server端配置

使用canal_local.properties的配置覆盖canal.properties,将下面配置内容配置在canal_local.properties文件里面,就可以了。

# register ip
canal.register.ip =
# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =
  • 启动canal-serve

sh bin/startup.sh  local

注意:先启canal-server,然后再启动canal-admin,之后登陆canal-admin就可以添加serve和instance了。

启动kafka控制台消费者测试

bin/kafka-console-consumer.sh --bootstrap-server kms-2:9092,kms-3:9092,kms-4:9092  --topic test --from-beginning

此时MySQL数据表若有变化,会将row类型的log写进Kakfa,具体格式为JSON:

  • insert操作

{
    "data":[
        {
            "id":"338",
            "city":"成都",
            "province":"四川省"
        }
    ],
    "database":"qfbap_ods",
    "es":1583394964000,
    "id":2,
    "isDdl":false,
    "mysqlType":{
        "id":"int(11)",
        "city":"varchar(256)",
        "province":"varchar(256)"
    },
    "old":null,
    "pkNames":[
        "id"
    ],
    "sql":"",
    "sqlType":{
        "id":4,
        "city":12,
        "province":12
    },
    "table":"code_city",
    "ts":1583394964361,
    "type":"INSERT"
}
  • update操作

{
    "data":[
        {
            "id":"338",
            "city":"绵阳市",
            "province":"四川省"
        }
    ],
    "database":"qfbap_ods",
    "es":1583395177000,
    "id":3,
    "isDdl":false,
    "mysqlType":{
        "id":"int(11)",
        "city":"varchar(256)",
        "province":"varchar(256)"
    },
    "old":[
        {
            "city":"成都"
        }
    ],
    "pkNames":[
        "id"
    ],
    "sql":"",
    "sqlType":{
        "id":4,
        "city":12,
        "province":12
    },
    "table":"code_city",
    "ts":1583395177408,
    "type":"UPDATE"
    }
  • delete操作

{
    "data":[
        {
            "id":"338",
            "city":"绵阳市",
            "province":"四川省"
        }
    ],
    "database":"qfbap_ods",
    "es":1583395333000,
    "id":4,
    "isDdl":false,
    "mysqlType":{
        "id":"int(11)",
        "city":"varchar(256)",
        "province":"varchar(256)"
    },
    "old":null,
    "pkNames":[
        "id"
    ],
    "sql":"",
    "sqlType":{
        "id":4,
        "city":12,
        "province":12
    },
    "table":"code_city",
    "ts":1583395333208,
    "type":"DELETE"
}


JSON日志格式解释

  • data:最新的数据,为JSON数组,如果是插入则表示最新插入的数据,如果是更新,则表示更新后的最新数据,如果是删除,则表示被删除的数据

  • database:数据库名称

  • es:事件时间,13位的时间戳

  • id:事件操作的序列号,1,2,3...

  • isDdl:是否是DDL操作

  • mysqlType:字段类型

  • old:旧数据

  • pkNames:主键名称

  • sql:SQL语句

  • sqlType:是经过canal转换处理的,比如unsigned int会被转化为Long,unsigned long会被转换为BigDecimal

  • table:表名

  • ts:日志时间

  • type:操作类型,比如DELETE,UPDATE,INSERT

小结

本文首先介绍了MySQL binlog日志的配置以及Canal的搭建,然后描述了通过canal数据传输到Kafka的配置,最后对canal解析之后的JSON数据进行了详细解释。本文是基于Canal与Flink实现数据实时增量同步的第一篇,在下一篇介绍如何使用Flink实现实时增量数据同步。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

基于Canal与Flink实现数据实时增量同步(一) 的相关文章

  • 如何将完整的日期格式拆分为日期和时间?

    我有很多格式为我的示例所示的字符串 我必须解析它们 我正在尝试确定今天是哪根弦 我的问题是 时间快到了 我只需要比较那个日期 接下来我想检查时间是否在 after 和 before 的两个时间戳 HH mm ss 之间 但存在问题 日期几乎
  • Grails 项目 - Servlet 调用 - ClassNotFoundException:javax.servlet.AsyncContext

    我在用 IntelliJ IDEA 终极版 12 4 grails 2 2 0 BuildConfig groovy 文件中的 grails servlet version 2 5 并实现了简单的 servlet post 请求 使用 RE
  • org.openqa.selenium.NoSuchSessionException:会话 ID 为空。调用 quit() 后使用 WebDriver?

    我已经进行了一些搜索 但仍然遇到同样的问题 我相信这可能是由于我的网络驱动程序是静态的造成的 我不太确定 在我的主课中 我包括了 BeforeTest and AfterTest BeforeTest包括根据我的 XML 文件启动新浏览器
  • 在 libgdx 中渲染 box2d

    我有一个使用 FitViewport 的大小为 800x480 的游戏世界 并且最初使用像素渲染 box2d 实体 固定装置 因此所有物理效果都显得浮动且缓慢 查看文档后 我意识到 box2d 使用度量单位 因此我将 box2d 位置和大小
  • SSLContext 初始化

    我正在看JSSE参考指南 我需要获取一个实例SSLContext为了创建一个SSLEngine 所以我可以使用它Netty以启用安全性 获取实例SSLContext I use SSLContext getInstance 我看到该方法被重
  • Java 8 Stream - 并行执行 - 不同的结果 - 为什么?

    假设我有一个List
  • 自 JRE 1.7.0_25 起,Batik 无法进行转换

    自从我更新到 JAVA 1 7 0 25 以来 蜡染在应用转换时会抛出异常 堆栈跟踪是 java awt image ImagingOpException Unable to transform src image at java awt
  • Java 声音可视化器

    我正在尝试制作一个java声音可视化工具 但我完全不知道如何在实时处理音频后立即从提取的音频中获取字节 我可以将程序与 wav 文件同步 但这不是我想要做的 我想用程序生成声音 然后播放它 而不将其保存在任何地方 谢谢您的帮助 本文可以帮助
  • 为什么replaceAll在这行代码中不起作用? [复制]

    这个问题在这里已经有答案了 String weatherLocation weatherLoc 1 toString weatherLocation replaceAll how weatherLocation replaceAll wea
  • SQlite 获取最近的位置(带有纬度和经度)

    我的 SQLite 数据库中存储有纬度和经度的数据 我想获取距我输入的参数最近的位置 例如我当前的位置 纬度 经度等 我知道这在 MySQL 中是可能的 并且我已经做了相当多的研究 SQLite 需要一个自定义外部函数来实现半正弦公式 计算
  • JavaFX Integer Spinner (IntegerSpinnerValueFactory) 不会将值回绕到最小值

    我创建了一个带有值的整数微调器 min 5 max 15 and initialValue 12 and wrapAround true 一旦旋转器到达max 15 增量期间的值 而不是将值重置为min 5 正如它所说文档 https op
  • 带等待/通知的同步块与不带等待/通知的同步块之间的区别?

    如果我只是使用synchronized 不是wait notify方法 它仍然是线程安全的吗 有什么不同 Using synchronized使方法 块一次只能由一个线程访问 所以 是的 它是线程安全的 这两个概念是结合在一起的 而不是相互
  • 短 2 个字节

    我正在从串行端口读取一个长度为 133 字节的数据包 最后 2 个字节包含 CRC 值 我使用 Java 将 2 个字节值制成单个 我认为很短 这就是我所做的 short high 48 0x00ff short low 80 short
  • Python:如何使用生成器来避免 sql 内存问题

    我有以下方法来访问 mysql 数据库 并且查询在服务器中执行 我无权更改有关增加内存的任何内容 我对生成器很陌生 并开始阅读更多有关它的内容 并认为我可以将其转换为使用生成器 def getUNames self globalUserQu
  • 按钮悬停和按下效果 CSS Javafx

    我是 CSS 新手 为按钮定义了以下 CSS 样式 其中id并且应用了自定义样式 但不应用悬停和按下效果 bevel grey fx background color linear gradient f2f2f2 d6d6d6 linear
  • 如何在 Hibernate 中自动递增复合主键中的 Id?

    我有一个带有复合主键的表 groupId and batchId 实体类看起来像 Entity name EMPLOYEE public class Employee EmbeddedId private EmployeePK employ
  • java mysql 准备好的语句

    我正在尝试使用 java 向数据库中进行简单的插入 它告诉我我的 sql 语法已关闭 但是 当我复制打印出来的字符串并将其放入 phpmyadmin 中的 sql 命令中时 它会正确执行该命令 并且我似乎无法弄清楚 java 中的字符串查询
  • 空检查时可能未初始化错误

    我正在检查变量是否已初始化 但此时 netbeans 给了我variable reader might not have been initialized警告 我该如何解决 抑制这个问题 这是我的代码 摘要 final Reader rea
  • Volley 在第一次调用方法时返回 null

    我正在尝试使用 volley 从服务器检索数据 但是当我第一次调用此方法时 我收到服务器的响应 但该方法返回 null 如果我第二次调用它 我会得到最后的响应 public String retrieveDataFromServer Str
  • 将 SQL 数据中的一行映射到 Java 对象

    我有一个 Java 类 其实例字段 以及匹配的 setter 方法 与 SQL 数据库表的列名相匹配 我想优雅地从表中获取一行 到 ResultSet 中 并将其映射到此类的实例 例如 我有一个 Student 类 其中包含实例字段 FNA

随机推荐

  • IP首部报文字段

    一 IP首部报文字段 字段如下图所示 二 每个字段的含义 版本 表示 IP 协议的版本 通信双方使用的 IP 协议版本必须一致 目前广泛使用的IP协议版本号为 4 即 IPv4 首部长度 这个字段所表示数的单位是 32 位字长 1 个 32
  • postgreSQL中无法更改数据的问题

    增对这个bug 参考博客 解决Navicat修改Mysql数据后刷新恢复原样的问题 无法提交事务 Studiouss的博客 CSDN博客 发现我的问题是解决了 因为我确实没有设置主键 或者是设置主键没有保存造成的 这样就解决了 点击刷新 表
  • 自动化测试面试题及答案大全(2)

    问题1 Selenium是什么 流行的版本有哪些 是一个开源的web自动化测试的框架 支持多种编程语言 支持跨浏览器平台进行测试 Selenium 1 0或Selenium RC Selenium 2 0或Selenium Webdrive
  • VisualStudio神级插件——JetBrains Resharper 2018.2.3 Ultimate完美破解版教程

    VisualStudio神级插件 JetBrains Resharper 2018 2 3 Ultimate完美破解版 教程 ReSharper是一个JetBrains公司出品的著名的代码生成工具 是Visual Studio里面的一个插件
  • 中职本科计算机大学课程设置,中职学校计算机专业课程设置问题与对策研究——以湖南省五所中职学校为例...

    摘要 随着我国市场经济的发展 产业结构和劳动力结构不断调整 因此对劳动者的素质和结构都提出了新的要求 形成了对技能型人才需求的调整增长态势 技能型人才的紧缺 结构性失业问题已成为制约我国经济增长的瓶颈 中职教育作为目前职业教育的主体 它承担
  • 文件服务器 安全,文件服务器 安全

    文件服务器 安全 内容精选 换一换 云堡垒机支持批量导出资源信息 用于本地备份资源配置 以及便于快速管理资源基本信息 为加强资源信息安全管理 支持加密导出资源信息 导出的主机资源文件中包含主机基本信息 主机下所有资源账户信息 主机资源账户明
  • linuxmake没有指明目标并且找不到makefile_Makefile笔记

    一般来说 无论是C C 还是pas 首先要把源文件编译成中间代码文件 在Windows下也就是 obj 文件 UNIX下是 o 文件 即 Object File 这个动作叫做编译 compile 然后再把大量的Object File合成执行
  • ssh 连接报错:Unable to negotiate with 192.168.xx.xx port 22: no matching key exchange method found.

    用 ssh 连接 Linux 服务器时 很偶然的情况下出现了如下报错 Unable to negotiate with xx xx xx xx port 22 no matching key exchange method found Th
  • LeetCode题目笔记--12.整数转罗马数字

    题目描述 题目跟前面13题描述一样 就是问题变为整数转成罗马数字 思路 上一道题罗马数字转整数比较简单 因为不存在罗马数字表示冲突的问题 即不存在一个罗马数字对应多个整数 而这个问题中 就要考虑一下这个问题了 因为如果不加以约束的话 一个整
  • 【设计模式】用Java实现状态模式

    一 状态模式介绍与使用场景 状态模式是一种行为设计模式 它允许对象在内部状态发生改变时改变其行为 该模式将对象的行为包装在不同的状态类中 使得对象的行为可以根据其当前状态动态改变 状态模式通常由以下几个角色组成 环境类 Context 环境
  • c++中的时间处理(1)localtime、localtime_r和localtime_s

    c 中对时间的处理有好几个函数 很多C 程序员可能用过 但不一定完全搞得清楚 这里 我先讲解下 localtime localtime r和localtime s的使用 1 localtime localtime用来获取系统时间 精度为秒
  • Python 3.4安装pandas库时遇到的问题:no matching distribution found for numpy==1.9.3

    Window XP 其实已经安装了numpy10 0 1 但在cmd中pip install pandas时提示 no matching distribution found for numpy 1 9 3 然后卸载了之前的numpy 又使
  • selenium3和selenium4的区别

    1 初始化浏览器对象 在初始化driver对象的时候 selenium4多了一个Service类 用来管理驱动程序的启动 停止 service Service r D python39 chromedriver exe driver web
  • 手写字符识别

    一 手写字符识别原理 以下来源网上 手写数字识别 可以采用图像识别的方法 左边的x是手写之后的图像 右边的y是对应的数字 对于图像信息 计算机是用数值来进行表示的 机器学习让计算机具备智能 实际上是训练出数值模型w对于新的输入x 可以通过与
  • STL容器总结

    1 Vector 本质是动态数组 拥有一段连续的内存空间 并且起始地址不变 能非常好的支持随机存取 即 操作符 但由于它的内存空间是连续的 所以在中间进行插入和删除会造成内存块的拷贝 如果空间不够 则另外分配新的两倍大小的空间 然后把旧空间
  • mysql不等于的写法_mysql 不等于 符号写法

    经过测试发现mysql中用 lt gt 与 都是可以的 但sqlserver中不识别 所以建议用 lt gt selece from jb51 where id lt gt 45 sql 里 符号 lt gt 于 的区别 lt gt 与 都
  • 打印机驱动安装教程

    工作中 尤其是从事半文秘工作的人 不是全文秘 没有安装打印机驱动经验 这里就来说说如何安装佳能打印机驱动 准备安装资料 佳能打印驱动 安装步骤 1 我这里是压缩文件 解压后 点击Setup exe开始安装 2 要同意才能下一步安装 3 根据
  • jenkins运行Linux后台命令

    这里是指广义上的后台 不管是shell命令nohub或者其他 只要是需要常驻linux后台的命令或者程序 如果通过Jenkins启动 当任务结束时Jenkins都会清理掉此次任务中的所有相关进程 现象就是明明运行成功了但是实际找不到进程 解
  • 安装CPU版本的pytorch和torchvision(Win10)

    前言 在使用以下方法之前 我是用了网上说的搭建清华镜像进行下载 虽然pytorch下载成功了 但是在下载torchvision的时候就一直成功不了 在网络的大千世界中 我终于悟到先本地下载再安装的方法 第一步 找好对应的版本 第二步 下载本
  • 基于Canal与Flink实现数据实时增量同步(一)

    点击上方蓝色字体 关注我 canal是阿里巴巴旗下的一款开源项目 纯Java开发 基于数据库增量日志解析 提供增量数据订阅 消费 目前主要支持了MySQL 也支持mariaDB 准备 配置MySQL的binlog 常见的binlog命令 是