datax编译clickhousewriter插件,及mysql->ck自动增量

2023-11-12

安装clickhosue:

Clickhouse安装(新手必看)_初念、LL的博客-CSDN博客_clickhouse安装

安装mysql

clickhouse创建表,字段和需导入的mysql表相同;

datax实现mysql到clickhouse同步:

官方编译的datax包默认没有clickhousewriter插件,需下载源码手动编译:

GitHub - alibaba/DataX: DataX是阿里云DataWorks数据集成的开源版本。

下载并解压到本地

安装maven环境(参考网上教程,修改mirror为阿里云源)

安装jdk1.8(建议安装1.8版本,15以上版本编译会报错)

1.编译clickhousewriter

我们只需要clickhousewriter,将其他模块从datax目录下pom.xml中删除,否则其他组件可能找不到依赖,导致编译失败。

 

使用官方提供的命令进行编译:

mvn -U clean package assembly:assembly -Dmaven.test.skip=true

编译成功:

 

编译好的文件都在datax对应目录target下:

 

 

2.安装使用

下载官方编译好的包:

wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz

下载后解压至本地某个目录,进入bin目录,即可运行同步作业:

$ cd  {YOUR_DATAX_HOME}/bin

$ python datax.py {YOUR_JOB.json}

目录结构:

bin目录下是python脚本文件,主要用来执行job文件(默认需要依赖Python2的环境,也可以修改为Python3)

conf目录存放一些配置文件

job目录下存放了一个job测试文件(我们通过datax-web生成的临时job文件不会放在这里,而是在data-web里边自己配置存放目录)

lib是依赖的一些jar包

log目录存放job文件的执行日志

plugin目录存放的是对不同数据源读取(Reader)和写入(Writer)的插件支持

如果没有在plugin目录下发现自己需要的Reader或者Writer则需要自己手动安装(比如ES的Reader和Writer)。

job文件是一个JSON格式的文件,每一个job文件都代表一个同步任务,执行job文件需要使用bin目录datax.py脚本。执行命令如下命令执行job任务:

python datax.py job文件

生成模板:

执行 python datax.py -r mysqlreader -w mysql_writer,然后将控制台生成的模板,保存到 datax的job目录下,存为一个json文件。也可以执行 python datax.py -r {YOUR_READER} -w {YOUR_WRITER} > ../job/test.json 直接将输出重定至到文件中

因为官方编译包默认没有带clickhosue插件,这里提示没有clickhousewriter插件,我们将刚才编译好的clickhouse插件放到服务器datax/plugin/writer目录下。

执行python datax.py  -r mysqlreader -w clickhousewriter

生成模板成功,基于模板进行修改:

{

    "job": {

        "content": [

            {

                "reader": {

                    "name": "mysqlreader",

                    "parameter": {

                        "column": [

                                "id",

                                "col1",

                                "col2",

                                "create_date"

                        ],

                        "connection": [

                            {

                                "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/oms?useUnicode=true&characterEncoding=utf-8&useSSL=false"],

                                "table": [

                                        "test02"

                                ]

                            }

                        ],

                        "password": "xxxx",

                        "username": "root",

                        "where": ""

                    }

                },

                "writer": {

                    "name": "clickhousewriter",

                    "parameter": {

                        "batchByteSize": 134217728,

                        "batchSize": 65536,

                        "column": [

                                                        "id",

                                                        "col1",

                                                        "col2",

                                                        "create_date"

                        ],

                        "connection": [

                            {

                                "jdbcUrl": "jdbc:clickhouse://127.0.0.1:8123/test", 

                                "table": [

                                    "test02"

                                ]

                            }

                        ],

                        "dryRun": false,

                        "password": "xxx",

                        "postSql": [],

                        "preSql": [],

                        "username": "default",

                        "writeMode": "insert"

                    }

                }

            }

        ],

        "setting": {

            "speed": {

                "channel": "5"

            }

        }

    }

}

运行job

python datax.py ../job/mysql2clickhouse.json

 

3.增量同步

按天增量同步:

 

新增两条日期为今天的数据:

 

vi increment.sh

添加到定时,每天执行一次

#!/bin/bash  

. /etc/profile  

#当前时间,用于增量判断  

curr_time=$(date -d last-day +%Y-%m-%d)  

dodir=`pwd`  

#datax增量同步  

python /home/datax/datax/bin/datax.py  /home/datax/datax/job/mysqlday2clickhouse.json  -p "-Dcurr_time=$curr_time"  >>$dodir$(date "+%Y%m%d").log  2>&1 & 

json文件:

………

"where": "create_date='${curr_time}'"

………

通过datetime类型字段实现增量同步:

#!/bin/bash

. /etc/profile

set -x

#当前时间,用于增量判断

clickhouse_passwd=xxxxx

clickhouse_user=default

curr_time=$(date +%Y-%m-%d\ %H:%M:%S)

last_time=`clickhouse-client -u $clickhouse_user --password="$clickhouse_passwd" -q "select max(create_date) from test.test02"`

#datax增量同步

python /home/datax/datax/bin/datax.py  /home/datax/datax/job/mysqlIncremental2clickhouse.json  -p "-Dcurr_time='$curr_time' -Dlast_time='$last_time'" >>/home/datax/datax/bin/sh$(date "+%Y%m%d").log  2>&1 &

~

json:

将脚本添加到定时任务

插入几条数据

 

查看脚本输出,同步成功

部分转载:
DataX 同步mysql到clickhouse_福州-司马懿的博客-CSDN博客_datax支持clickhouse
 

 

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

datax编译clickhousewriter插件,及mysql->ck自动增量 的相关文章

  • 根据另一列过滤一列中的数据值,然后将值插入到同一个 SQL 表中的不同列中

    这是我试图使用 SSIS 和条件分割转换来解决的一个难题 我有一个 csv 文件 其中一行中包含每个唯一用户的属性数据 另一列中包含每个属性的值 IE Attribute Attribute Type ID 0000000001 Birth
  • 无法将变量值传递给 ssis 中的存储过程

    执行SSIS包时 出现以下错误 OLE DB 源 83 错误 SQL 命令需要名为的参数 Sales person 在参数映射中找不到 SSIS Pipeline 错误 OLE DB 源在预执行阶段失败 并且 返回错误代码 0xC02070
  • 为什么 Visual Studio 2019 社区中我的 SSIS 工具箱为空?

    我安装了 Visual Studio 2019 Community 然后安装了数据工具 我可以打开 Integration Services 项目 但当我查看 SSIS 工具箱时 它是空的 我该如何解决 我使用的是 Visual Studi
  • ODI12c 中每个映射的最大用户数

    我是 ODI 新手 在从事 ODI 项目时 我面临一个问题 我在 ODI12c 中有 10 个映射 并且所有映射都使用相同的目标表 但由于某些性能问题 我希望一次最多只有 2 个用户可以执行映射 最多 2 个映射 因为他们使用相同的目标表
  • 使用 AWS Glue 时如何查找更新的行?

    我正在尝试使用 Glue 对从 RDS 迁移到 Redshift 的数据进行 ETL 据我所知 Glue 书签仅使用指定的主键查找新行 而不跟踪更新的行 然而 我正在处理的数据往往会频繁更新行 我正在寻找可能的解决方案 我对 pyspark
  • JOLT 移位转换以过滤数组中的值

    我想使用 JOLT 转换来做两件事 过滤名为 myarray 的数组中的元素 以便仅保留具有 v 518 属性的元素 过滤掉除 v 518 和 lfdn 之外的其余元素的所有属性 Input isError false isValid tr
  • SQL 脚本到 SSIS 表达式

    我有下面的 T SQL 查询行 我正在尝试将其转换为 Visual Studio SSIS 表达式到派生列任务 So tableA刚刚 Work item Submission no 列 但我需要将它们分成两列 例如SubmissionCo
  • 如何在平面文件连接管理器上重新配置列信息?

    我有一个正在从平面文件读取数据的平面文件源 我们最近在此平面文件中添加了一个新列 平面文件数据被插入到数据库表中 为了适应目标组件中的新字段 我使用了ALTER TABLE语句将新列添加到表中 这是我所做的唯一改变 平面文件和目标组件之间的
  • 如何将 TFS 源代码管理与 Business Intelligence Studio 集成?

    我正在运行 Visual Studio 2010 Ultimate 它与 TFS 源代码管理集成 但是 当我运行 SQL Server 2008 Business Inteligence Studio 时 没有提供源代码管理 当我查看 工具
  • ErrorColumn 值不作为 Lineage ID 存在

    在插入目标表期间 发生的任何错误都会被重定向到错误表 我们可以在其中看到ErrorCode and ErrorColumn 问题是我们得到了一个值ErrorColumn它不存在于包中的任何地方 也就是说 没有一个列具有LineageID等于
  • 从事务性平面数据库填充事实表和维度表的最佳实践

    我想在 SSIS SSAS 中填充星型模式 多维数据集 我准备了所有维度表和事实表 主键等 源是一个 平面 项目级别 表 我现在的问题是如何拆分它 并将其从一个放入相应的表中 我做了一些谷歌搜索 但找不到令人满意的解决方案 人们会认为这是
  • 释放对执行进程任务中使用的变量的锁定SSIS

    我有一个包裹Foreach容器 and 执行流程任务 inside 对于每个容器 在执行流程任务中出现一些错误时 它会重定向到OnError事件处理程序对于每个容器 我正在使用 exe 捕获错误标准误差变量任务的属性并在脚本任务中使用它On
  • 根据单元格位置将选择性字段从 Excel 批量插入到 SQL

    我有一个 SSIS 包 我必须从 Excel 工作表中选择一些值并将它们插入到 SQL Server 数据库表中 我是通过执行 sql 任务来完成的 这些是步骤 从映射表中选择所有记录 单元格位置是动态的 因此将其保留在 SQL 表中 大约
  • 在 SSIS 中使用 OLE DB 从 Sybase 提取数据时出错

    我在 SSIS 2017 中使用 Advantage 11 OLE DB Provider 从 Sybase 提取数据时遇到问题 我可以连接到数据库 查看表列表 并且在选择表作为数据源时 我可以看到列 但是 当我单击 预览 或运行数据流任务
  • 从原始数据创建 n 个新行,例如 (1000....1000+n)

    我需要从 Excel 工作簿中读取数据 其中数据以这种方式存储 Company Accounts Company1 3000 3999 Company2 4000 4019 4021 4024 在 SSIS 中使用 OLE DB 目标的预期
  • 如何在 ssis 包 2016 中捕获毫秒时间戳

    如何在 ssis 包 2016 中捕获当前时间戳 我声明了一个变量并使用表达式 但缺少毫秒 currenttimestamp DT WSTR 50 DT DBTIMESTAMP System StartTime 我也想要毫秒 Thanks
  • 如何跳过 SSIS 数据流中的最后一行

    我在用FlatFile Source Manager gt Script COmponent as Trans gt OLEDB destination在我的数据流中 源从平面文件读取所有行 我想跳过更新数据库的最后一行 预告片记录 由于它
  • 删除或更改 ETL 中的记录

    我有一个表 我在上面构建了 ETL 服务 货物记录 到达 离开 进入表格 我已经这样做了 我的桌子将被删除 当项目标识符第二次到达数据库时 两条记录都被删除 label cost time x2 29 14 5 2020 01 00 00
  • SSIS:“错误:表达式“@[User::FileName].....无法写入属性”的结果

    过去 10 个小时我一直在尝试解决以下问题 我有一个 ForEach 循环容器 它在我的控制流中枚举 Excel 文件名 在 ForEach 循环容器中 我有一个将数据导入 Sql Server 的 Excel 源 这使用 User Fil
  • 在压缩存档内的文本文件上运行“head”,而不解压存档

    问候 我接手了之前的团队并编写了处理 csv 文件的 ETL 作业 我在 ubuntu 上结合使用 shell 脚本和 perl csv 文件很大 它们以压缩档案形式到达 解压后 很多都超过 30Gb 是的 那是 G 旧进程是在 cron

随机推荐

  • qt中lable中更改字体字号加粗等

    以下内容摘抄博客 https blog csdn net superbfly article details 53199731 utm medium distribute pc relevant none task blog BlogCom
  • 音频驱动篇之pop音攻略

    接触音频驱动工作也有2年的时间了 这这段时间里深刻感受了手机行业的更新换代是MB的迅速 2年的时间里 从TI到QUALCOMM 从android2 1到4 2 从单核到四核 经我参与的项目就有20款 日子是相当的难过 今天回头来说一些我在研
  • CentOS 使用nc命令进行端口扫描

    目录 CentOS 6 中nc命令的使用 CentOS 7 中nc命令的使用 使用nc命令可以探测目标主机的端口 但是在Centos 6 和 CentOS 7中这个命令的使用有所不同 甚至可以说功能已经不同 下面分别是CentOS 6 和
  • gitee密码修改后,pycharm权限不够提醒(windows10)

    问题 gitee密码修改后 pycharm更新代权限不够提醒 windows10 解决办法 gitee密码修改后 要在windows中同时进行修改 步骤如下 具体如图 控制面板 gt 所有控制面板项 gt 凭据管理器 gt Windows凭
  • vue 学习相关笔记大全

    与三阶段无关 框架是什么 封装与业务 功能 无关的代码块 简化了我们对于某些功能的代码量 但是我们需要记一套当前框架的语法 淘宝镜像 npm的服务器在国外 咱们国内下载的时候很慢 淘宝就自建了一个服务器 每个10分钟 就把npm的服务器里面
  • 这是一份面向3年以上Android开发者的中高级面试宝典,拔剑金九银十,大厂直通车

    前言 这是 拔剑金九银十 的第二篇文章 本文主要针对3年以上的Android开发者进阶面试中高级开发工程师而整理 三年以下小伙伴请移步 这是一份面向0 3年Android开发者的面试宝典 2020一线互联网大厂面试真题系统收录 希望可以对你
  • Arthur系统性详解微服务-完善中

    第一篇 微服务的意义 1 常见架构对比 第二篇 微服务的构建 1 微服务建模关注点及方法论 1 1 服务分类 1 2 服务模型 1 3 服务边界 1 4 服务数据 2 服务拆分和集成 2 1 服务拆分及方法论 2 1 1 服务拆分的维度 策
  • 配置环境说明

    工具及环境介绍 Eclipse 是一个开放源代码的 基于Java的可扩展开发平台 就其本身而言 它只是一个框架和一组服务 用于通过插件组件构建开发环境 Tomcat是一个应用服务器 他可以运行按照J2EE中的Servlet规范编写好的Jav
  • mysql数据库常用命令

    1 显示当前数据库服务器中的数据库列表 mysql gt show databases 2 创建数据库 mysql gt create database item character set utf8mb4 3 创建用户 mysql gt
  • unity3d 摄像机跟随角色时被物体遮挡解决方案

    unity3d 摄像机跟随角色时被物体遮挡解决方案 未被遮挡时 为了解决这个问题 在这里我们需要用到 Physics RaycastAll 使用方法详见圣典 首先 我们引入命名空间 System Collections Generic 然后
  • 电压电流双环控制PI参数计算01

    1 截止频率 将内环看作一个采样环节 对外环给定信号进行采样 同理驱动电路对内环给定信号进行采样 为保持环路稳定 外环截止频率 lt 内环截止频率 lt 开关频率 通常内环截止频率一般设置为开关频率的1 10 外环截止频率一般设置为开关频率
  • 内部排序算法比较(超详解)

    一 题目描述 通过随机数据比较各排序算法的关键字比较次数和关键字移动次数 以 及执行时间 取得直观感受 二 设计要求 一 需求分析 实现各排序算法 分别进行以下各组比较 并进行总结 一 各算法在不同规模下的比较 1 比较范围 直接插入排序
  • 辗转相除法求最大公约数 C/C++

    文章目录 辗转相除法的概念 用递归实现 用循环实现 辗转相除法的概念 辗转相除法是用来求两个正整数最大公约数的算法 古希腊数学家欧几里得在其著作 The Elements 中最早描述了这种算法 所以又被命名为欧几里得算法 扩展欧几里得算法可
  • 什么是Portlet ?

    什么是Portlet 作者 Sunil Patil 译者 observer 版权声明 任何获得Matrix授权的网站 转载时请务必以超链接形式标明文章原始出处和作者信息及本声明 作者 Sunil Patil observer 原文地址 ht
  • springboot 之yaml配置文件注入

    配置文件 spring boot使用一个全局配置文件 配置文件的名称是固定的 application properties 和application yml文件 默认是 properties文件 配置文件的作用 修改spring boot自
  • MVC中Ajax的简单实现(多种传值方法)

    这几天在练习下MVC中Ajax中视图与控制器之间传值问题 时不时有些写法错误 导致传值失败 特把成功传值实现方法写下 Index cshtml视图
  • Python代码~加油

    import turtle as t import time 由于会重复用到多次以下操作 故写成函数 def hua a b c d t goto a b t down t goto c d t up def heng a b c hua
  • Unhandled exception:java.text.ParseException

    最近遇到一个这样的错 在我敲完这两句话后 下面自动跳红线了那么这是怎么回事呢 解决办法 在方法声明后加 throws Exception 为什么要这么写 throws 的作用是不在本方法中进程异常处理 而是抛给调用此方法的类中进行处理 解释
  • n个顾客等待时间最短(贪心算法)

    设有n个顾客同时等待一个服务 顾客i需要的服务时间为ti 1 lt i lt n 共有s处可以提供此服务 应如何安排n个顾客的服务才能使平均等待时间达到最短 平均的带时间时n个顾客等待服务时间的总和除以n 方法 先按从大到小排序 然后再挨个
  • datax编译clickhousewriter插件,及mysql->ck自动增量

    安装clickhosue Clickhouse安装 新手必看 初念 LL的博客 CSDN博客 clickhouse安装 安装mysql 在clickhouse创建表 字段和需导入的mysql表相同 datax实现mysql到clickhou