基于dataX的数据同步平台搭建

2023-11-05

前言

基于Java和DataX工具实现数据同步的后台管理,包括数据同步任务的生成,任务的管理,查看任务的执行日志,解析任务的执行结果等功能。
内含一些技术实现方案、心得体会和填坑经验等干货。
阅读本文之前,需要提前了解一下DataX的含义、使用场景和基本使用方法。

1.项目描述

公司要做一个数据中心的项目,包括数据标准平台,数据集成平台,数据监控平台,数据共享平台。主要从两大技术方向去实现:

  • 消息队列 – 选型为 Kafka。负责微服务之间实时的消息共享
  • 数据同步 – 选型为DataX。数据同步负责大数据量的全量/增量同步,同时也是对消息队列的一种补救措施,即使消息共享失败,在下一次数据同步时,也能够确保全部数据的一致性。

2.思路

DataX的介绍此文不做说明,如需了解,请查阅相关资料。本文仅记录相关使用心得。Datax开源地址

Datax实现数据同步任务的方式是执行一个python命令 操作一个配置了同步信息的json文件:

python [datax.py的路径]  [同步任务的json文件路径] 
python datax.py D:\Software\install\Environment\DataX\datax\job\mysql2mysql.json 

Datax的数据同步,落到实处就是执行上述的python命令。但是对于一个数据中心项目来说,我们需要的不仅仅只是完成这个数据同步的操作。我们需要实现的是全流程的可配置,可视化,可监控,可定时

全流程描述:

  • 前端传入生成json文件所需要的数据、cron表达式
  • 后端解析前端入参,在linux服务器上生成一个json文件,一个包含上述python命令输出日志路径shell脚本,一个linux定时任务。通过定时任务执行shell脚本,即可完成定时执行数据同步任务的操作。(所有文件的命名都与本次的任务ID相关)。
  • 后端在数据库中记录本次的数据同步任务信息,包括任务ID,所有文件的存放路径,cron表达式等信息
  • 前端需要查看数据同步任务的执行结果和执行日志。每次查看的时候,后端根据数据库中记录的当前任务的日志存放路径,读取该路径下的所有文件,解析并记录其中的内容,输出执行成功与否和全部的日志信息,并将解析结果存入数据库中,方便下次查阅。
  • 每个数据同步任务,都可进行暂停、开始和删除操作。通过删除和新增linux的定时任务实现任务的暂停和开始。同步任务只有在暂停状态下可进行删除操作,包括删除数据库记录,删除linux存放的日志文件等。

3.DataX数据同步任务的生成

3.1 环境准备

在 linux服务器上的准备:

  • jdk1.6及以上
  • python2.6及以上
  • datax3.0压缩包下载,解压后需要进行授权:chmod –R 755 {你的目录}

3.2 json生成

  • 创建实体类接收前端参数并生成datax需要的标准json数据。

    分享一个在线JSON转实体类的网址

{
   "job":
   {
      "content":
      [
         {
            "reader":
            {
               "parameter":
               {
                  "password":"YourPassword",
                  "column":
                  [
                     "id",
                     "school_code",
                     "link_id"              
                   ],
                  "connection":
                  [
                     {
                        "jdbcUrl":
                        [
                           "YourUrl"
                        ],
                        "table":
                        [
                           "c_pass_dormitory_1"
                        ]
                     }
                  ],
                  "where":"",
                  "splitPk":"",
                  "username":"YourUserName"
               },
               "name":"mysqlreader"
            },
            "writer":
            {
               "parameter":
               {
                  "postSql":
                  [
                  ],
                  "password":"YourPassWord",
                  "column":
                  [
                     "id",
                     "school_code",
                     "link_id"
                  ],
                  "connection":
                  [
                     {
                        "jdbcUrl":"YourUrl",
                        "table":
                        [
                           "c_pass_dormitory"
                        ]
                     }
                  ],
                  "writeMode":"insert",
                  "batchSize":"1024",
                  "preSql":
                  [
                  ],
                  "username":"YourUserName"
               },
               "name":"mysqlwriter"
            }
         }
      ],
      "setting":
      {
         "errorLimit":
         {
            "percentage":0.15,
            "record":0
         },
         "speed":
         {
            "record":100,
            "channel":1,
            "byte":0
         }
      }
   }
注意:在根据json生成实体类的时候,speed设置中的byte参数转为实体类会有点小问题

在这里插入图片描述
转为实体类就是:

private int byte;

这是不合法的,这里可以重命名一下:

@Data
public class Speed {
    private int channel;
    private int record=100;
    private int sByte;
}

但是要在生成json文件的时候,将sByte替换为byte:
在这里插入图片描述

  • 将json文件存放在linux指定的目录下

java代码:

//生成json文件
private String json(String jsonString) throws Exception {
    // 拼接文件完整路径
    String fullPath = dataxJsonAddress + Tools.getId() + ".json";
    // 生成json格式文件
    // 保证创建一个新文件
    File file = createFile(fullPath);
    // 格式化json字符串
    jsonString = JsonFormatTool.formatJson(jsonString);

    // 将格式化后的字符串写入文件
    java.io.Writer write = new OutputStreamWriter(new FileOutputStream(file), "UTF-8");
    write.write(jsonString);
    write.flush();
    write.close();
    return fullPath;
}

linux存放:
在这里插入图片描述

  • 记录json文件的路径和json数据存表:

在这里插入图片描述

3.3 shell脚本生成

Datax是通过python命令执行Datax任务,因此所生成的shell脚本中 要包括 datax.py、json文件和log的路径信息。

生成shell脚本的代码:

	//创建.sh文件
    private String createShell(String jsonString, String logAddress) throws Exception {
        //拼接shell脚本描述
        String sellString = "#!/bin/bash\n" +
                "source /etc/profile\n" +
                "# 截至时间设置为当前时间戳\n" +
                "end_time=$(date +%s)\n" +
                "# 开始时间设置为60s前时间戳\n" +
                "create_time=$(($end_time - 60))\n" +
                "[dataxPy] [dataxJson]  >>[dataxLog].`date +%Y%m%d%H%M`  2>&1 &";
        sellString = sellString.replace("[dataxPy]", dataxPyAddress)
                .replace("[dataxJson]", jsonString)
                .replace("[dataxLog]", logAddress);
        // 拼接文件完整路径
        String fullPath = dataxShellAddress + Tools.getId() + ".sh";
        File file = createFile(fullPath);
        // 将格式化后的字符串写入文件
        java.io.Writer write = new OutputStreamWriter(new FileOutputStream(file), "UTF-8");
        write.write(sellString);
        write.flush();
        write.close();
        return fullPath;
    }

服务器上生成的shell脚本:
在这里插入图片描述
有了shell脚本后,就可以根据前台传递的cron表达式,创建linux的定时任务,定时执行shell脚本,实现数据同步。因此必须要给新生成的shell脚本添加 可执行权限,Java代码如下:

//给shell文件添加 X 权限,shellAddress:shell脚本的全路径
new ProcessBuilder("/bin/chmod", "755", shellAddress).start();

最终的shell脚本内容:

#!/bin/bash
source /etc/profile
# 截至时间设置为当前时间戳
end_time=$(date +%s)
# 开始时间设置为60s前时间戳
create_time=$(($end_time - 60))
/root/datax/bin/datax.py /root/datax/datax_json/fce1411e47ef45649c5ff8f8930db5a0.json  >>/root/datax/datax_log/d32f460e1339480cb84c286af8223370.`date +%Y%m%d%H%M`  2>&1 &

内容说明:

# datax的python可执行文件路径
/root/datax/bin/datax.py 
# json文件的路径
/root/datax/datax_json/fce1411e47ef45649c5ff8f8930db5a0.json  
# 本次任务的输出日志路径
>>/root/datax/datax_log/d32f460e1339480cb84c286af8223370.`date +%Y%m%d%H%M`  2>&1 &
>>/root/datax/datax_log/d32f460e1339480cb84c286af8223370.`date +%Y%m%d%H%M`  2>&1 &

日志文件的命名采用 任务ID.时间 方式,这样就可以根据任务ID获取该任务下的所有执行日志,用于数据监控。

3.4 定时任务生成

Java代码:

	//配置 linux上的定时任务
    private void createCron(String cron,String shellAddress) throws Exception{
        //执行操作指令 (echo "*/1 * * * * /root/datax/datax_sh/text.sh >>/root/crontab 2>&1 &";crontab -l )| crontab
        String cmd = "(echo \"" + cron + " " + shellAddress + " >>/root/crontab 2>&1 &\";crontab -l )| crontab";
        System.out.println("cmd命令" + cmd);
        Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", cmd});
    }

部分定时任务信息:
在这里插入图片描述

3.5 定时任务执行

定时任务的执行脚本里,已经配置了执行日志的输出路径,每次执行后都会在指定目录下生成指定命名规则的日志文件。
部分日志文件如图:
在这里插入图片描述
上文也描述了,日志命名的规范,这样的命名方式是为后期数据监控做准备。可以解析出每次数据同步任务的执行状态和执行日志。

3.6 任务信息存入任务表

任务表的字段信息:

CREATE TABLE `c_info_datax_task` (
  `id` varchar(36) NOT NULL DEFAULT '' COMMENT '主键',
  `school_code` varchar(20) DEFAULT NULL COMMENT '学校编码',
  `task_name` varchar(50) DEFAULT '' COMMENT '任务名',
  `task_log_address` varchar(300) DEFAULT '' COMMENT '日志地址',
  `task_json_address` varchar(300) DEFAULT '' COMMENT 'json地址',
  `task_json_data` varchar(2000) DEFAULT NULL COMMENT 'json字符串信息',
  `task_shell_address` varchar(300) DEFAULT '' COMMENT 'shell地址',
  `task_cron` varchar(300) DEFAULT '' COMMENT 'cron表达式',
  `status` varchar(10) DEFAULT '1' COMMENT '状态',
  `deleted` varchar(1) DEFAULT '0' COMMENT '1:已删除,0:正常',
  `remarks` varchar(255) DEFAULT '' COMMENT '备注',
  `create_user` varchar(36) DEFAULT NULL COMMENT '创建者',
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  `update_user` varchar(36) DEFAULT NULL COMMENT '更新人',
  `update_time` datetime DEFAULT NULL COMMENT '更新时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=COMPACT COMMENT='数据同步-任务表';

部分数据信息:
在这里插入图片描述
数据同步任务信息查看:
在这里插入图片描述

总结

至此,一个基于DataX的数据同步任务就配置好了,大体步骤:

  • 生成json文件
  • 配置shell脚本
  • 设置linux定时任务
  • 所需信息入库,生成一条任务信息。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

基于dataX的数据同步平台搭建 的相关文章

  • 获取对 JOptionPane 静态方法创建的对象的引用

    我想知道是否可以获取对由 JOptionPane 的静态方法之一 例如 showMessageDialog 创建的 JDialog 对象的引用 我打算修改对话框在屏幕上出现的位置 更具体地说 我希望对话框默认显示在主应用程序窗口的左上角 而
  • 如何从另一个 Flux 中排除 Flux 中的所有元素

    我有两个Flux一个用于成功元素 另一个用于保存错误元素 Flux
  • MySQL 和 Hibernate 之间的主键自增由谁负责?

    MySQL CREATE TABLE role id role INT 11 unsigned NOT NULL AUTO INCREMENT PRIMARY KEY id role AUTO INCREMENT 1 休眠 Entity p
  • “错误:无法找到或加载主类 org.apache.hadoop.util.RunJar”是什么意思?

    我正在尝试运行一个示例 因为它指出 Hadoop 实践 一书 http www manning com lam 第 15 页 这是需要运行的命令 bin hadoop jar hadoop examples jar 但我收到这个错误 Err
  • 使用 GIN 注入 Class

    有没有办法注入类类型Class
  • Java 8 流排序字符串列表[重复]

    这个问题在这里已经有答案了 我正在流上调用排序方法 java 文档说 Sorted 方法返回一个由该流的元素组成的流 并根据自然顺序排序 但是当我运行下面的代码时 List
  • 如何仅使用命令行运行 Maven 创建的 jar 文件

    我需要一些帮助来尝试使用命令行运行以下 Maven 项目 https github com sarxos webcam capture https github com sarxos webcam capture webcam captur
  • IntelliJ IDEA:忽略代码覆盖率中的琐碎方法

    在 IntelliJ IDEA 15 0 2 中 如何在测试覆盖率测量期间忽略琐碎的 getter 和 setter 琐碎方法 should be measure public void complex fancy interesting
  • 使用 Arrays.copyOf 复制不同类型的数组时出现问题

    我正在尝试创建一个方法 该方法几乎将任何内容作为参数 并返回带有某些分隔符的值的串联字符串表示形式 public static String getConcatenated char delim Object names String st
  • CellTables 和 css (GWT)

    有人可以帮我设置 CellTable 行的背景吗 拜托 我整晚都在努力做这件事 但一直失败 这是 让我发疯 我尝试过调用setRowStyles 但是那个 似乎不起作用 我读到你无法更改 设置样式后 CellTable 的样式 作为默认样式
  • 使 @Schedule 在集群环境中仅运行一次

    我有两个 tomee 实例集群 每个都有一个方法注释如下 Schedule dayOfWeek public void runMeDaily 我只想每天运行一次这个方法 每天不两次 每个实例一次 我可以使用此处描述的标志仅在一个WebLog
  • 将文件内容存储到数组中

    我的刽子手程序有问题 我真的认为我需要做的事情超出了我对java的理解 这是我的代码 import java io BufferedReader import java io FileReader import java io FileNo
  • JavaFX:在 WebView img 标签中未加载本地图像

    以下是我的代码 一切安好 我可以加载远程页面 我可以放置 HTML 内容 但我的img标签显示一个X标志表示无法加载图像 Note 我的图像与类位于同一个包中JavaFX在 Smiley 文件夹中 我可以列出所有图像 这意味着路径没有问题
  • 更新 Maven 项目模块中的父版本

    我有一个奇怪的场景 我有一个项目 Y 它有一个模块 X 和一些其他模块 X 是项目 Y 的一部分 但它不作为该项目的模块链接 因此 每次发布 Y 的新版本时 都需要有人手动更新 X 中的父版本 我需要以这样的方式更新 Y 项目 a 每次发布
  • 线程缓存和 Java 内存模型

    我正在尝试了解 Java 内存模型和线程 据我了解 每个线程都有 主 内存的本地副本 因此 如果一个线程尝试更改int变量 例如某个对象的变量 它会缓存int变量 如果它更改它 其他线程可能看不到更改 但是如果线程缓存一些对象而不是 int
  • 我们还需要迭代器设计模式吗? [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi
  • 使用 JavaFX 和 Maven 将模块描述符添加到库中[重复]

    这个问题在这里已经有答案了 我需要使用反思 https github com ronmamo reflections在一个带有 JavaFX 的 Maven 项目中 我想使用jlink捆绑一个最小的 JRE 问题是我运行时出现以下错误mvn
  • javaFX,抛出 NullPointerException,位置是必需的

    我看过其他答案 但没有任何帮助我 抱歉 GUI新手只知道swing的基础知识 这是主课 package application import javafx application Application import javafx fxml
  • 请解释*贪婪量词的工作原理

    Pattern ptn Pattern compile a Matcher mtch ptn matcher bbaac if mtch find System out println mtch group 输出 不打印任何内容 Patte
  • 无法使用 Struts 2 重定向 JSP 文件并显示值

    我创建了一个简单的程序 使用文本字段获取用户的名字和姓氏 但问题是 当我单击提交按钮时 我无法将其重定向到另一个显示用户名字和姓氏的 jsp 文件 这是我的HelloAction class package com novamsc trai

随机推荐