Canal入门(二)

2023-05-16

Canal入门(二)

canal kafka quickStart

1、基本说明

canal 1.1.1版本之后, 默认支持将canal server接收到的binlog数据直接投递到MQ, 目前默认支持的MQ系统有:kafka,RocketMQ

2、环境版本要求

  • 操作系统:CentOS release 6.6 (Final)
  • java版本: jdk1.8
  • canal 版本: 请下载最新的安装包,本文以当前v1.1.1 的canal.deployer-1.1.1.tar.gz为例
  • MySQL版本 :5.7.18
  • 注意 : 关闭所有机器的防火墙,同时注意启动可以相互telnet ip 端口

3、安装canal.server

进行这一步之前你应该已经装好了你的zookeeper和kafka
zookeeper安装配置:https://blog.csdn.net/h_big_tiger/article/details/100018921
kafka安装:https://blog.csdn.net/h_big_tiger/article/details/100018956

3.1 下载压缩包

到官网地址(release)下载最新压缩包,此处我下载的是1.1.1.版本。请下载 canal.deployer-latest.tar.gz

3.2 将canal.deployer 复制到固定目录并解压
mkdir -p /usr/local/canal
cp canal.deployer-1.1.1.tar.gz   /usr/local/canal
tar -zxvf canal.deployer-1.1.1.tar.gz 
3.3 配置修改参数
  1. 修改instance 配置文件

    vi conf/example/instance.properties
    
    #  按需修改成自己的数据库信息
    #################################################
    ...
    canal.instance.master.address=ip:port
    # username/password,数据库的用户名和密码
    ...
    canal.instance.dbUsername = canal
    canal.instance.dbPassword = canal
    ...
    # mq config
    canal.mq.topic=example
    # 针对库名或者表名发送动态topic
    #canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
    canal.mq.partition=0
    # hash partition config
    #canal.mq.partitionsNum=3
    #库名.表名: 唯一主键,多个表之间用逗号分隔
    #canal.mq.partitionHash=mytest.person:id,mytest.role:id
    #################################################
    

    对应ip 地址的MySQL 数据库需进行相关初始化与设置, 可参考我上一篇文章(链接)

  2. 修改canal 配置文件

    vi /usr/local/canal/conf/canal.properties
    
    # ...
    # 可选项: tcp(默认), kafka, RocketMQ
    canal.serverMode = kafka
    # ...
    # kafka/rocketmq 集群配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092 
    canal.mq.servers = 127.0.0.1:6667
    canal.mq.retries = 0
    # flagMessage模式下可以调大该值, 但不要超过MQ消息体大小上限
    canal.mq.batchSize = 16384
    canal.mq.maxRequestSize = 1048576
    # flatMessage模式下请将该值改大, 建议50-200
    canal.mq.lingerMs = 1
    canal.mq.bufferMemory = 33554432
    # Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)
    canal.mq.canalBatchSize = 50
    # Canal get数据的超时时间, 单位: 毫秒, 空为不限超时
    canal.mq.canalGetTimeout = 100
    # 是否为flat json格式对象
    canal.mq.flatMessage = true
    canal.mq.compressionType = none
    canal.mq.acks = all
    # kafka消息投递是否使用事务
    canal.mq.transaction = false
    

    关于各项配置意义请看最下面

3.4 启动
cd /usr/local/canal/
sh bin/startup.sh
3.5 查看日志
  • 查看 logs/canal/canal.log

    vi logs/canal/canal.log
    
  • 查看instance的日志

    vi logs/example/example.log
    
3.6 关闭
cd /usr/local/canal/
sh bin/stop.sh
3.7 MQ数据消费

canal.client下有对应的MQ数据消费的样例工程,包含数据编解码的功能

关于kafka模式的可看(链接)

注释:

mq相关参数说明(与kafka对应)

参数名参数说明默认值
canal.mq.serversbootstrap.servers127.0.0.1:6667
canal.mq.retries发送失败重试次数0
canal.mq.batchSizeProducerConfig.BATCH_SIZE_CONFIG16384
canal.mq.maxRequestSizeProducerConfig.MAX_REQUEST_SIZE_CONFIG1048576
canal.mq.lingerMsProducerConfig.LINGER_MS_CONFIG,如果是flatMessage格式建议将该值调大, 如: 2001
canal.mq.bufferMemoryProducerConfig.BUFFER_MEMORY_CONFIG33554432
canal.mq.canalBatchSize获取canal数据的批次大小50
canal.mq.canalGetTimeout获取canal数据的超时时间100
canal.mq.flatMessage是否为json格式
如果设置为false,对应MQ收到的消息为protobuf格式
需要通过CanalMessageDeserializer进行解码true
canal.mq.transaction消息投递是否使用事务,flatMessage模式下建议开启false
canal.mq.topicmq里的topic名
canal.mq.dynamicTopicmq里的动态topic规则, 1.1.3版本支持
canal.mq.partition单队列模式的分区下标1
canal.mq.partitionsNum散列模式的分区数
canal.mq.partitionHash散列规则定义
库名.表名 : 唯一主键,比如mytest.person: id
1.1.3版本支持新语法,见下文

canal.mq.dynamicTopic 表达式说明

设置匹配规则,建议MQ开启自动创建topic的能力

canal 1.1.3版本之后, 支持配置格式:schema 或 schema.table,多个配置之间使用逗号或分号分隔

  • 例子1:test\.test 指定匹配的单表,发送到以test_test为名字的topic上
  • 例子2:.\… 匹配所有表,则每个表都会发送到各自表名的topic上
  • 例子3:test 指定匹配对应的库,一个库的所有表都会发送到库名的topic上
  • 例子4:test\.* 指定匹配的表达式,针对匹配的表会发送到各自表名的topic上
  • 例子5:test,test1\.test1,指定多个表达式,会将test库的表都发送到test的topic上,test1\.test1的表发送到对应的test1_test1 topic上,其余的表发送到默认的canal.mq.topic值

为满足更大的灵活性,允许对匹配条件的规则指定发送的topic名字,配置格式:topicName:schema 或 topicName:schema.table

  • 例子1: test:test\.test 指定匹配的单表,发送到以test为名字的topic上
  • 例子2: test:.\… 匹配所有表,因为有指定topic,则每个表都会发送到test的topic下
  • 例子3: test:test 指定匹配对应的库,一个库的所有表都会发送到test的topic下
  • 例子4:testA:test\.* 指定匹配的表达式,针对匹配的表会发送到testA的topic下
  • 例子5:test0:test,test1:test1\.test1,指定多个表达式,会将test库的表都发送到test0的topic下,test1\.test1的表发送到对应的test1的topic下,其余的表发送到默认的canal.mq.topic值

canal.mq.partitionHash 表达式说明

canal 1.1.3版本之后, 支持配置格式:schema.table:pk1^pk2,多个配置之间使用逗号分隔

  • 例子1:test\.test:pk1^pk2 指定匹配的单表,对应的hash字段为pk1 + pk2
  • 例子2:.\…:id 正则匹配,指定所有正则匹配的表对应的hash字段为id
  • 例子3:.\…: p k pk pk 正则匹配,指定所有正则匹配的表对应的hash字段为表主键(自动查找)
  • 例子4: 匹配规则啥都不写,则默认发到0这个partition上
  • 例子5:.\… ,不指定pk信息的正则匹配,将所有正则匹配的表,对应的hash字段为表名
    • 按表hash: 一张表的所有数据可以发到同一个分区,不同表之间会做散列 (会有热点表分区过大问题)
  • 例子6: test\.test:id,.\…* , 针对test的表按照id散列,其余的表按照table散列

设置匹配规则,多条匹配规则之间是按照顺序进行匹配(命中一条规则就返回)

mq顺序性问题

binlog本身是有序的

  1. canal目前选择支持的kafka/rocketmq,本质上都是基于本地文件的方式来支持了分区级的顺序消息的能力,也就是binlog写入mq是可以有一些顺序性保障,这个取决于用户的一些参数选择
  2. canal支持MQ数据的几种路由方式:单topic单分区,单topic多分区、多topic单分区、多topic多分区
    • canal.mq.dynamicTopic,主要控制是否是单topic还是多topic,针对命中条件的表可以发到表名对应的topic、库名对应的topic、默认topic name
    • canal.mq.partitionsNum、canal.mq.partitionHash,主要控制是否多分区以及分区的partition的路由计算,针对命中条件的可以做到按表级做分区、pk级做分区等
  3. canal的消费顺序性,主要取决于描述2中的路由选择,举例说明:
    • 单topic单分区,可以严格保证和binlog一样的顺序性,缺点就是性能比较慢,单分区的性能写入大概在2~3k的TPS
    • 多topic单分区,可以保证表级别的顺序性,一张表或者一个库的所有数据都写入到一个topic的单分区中,可以保证有序性,针对热点表也存在写入分区的性能问题
    • 单topic、多topic的多分区,如果用户选择的是指定table的方式,那和第二部分一样,保障的是表级别的顺序性(存在热点表写入分区的性能问题),如果用户选择的是指定pk hash的方式,那只能保障的是一个pk的多次binlog顺序性 ** pk hash的方式需要业务权衡,这里性能会最好,但如果业务上有pk变更或者对多pk数据有顺序性依赖,就会产生业务处理错乱的情况. 如果有pk变更,pk变更前和变更后的值会落在不同的分区里,业务消费就会有先后顺序的问题,需要注意
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Canal入门(二) 的相关文章

  • 使用canal连接kafka

    这篇主要是项目还原 xff0c 目的是记录构建时遇到的各种奇葩坑 xff0c 避免下次迷路 废话不多说 xff0c 直接上手 默认已安装docker xff0c docker compose xff0c nodejs xff0c yarn
  • canal 修改配置信息后监听不到mysql数据并报错can‘t find start position for example

    原由 xff1a 数据库地址变化 canal 需要修改监听 问题 xff1a 修改配置信息后重启canal 但并无监听到数据库信息变化 分析 xff1a canal 与数据库之间断层 xff0c 导致信息传输失败 解决 xff1a xff0
  • 数据同步之初识Canal

    git地址 xff1a 阿里巴巴Canal的Git地址 Canal基于日志增量订阅和消费的业务包括 xff1a 数据库镜像 数据库实时备份索引构建和实时维护 拆分异构索引 倒排索引 业务cache刷新 带业务逻辑的增量数据处理 Mysql
  • Java Canal binlog 日志监控

    参考地址 超详细的Canal入门 xff0c 看这篇就够了 xff01 java技术爱好者 R的博客 CSDN博客 canal 有需要的参考博客 xff01 xff01 xff01 xff01 xff01 xff01
  • 谷粒学院(二十)Canal数据同步工具

    一 Canal介绍 1 应用场景 在前面的统计分析功能中 xff0c 我们采取了服务调用获取统计数据 xff0c 这样耦合度高 xff0c 效率相对较低 xff0c 目前我采取另一种实现方式 xff0c 通过实时同步数据库表的方式实现 xf
  • Canal监控MySQL数据到Kafka详细步骤(jdk+zookeeper+kafka+canal+mysql)

    目录 一 前言二 环境准备三 安装JDK四 安装zookeeper五 安装Kafka六 安装MySQL七 安装canal服务端 xff08 canal监控mysql数据发送到kafka xff09 八 测试是否可以监控到数据九 结语 一 前
  • [搬运]Ali Canal Prometheus QuickStart

    Prometheus QuickStart lcybo edited this page on 29 Aug 2018 3 revisions Pages 38 Contents 目录 HomeIntroduction 简介Quick St
  • canal 修改配置信息后监听不到mysql数据并报错can‘t find start position for example

    原由 xff1a 数据库地址变化 canal 需要修改监听 问题 xff1a 修改配置信息后重启canal 但并无监听到数据库信息变化 分析 xff1a canal 与数据库之间断层 xff0c 导致信息传输失败 解决 xff1a xff0
  • Canal数据库监听

    1 什么是canal canal是用java开发的基于数据库增量日志解析 xff0c 提供增量数据订阅 amp 消费的中间件 目前 xff0c canal主要支持了MySQL的binlog解析 xff0c 解析完成后才利用canal cli
  • canal

    目录 搭建canal测试canal 监控MySQL的binlog的工具 搭建canal 1 开启mysql binlog cp usr share mysql my medium cnf etc my cnf 修改my cnf vim et
  • 使用canal配合rocketmq监听mysql的binlog日志

    目录 一 安装配置canal 1 1 安装canal 1 2 配置canal基本属性 1 3 配置canal的mysql 二 mysql配置 2 1 开启mysql的binlog日志 2 2 配置 canal 专用用户 2 3 启动cana
  • canal文档

    简介 github地址 canal k n l xff0c 译意为水道 管道 沟渠 xff0c 主要用途是基于 MySQL 数据库增量日志解析 xff0c 提供增量数据订阅和消费 canal 工作原理 canal 模拟 MySQL slav
  • Elasticsearch7.9集群部署,head插件,canal同步mysql数据到es,亲自测试,无坑

    Elasticsearch集群部署 1 服务器规划 10 4 7 11 node1 10 4 7 12 node2 10 4 7 13 node3 1 集群相关 一个运行中的 Elasticsearch 实例称为一个节点 xff0c 而集群
  • prometheus+grafana监控mysql、canal服务器

    一 prometheus配置 1 prometheus安装 1 1官网下载安装包 xff1a https prometheus io download 1 2解压安装包 xff1a tar zxvf prometheus 2 6 1 lin
  • 不同业务场景下数据同步方案设计

    企业开发实践中通常需要提供数据搜索的功能 例如 电商系统中的商品搜索 订单搜索等 通常 搜索任务通常由搜索引擎担当 如Elasticsearch 而我们的原始数据为了安全性等问题通常存储在关系型数据库中 在搜索数据前 我们需要先将数据从关系
  • 搭建: canal部署与实例运行

    1 准备 github https github com alibaba canal 里面有包括canal的文档 server端 client端的 例子 源码包等等 2 canal概述 canal是应阿里巴巴存在杭州和美国的双机房部署 存在
  • 谈谈对Canal(增量数据订阅与消费)的理解

    概述 canal是阿里巴巴旗下的一款开源项目 纯Java开发 基于数据库增量日志解析 提供增量数据订阅 消费 目前主要支持了mysql 也支持mariaDB 起源 早期 阿里巴巴B2B公司因为存在杭州和美国双机房部署 存在跨机房同步的业务需
  • 深入解析中间件之-Canal

    canal 阿里巴巴mysql数据库binlog的增量订阅 消费组件 MySQL binlog MySQL主从复制 mysql服务端修改配置并重启 1 2 3 4 5 6 7 8 9 10 11 12 vi etc my cnf mysql
  • Springboot2(44)集成canal

    源码地址 springboot2教程系列 canal高可用部署安装和配置参数详解 前言 canal是阿里巴巴的基于数据库增量日志解析 提供增量数据订阅 消费 目前主要支持了mysql 可以用于比如数据库数据变化的监听从而同步缓存 如Redi
  • 使用canal同步数据,踩坑排雷全过程

    1 mysql配置 1 检查binlog功能是否有开启 mysql gt show variables like log bin Variable name Value log bin OFF 1 row in set 0 00 sec 如

随机推荐

  • c memcpy 带重叠部分 实现

    主要是要注意当目标地址在源地址后面且存在重叠区域的时候 xff0c 需要从后往前复制 span class token macro property span class token directive hash span span cla
  • 主流PCB画图软件的对比区别(AD、Pads、Allegro)

    国内的EDA软件市场几乎被三家瓜分 xff0c 分别是Altium Mentor Pads Cadence xff0c 也是我们这次主要分析和比较的软件 本人用的多的是 Alitum 也用过allegro xff0c pads目前还没用过
  • 新书推荐 |《Prometheus监控实战》

    新书推荐 Prometheus监控实战 点击上图了解及购买 Docker公司前服务与支持副总裁 Kickstarter前首席技术官 Empatico首席技术官撰写 xff0c 全方位介绍继Kubernetes之后的第二个CNCF毕业项目 P
  • 腾讯大数据总体架构图,对外公开!

    导读 xff1a 腾讯作为国内体量最大的互联网公司之一 xff0c 业务涵盖用户日常生活的方方面面 xff0c 面对如此巨大业务数据量 xff0c 如果不能对数据进行专业化处理并高效有序地存 管 用 xff0c 如果不能使数据产生应有的价值
  • API安全实战

    一提起 信息安全 xff0c 不管是业内专家还是所谓的 吃瓜群众 xff0c 多半都会在脑海中浮现 网络安全 Web安全 软件安全 数据安全 等常见的词汇 市面上绝大多数安全类书籍也多集中在这几个领域 xff0c 而从API视角阐释信息安全
  • 【第115期】世界一流大学计算机专业,都在用哪些书当教材?

    导读 xff1a 转眼间离新学期开学又不远了 清华 北大 MIT CMU 斯坦福的学霸们在新学期里要学什么 xff1f 本文就带你盘点一下那些世界名校计算机专业采用的教材 不用多说 xff0c 每本都是经典的烧脑技术书 xff0c 建议配合
  • 什么是AB实验?能解决什么问题?终于有人讲明白了

    导读 xff1a 走向身边的AB实验 作者 xff1a 木羊同学 来源 xff1a 大数据DT xff08 ID xff1a hzdashuju xff09 AB实验 是一个从统计学中借来的工具 我和大家一样 xff0c 每次只要看到 统计
  • 树莓派3b引脚图

    如上图所示 xff0c 我们可以很清楚的看到各个引脚的功能 例如我们想使用pwm引脚来控制舵机 xff0c 则我们可以考虑使用其中的 BCM18 PWM0 和 BCM13 PWM1 在使用wiringPi库时 xff0c 我们定义的引脚即B
  • 跟踪slab分配堆栈流程的方法(perf、systemtap)

    跟踪slab分配堆栈流程的方法 xff08 perf systemtap xff09 内存泄露是在解决内核故障会遇到的棘手情况 xff0c 根据具体的内存使用情况 xff0c 追踪相应slab cache的分配堆栈流程 xff0c 是追踪泄
  • prometheus+grafana监控mysql、canal服务器

    一 prometheus配置 1 prometheus安装 1 1官网下载安装包 xff1a https prometheus io download 1 2解压安装包 xff1a tar zxvf prometheus 2 6 1 lin
  • mac配置jmeter

    一 步骤 1 安装jdk1 8版本 xff0c 因为jmeter是基于java环境运行的 2 安装jmeter5 x版本 二 安装jdk 1 下载jdk Java Downloads Oracle 2 下载好之后安装 xff0c 全部下一步
  • 操作系统(四):动态链接与静态链接的区别

    在回答这个问题之前希望大家大概了解一个文件编译的过程 xff0c 比如一个C文件在编译成功后文件夹里的文件会有什么变化 xff0c 大家可以先去创建一个helloworld c的文件 xff0c 观察其编译后的变化 那么问题来了 面试官经常
  • 【OpenVINS】(一)ZUPT

    参考 xff1a Measurement Update Derivations Zero Velocity Update 在典型的自主汽车场景中 xff0c 传感器系统将在停止灯处变得静止 xff0c 其中动态物体 xff08 例如交叉路口
  • OpenVINS与MSCKF_VIO RK4积分对比

    VIO系统在使用IMU测量值进行状态预测时 xff0c 需要将连续时间的微分方程离散化为差分方程 xff0c 离散化的本质是积分 xff0c 根据数值积分近似程度不同 xff0c 常用的有欧拉法 中点法和四阶龙格库塔法等 xff0c Ope
  • 全盘拷贝linux系统,转移至另一硬盘

    首先制作ubuntu启动盘 xff0c 选择try ubuntu进入live ubuntu系统 查看需拷贝硬盘盘符 span class token function sudo span span class token function
  • EKF SLAM

    EKF 方法是解决 SLAM 问题的一种经典方法 xff0c 其应用依赖于运动模型和观测模型的高斯噪声假设 在 SLAM 问题首次提出不久后 xff0c Smith 和 Cheesman 及 Durrant Whyte对机器人和路标间的几何
  • 如何将立创EDA中的元器件的原理图/封装和3D模型导入AD的库中

    如何将立创EDA中的元器件的原理图 封装和3D模型导入AD的库中 工具 xff1a AD 立创EDA专业版 fusion360 或其他3D软件 导入原理图 封装 在立创商城复制所需元器件的编号 打开立创EDA标准版或专业版 xff0c 这里
  • Xshell 提示 “要继续使用此程序,您必须应用最新的更新或使用新版本“的解决方案

    要想解决Xshell提示更新最新版问题 有两种方案 方案一 手动修改系统时间 步骤如下 右键右下角时间 弹出如下窗口 2 选中 调整日期 时间 A 并点击 弹出如下页面 更改时间 更改成之前的年份 如下图 更改成功后 再打开相应的应用 Xs
  • 2020.2.22 排位赛 G - Bucket Brigade(BFS)

    Bucket Brigade 题面 题目分析 BFS模板题 代码 span class token macro property span class token directive keyword include span span cl
  • Canal入门(二)

    Canal入门 xff08 二 xff09 canal kafka quickStart 1 基本说明 canal 1 1 1版本之后 默认支持将canal server接收到的binlog数据直接投递到MQ 目前默认支持的MQ系统有 ka