数据同步之初识Canal

2023-05-16

git地址:阿里巴巴Canal的Git地址

Canal基于日志增量订阅和消费的业务包括:

  • 数据库镜像、数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引)
  • 业务cache刷新、带业务逻辑的增量数据处理

Mysql 的数据同步的架构图:
在这里插入图片描述
Canal是把自己伪装成一个从库:
在这里插入图片描述

Canal的优点: 实时性好、分布式、ACK机制
Canal的缺点

  • 只支持增量同步,不支持全量同步
  • MYSQL -> ES、RDB
  • 一个instance只能有一个消费端消费
  • 单点压力大

Canal的组件
在这里插入图片描述
Cancal 和 ES 的数据同步的架构: Canal可以和Kafka无缝连接
在这里插入图片描述
Cancal配置:

  1. 开启mysql得bin-log日志
	## mysql配置修改文件:
	vim /etc/my.cnf
    
    [mysqld]
    log-bin=mysql-bin # 开启 binlog
    binlog-format=ROW # 选择 ROW 模式
    server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
    
    ## 重启服务
    ## systemctl stop mariadb  
    ## systemctl start mariadb
    mysql -uroot -proot
    show variables like '%log_bin%';
    
    ## 授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant
    CREATE USER root IDENTIFIED BY 'root';  
    GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY 'root' WITH GRANT OPTION;
    
    -- CREATE USER canal IDENTIFIED BY 'canal';  
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal' WITH GRANT OPTION;
    -- GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    
    FLUSH PRIVILEGES;
  1. Canal 配置
	## 创建文件夹并 解压 canal
    mkdir /usr/local/canal
    tar -zxvf canal.deployer-1.1.4.tar.gz -C /usr/local/canal/
    
    ## 配置文件
    vim /usr/local/canal/conf/canal.properties
    ## java程序连接端口
    canal.port = 11111
    
    vim /usr/local/canal/conf/example/instance.properties
    ## 不能与已有的mysql节点server-id重复
    canal.instance.mysql.slaveId=1001
    ## mysql master的地址
    canal.instance.master.address=192.168.11.31:3306
    
    ## 修改内容如下:
    canal.instance.dbUsername=root #指定连接mysql的用户密码
    canal.instance.dbPassword=root
    canal.instance.connectionCharset = UTF-8 #字符集
    
    ## 启动canal
    cd /usr/local/canal/bin
    ./startup.sh
    
    ## 验证服务
    cat /usr/local/canal/logs/canal/canal.log
    ## 查看实例日志
    tail -f -n 100 /usr/local/canal/logs/example/example.log

Canal 与 MQ 整合
官方文档: Canal Kafka RocketMQ QuickStart

Java 操作 Canal例子

官方java例子: ClientSample.java

package com.alibaba.otter.canal.sample;
import java.net.InetSocketAddress;
import java.util.List;


import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;


public class SimpleCanalClientExample {


public static void main(String args[]) {
    // 创建链接
    CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                                                                                        11111), "example", "", "");
    int batchSize = 1000;
    int emptyCount = 0;
    try {
        connector.connect();
        connector.subscribe(".*\\..*");
        connector.rollback();
        int totalEmptyCount = 120;
        while (emptyCount < totalEmptyCount) {
            Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
            long batchId = message.getId();
            int size = message.getEntries().size();
            if (batchId == -1 || size == 0) {
                emptyCount++;
                System.out.println("empty count : " + emptyCount);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
            } else {
                emptyCount = 0;
                // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                printEntry(message.getEntries());
            }

            connector.ack(batchId); // 提交确认
            // connector.rollback(batchId); // 处理失败, 回滚数据
        }

        System.out.println("empty too many times, exit");
    } finally {
        connector.disconnect();
    }
}

private static void printEntry(List<Entry> entrys) {
    for (Entry entry : entrys) {
        if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
            continue;
        }

        RowChange rowChage = null;
        try {
            rowChage = RowChange.parseFrom(entry.getStoreValue());
        } catch (Exception e) {
            throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                                       e);
        }

        EventType eventType = rowChage.getEventType();
        System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                                         entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                                         entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                                         eventType));

        for (RowData rowData : rowChage.getRowDatasList()) {
            if (eventType == EventType.DELETE) {
                printColumn(rowData.getBeforeColumnsList());
            } else if (eventType == EventType.INSERT) {
                printColumn(rowData.getAfterColumnsList());
            } else {
                System.out.println("-------&gt; before");
                printColumn(rowData.getBeforeColumnsList());
                System.out.println("-------&gt; after");
                printColumn(rowData.getAfterColumnsList());
            }
        }
    }
}

private static void printColumn(List<Column> columns) {
    for (Column column : columns) {
        System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
    }
}

}

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

数据同步之初识Canal 的相关文章

  • 使用canal连接kafka

    这篇主要是项目还原 xff0c 目的是记录构建时遇到的各种奇葩坑 xff0c 避免下次迷路 废话不多说 xff0c 直接上手 默认已安装docker xff0c docker compose xff0c nodejs xff0c yarn
  • canal 修改配置信息后监听不到mysql数据并报错can‘t find start position for example

    原由 xff1a 数据库地址变化 canal 需要修改监听 问题 xff1a 修改配置信息后重启canal 但并无监听到数据库信息变化 分析 xff1a canal 与数据库之间断层 xff0c 导致信息传输失败 解决 xff1a xff0
  • 数据同步之初识Canal

    git地址 xff1a 阿里巴巴Canal的Git地址 Canal基于日志增量订阅和消费的业务包括 xff1a 数据库镜像 数据库实时备份索引构建和实时维护 拆分异构索引 倒排索引 业务cache刷新 带业务逻辑的增量数据处理 Mysql
  • Java Canal binlog 日志监控

    参考地址 超详细的Canal入门 xff0c 看这篇就够了 xff01 java技术爱好者 R的博客 CSDN博客 canal 有需要的参考博客 xff01 xff01 xff01 xff01 xff01 xff01
  • 谷粒学院(二十)Canal数据同步工具

    一 Canal介绍 1 应用场景 在前面的统计分析功能中 xff0c 我们采取了服务调用获取统计数据 xff0c 这样耦合度高 xff0c 效率相对较低 xff0c 目前我采取另一种实现方式 xff0c 通过实时同步数据库表的方式实现 xf
  • canal 修改配置信息后监听不到mysql数据并报错can‘t find start position for example

    原由 xff1a 数据库地址变化 canal 需要修改监听 问题 xff1a 修改配置信息后重启canal 但并无监听到数据库信息变化 分析 xff1a canal 与数据库之间断层 xff0c 导致信息传输失败 解决 xff1a xff0
  • Canal数据库监听

    1 什么是canal canal是用java开发的基于数据库增量日志解析 xff0c 提供增量数据订阅 amp 消费的中间件 目前 xff0c canal主要支持了MySQL的binlog解析 xff0c 解析完成后才利用canal cli
  • 使用canal配合rocketmq监听mysql的binlog日志

    目录 一 安装配置canal 1 1 安装canal 1 2 配置canal基本属性 1 3 配置canal的mysql 二 mysql配置 2 1 开启mysql的binlog日志 2 2 配置 canal 专用用户 2 3 启动cana
  • canal文档

    简介 github地址 canal k n l xff0c 译意为水道 管道 沟渠 xff0c 主要用途是基于 MySQL 数据库增量日志解析 xff0c 提供增量数据订阅和消费 canal 工作原理 canal 模拟 MySQL slav
  • canal监听mysql实践

    canal监听mysql实践 canal是用java开发的基于数据库增量日志解析 xff0c 提供增量数据订阅 amp 消费的中间件 目前 xff0c canal主要支持了MySQL的binlog解析 xff0c 解析完成后才利用canal
  • prometheus+grafana监控mysql、canal服务器

    一 prometheus配置 1 prometheus安装 1 1官网下载安装包 xff1a https prometheus io download 1 2解压安装包 xff1a tar zxvf prometheus 2 6 1 lin
  • Canal AdminGuide

    背景 先前开源了一个开源项目 xff1a 阿里巴巴开源项目 基于mysql数据库binlog的增量订阅 amp 消费 本文主要是介绍一下如何部署 amp 使用 环境要求 1 操作系统 a 纯java开发 xff0c windows linu
  • Canal 读取 mysql bin_log

    场景 xff1a 在微服务开发的过程中多个项目协同完成一个功能 xff0c 工程与工程之间存在数据上的解耦 xff0c 底层服务为上层服务提供数据 而底层服务有需要对数据进行管理 解决方案 xff1a 基本底层服务 通过 canal 获取
  • 不同业务场景下数据同步方案设计

    企业开发实践中通常需要提供数据搜索的功能 例如 电商系统中的商品搜索 订单搜索等 通常 搜索任务通常由搜索引擎担当 如Elasticsearch 而我们的原始数据为了安全性等问题通常存储在关系型数据库中 在搜索数据前 我们需要先将数据从关系
  • 搭建: canal部署与实例运行

    1 准备 github https github com alibaba canal 里面有包括canal的文档 server端 client端的 例子 源码包等等 2 canal概述 canal是应阿里巴巴存在杭州和美国的双机房部署 存在
  • 谈谈对Canal(增量数据订阅与消费)的理解

    概述 canal是阿里巴巴旗下的一款开源项目 纯Java开发 基于数据库增量日志解析 提供增量数据订阅 消费 目前主要支持了mysql 也支持mariaDB 起源 早期 阿里巴巴B2B公司因为存在杭州和美国双机房部署 存在跨机房同步的业务需
  • 深入解析中间件之-Canal

    canal 阿里巴巴mysql数据库binlog的增量订阅 消费组件 MySQL binlog MySQL主从复制 mysql服务端修改配置并重启 1 2 3 4 5 6 7 8 9 10 11 12 vi etc my cnf mysql
  • 使用TCP方式拉取Canal数据

    1 Canal对接Kafka联调 1 1 配置修改 canal properties 修改 zk canal zkServers 10 51 50 219 2181 instance properties 开启配置项 canal mq dy
  • 使用canal同步数据,踩坑排雷全过程

    1 mysql配置 1 检查binlog功能是否有开启 mysql gt show variables like log bin Variable name Value log bin OFF 1 row in set 0 00 sec 如
  • canal简介及canal部署、原理和使用介绍

    阿里canal简介及canal部署 原理和使用介绍 canal入门 什么是canal 阿里巴巴B2B公司 因为业务的特性 卖家主要集中在国内 买家主要集中在国外 所以衍生出了杭州和美国异地机房的需求 从2010年开始 阿里系公司开始逐步的尝

随机推荐

  • 没有Android SDK选项的解决办法+修改Android Studio中的Sdk路径

    安装教程 安装Android Studio时没有Android SDK选项 xff0c 可以先不管 xff0c 继续安装 注意在安装的过程中 xff0c 应该在最后一步install时 xff0c 会出现一个sdk的位置 比如我的在C Us
  • Android Studio一直在Download https://services.gradle.org/distributions/gradle-5.4.1-all.zip的解决方法

    Android Studio的新建工程下面一直出现Download https services gradle org distributions gradle 5 4 1 all zip 解决方法 xff1a 去https service
  • TDEngine 集群安装 (K8S)

    1 构建镜像 1 1 entrypoint sh span class token shebang important bin bash span span class token builtin class name set span 4
  • 设置Android Studio中的模拟器

    怎么设置Android Studio中的模拟器 xff0c 下面记录一下大概流程 然后自己选择设备 xff0c next 下好了之后next 建立后可能会出现以下图片所示问题 位于 的ADB二进制文件已过时 xff0c 并且在Android
  • 算法题算法题!!!!

    0223 思路 xff1a 先计算出老板没控制自己的情绪时的满意数量sum xff0c 再根据X的值 xff0c 维护一个滑动窗口 xff0c 遍历grumpy数组 xff0c 计算增加的满意数量add xff0c 选取最大的一个 xff0
  • MongoDB使用教程

    1 下载 xff1a https www mongodb com try download community 2 安装 解压下载包后正常步骤安装 创建服务 e Application develop MongoDB bin为路径 data
  • 动态规划几个例题!!

    动态规划法 xff01 xff01 xff01 dp i j 61 true表示字符串从下标 i 到下标 j 的位置是一个回文子串 xff08 所谓的状态转移 xff09 span class token keyword var span
  • 小白自学PIX飞控学习笔记

    小白自学飞控学习笔记 xff08 三 xff09 飞控开发准备工作准备阶段Misson Planner 高端操作 飞控开发准备工作 准备阶段 地面站电脑上安装mission planner 校准你的飞行器 Pixhawk指示灯的含义 红灯和
  • 线程同步的四种方式

    原文链接 xff1a https blog csdn net qq 40261882 article details 100538711 1 临界区 xff1a 通过对多线程的串行化来访问公共资源或一段代码 xff0c 速度快 xff0c
  • 分卷压缩与解压分卷

    分卷压缩与解压分卷 分卷压缩 名词解释 分卷压缩 分卷压缩操作 应用场景 解压分卷 解压踩坑 解压操作 nbsp nbsp nbsp nbsp 之前有写过一篇关于 Cesium 加载OSGB倾斜摄影三维模型 的文章 对OSGB模型的特点和文
  • C++输出到.txt文档,并被python读取

    C 43 43 中 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 in
  • 主流消息中间件及选型

    应用最为广泛的三大消息中间件 xff1a RabbitMQ RocketMQ kafka 在传统金融机构 银行 政府机构等有一些老系统还在使用IBM等厂商提供的商用MQ产品 选取原则 1 首先 xff0c 产品应该是开源的 开源意味着如果队
  • K8S Core-DNS

    1 Kube dns 1 1 概述 KubeDNS 由三部分构成 xff1a kube dns xff1a 核心组件 KubeDNS xff1a 依赖 client go 中的 informer 机制 xff0c 监听 Service 和
  • 什么是Batch,什么是Epoch?在训练模型的时候经常看到的参数,自己的见解。

    1 首先我们要大概了解一下什么是梯度下降法 xff1a 梯度下降法的基本思想可以类比为一个下山的过程 假设这样一个场景 xff1a 一个人被困在山上 xff0c 需要从山上下来 找到山的最低点 xff0c 也就是山谷 但此时山上的浓雾很大
  • mavlink协议发送与接收--串口版

    mavlink官网 MAVLINK现分为两个版本V1和V2 xff0c 区别是V2的MsgId扩展到24位 xff0c V1只有8位 xff08 0 255 xff09 原理都是差不多的 xff0c 这里以V1为例 xff0c V2也实际测
  • 转载-关于VDDA、VSSA 、参考电压的问题

    在小于等于64Pin的芯片中 xff0c 在芯片的内部Vref 43 是和VDDA连接在一起的 xff0c 也就是说ADC的是以VDDA为参考电压的 那么还有一点需要注意的就是VDDA和VDD的压差必须小于300mV xff0c 否则可能由
  • wsl,Ubuntu,关于解决 mysql-server : 依赖: mysql-server-5.7 但是它将不会被安装 问题

    出现问题 xff1a 安装mysql时 xff0c sudo apt span class token operator span get install mysql span class token operator span serve
  • jQuery-获取/设置 属性(标准属性,自定义属性)和内容

    一 获取 设置内容 text 设置或返回元素的文本内容 xff1b html 设置或返回元素的内容 xff08 包括html标记 xff09 xff1b val 设置或返回表单字段的值 具体例子如下 xff1a 控制台调试 34 Dcoun
  • idea--java开发最常用快捷键

    复制行 xff1a ctrl 43 d 删除行 xff1a ctrl 43 y 将某行代码向下移动 xff1a ctrl 43 shift 43 将某行代码向上移动 xff1a ctrl 43 shift 43 向下插入新行 xff08 e
  • 数据同步之初识Canal

    git地址 xff1a 阿里巴巴Canal的Git地址 Canal基于日志增量订阅和消费的业务包括 xff1a 数据库镜像 数据库实时备份索引构建和实时维护 拆分异构索引 倒排索引 业务cache刷新 带业务逻辑的增量数据处理 Mysql