【Java多线程批量数据导入的方法】

2023-11-20

前言:

当遇到大量数据导入时,为了提高处理的速度,可以选择使用多线程来批量处理这些处理。常见的场景有:

  • 大文件导入数据库(这个文件不一定是标准的CSV可导入文件或者需要在内存中经过一定的处理)
  • 数据同步(从第三方接口拉取数据处理后写入自己的数据库)

以上的场景有一个共性,这类数据导入的场景简单来说就是将数据从一个数据源移动到另外一个数据源,而其中必定可以分为两步

  • 数据读取:从数据源读取数据到内存
  • 数据写入:将内存中的数据写入到另外一个数据源,可能存在数据处理

而且根据读取的速度一般会比数据写入的速度快很多,即读取快,写入慢。

设计思路

由于场景的特点是读取快,写入慢,如果是使用多线程处理,建议是数据写入部分改造为多线程。而数据读取可以改造成批量读取数据。简单来说就是两个要点:

  • 批量读取数据
  • 多线程写入数据

示例

多线程批量处理最简单的方案是使用线程池来进行处理,下面会通过一个模拟批量读取和写入的服务,以及对这个服务的多线程写入调用作为示例,展示如何多线程批量数据导入

  1. 模拟服务
import java.util.concurrent.atomic.AtomicLong;
/**
* 数据批量写入用的模拟服务
*
* @author RJH
* create at 2019-04-01
*/
public class MockService {
/**
* 可读取总数
*/
private long canReadTotal;
/**
* 写入总数
*/
private AtomicLong writeTotal=new AtomicLong(0);
/**
* 写入休眠时间(单位:毫秒)
*/
private final long sleepTime;
/**
* 构造方法
*
* @param canReadTotal
* @param sleepTime
*/
public MockService(long canReadTotal, long sleepTime) {
this.canReadTotal = canReadTotal;
this.sleepTime = sleepTime;
}
/**
* 批量读取数据接口
*
* @param num
* @return
*/
public synchronized long readData(int num) {
long readNum;
if (canReadTotal >= num) {
canReadTotal -= num;
readNum = num;
} else {
readNum = canReadTotal;
canReadTotal = 0;
}
//System.out.println("read data size:" + readNum);
return readNum;
}
/**
* 写入数据接口
*/
public void writeData() {
try {
// 休眠一定时间模拟写入速度慢
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 写入总数自增
System.out.println("thread:" + Thread.currentThread() + " write data:" + writeTotal.incrementAndGet());
}
/**
* 获取写入的总数
*
* @return
*/
public long getWriteTotal() {
return writeTotal.get();
}
}

  1. 批量数据处理器
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 基于线程池的多线程批量写入处理器
* @author RJH
* create at 2019-04-01
*/
public class SimpleBatchHandler {
private ExecutorService executorService;
private MockService service;
/**
* 每次批量读取的数据量
*/
private int batch;
/**
* 线程个数
*/
private int threadNum;
public SimpleBatchHandler(MockService service, int batch,int threadNum) {
this.service = service;
this.batch = batch;
//使用固定数目的线程池
this.executorService = Executors.newFixedThreadPool(threadNum);
}
/**
* 开始处理
*/
public void startHandle() {
// 开始处理的时间
long startTime = System.currentTimeMillis();
System.out.println("start handle time:" + startTime);
long readData;
while ((readData = service.readData(batch)) != 0) {// 批量读取数据,知道读取不到数据才停止
for (long i = 0; i < readData; i++) {
executorService.execute(() -> service.writeData());
}
}
// 关闭线程池
executorService.shutdown();
while (!executorService.isTerminated()) {//等待线程池中的线程执行完
}
// 结束时间
long endTime = System.currentTimeMillis();
System.out.println("end handle time:" + endTime);
// 总耗时
System.out.println("total handle time:" + (endTime - startTime) + "ms");
// 写入总数
System.out.println("total write num:" + service.getWriteTotal());
}
}
  1. 测试类
/**
* SimpleBatchHandler的测试类
*/
public class SimpleBatchHandlerTest {
public static void main(String[] args) {
// 总数
long total=100000;
// 休眠时间
long sleepTime=100;
// 每次拉取的数量
int batch=100;
// 线程个数
int threadNum=16;
MockService mockService=new MockService(total,sleepTime);
SimpleBatchHandler handler=new SimpleBatchHandler(mockService,batch,threadNum);
handler.startHandle();
}
}
  1. 运行结果
start handle time:1554298681755
thread:Thread[pool-1-thread-2,5,main] write data:1
thread:Thread[pool-1-thread-1,5,main] write data:2
...省略部分输出
thread:Thread[pool-1-thread-4,5,main] write data:100000
end handle time:1554299330202
total handle time:648447ms
total write num:100000
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

【Java多线程批量数据导入的方法】 的相关文章

  • 在 Java 中连接和使用 Cassandra

    我已经阅读了一些关于 Cassandra 是什么以及它可以做什么的教程 但我的问题是如何在 Java 中与 Cassandra 交互 教程会很好 如果可能的话 有人可以告诉我是否应该使用 Thrift 还是 Hector 哪一个更好以及为什
  • 为什么 i++ 不是原子的?

    Why is i Java 中不是原子的 为了更深入地了解 Java 我尝试计算线程中循环的执行频率 所以我用了一个 private static int total 0 在主课中 我有两个线程 主题 1 打印System out prin
  • Final字段的线程安全

    假设我有一个 JavaBeanUser这是从另一个线程更新的 如下所示 public class A private final User user public A User user this user user public void
  • 反射找不到对象子类型

    我试图通过使用反射来获取包中的所有类 当我使用具体类的代码 本例中为 A 时 它可以工作并打印子类信息 B 扩展 A 因此它打印 B 信息 但是当我将它与对象类一起使用时 它不起作用 我该如何修复它 这段代码的工作原理 Reflection
  • JavaMail 只获取新邮件

    我想知道是否有一种方法可以在javamail中只获取新消息 例如 在初始加载时 获取收件箱中的所有消息并存储它们 然后 每当应用程序再次加载时 仅获取新消息 而不是再次重新加载它们 javamail 可以做到这一点吗 它是如何工作的 一些背
  • 磁模拟

    假设我在 n m 像素的 2D 表面上有 p 个节点 我希望这些节点相互吸引 使得它们相距越远吸引力就越强 但是 如果两个节点之间的距离 比如 d A B 小于某个阈值 比如 k 那么它们就会开始排斥 谁能让我开始编写一些关于如何随时间更新
  • 我可以使用 HSQLDB 进行 junit 测试克隆 mySQL 数据库吗

    我正在开发一个 spring webflow 项目 我想我可以使用 HSQLDB 而不是 mysql 进行 junit 测试吗 如何将我的 mysql 数据库克隆到 HSQLDB 如果您使用 spring 3 1 或更高版本 您可以使用 s
  • 路径中 File.separator 和斜杠之间的区别

    使用有什么区别File separator和一个正常的 在 Java 路径字符串中 与双反斜杠相反 平台独立性似乎不是原因 因为两个版本都可以在 Windows 和 Unix 下运行 public class SlashTest Test
  • 无法解析插件 Java Spring

    我正在使用 IntelliJ IDEA 并且我尝试通过 maven 安装依赖项 但它给了我这些错误 Cannot resolve plugin org apache maven plugins maven clean plugin 3 0
  • 十进制到八进制的转换[重复]

    这个问题在这里已经有答案了 可能的重复 十进制转换错误 https stackoverflow com questions 13142977 decimal conversion error 我正在为一个类编写一个程序 并且在计算如何将八进
  • 禁止的软件包名称:java

    我尝试从数据库名称为 jaane 用户名 Hello 和密码 hello 获取数据 错误 java lang SecurityException Prohibited package name java at java lang Class
  • 为什么HashMap不能保证map的顺序随着时间的推移保持不变

    我在这里阅读有关 Hashmap 和 Hashtable 之间的区别 http javarevisited blogspot sg 2010 10 difference Between hashmap and html http javar
  • Google App Engine 如何预编译 Java?

    App Engine 对应用程序的 Java 字节码使用 预编译 过程 以增强应用程序在 Java 运行时环境中的性能 预编译代码的功能与原始字节码相同 有没有详细的信息这是做什么的 我在一个中找到了这个谷歌群组消息 http groups
  • 在mockito中使用when进行模拟ContextLoader.getCurrentWebApplicationContext()调用。我该怎么做?

    我试图在使用 mockito 时模拟 ContextLoader getCurrentWebApplicationContext 调用 但它无法模拟 here is my source code Mock org springframewo
  • 如何从泛型类调用静态方法?

    我有一个包含静态创建方法的类 public class TestClass public static
  • 玩!框架:运行“h2-browser”可以运行,但网页不可用

    当我运行命令时activator h2 browser它会使用以下 url 打开浏览器 192 168 1 17 8082 但我得到 使用 Chrome 此网页无法使用 奇怪的是它以前确实有效 从那时起我唯一改变的是JAVA OPTS以启用
  • 编译器抱怨“缺少返回语句”,即使不可能达到缺少返回语句的条件

    在下面的方法中 编译器抱怨缺少退货声明即使该方法只有一条路径 并且它包含一个return陈述 抑制错误需要另一个return陈述 public int foo if true return 5 鉴于Java编译器可以识别无限循环 https
  • 在 Maven 依赖项中指定 jar 和 test-jar 类型

    我有一个名为 commons 的项目 其中包含运行时和测试的常见内容 在主项目中 我添加了公共资源的依赖项
  • 按日期对 RecyclerView 进行排序

    我正在尝试按日期对 RecyclerView 进行排序 但我尝试了太多的事情 我不知道现在该尝试什么 问题就出在这条线上适配器 notifyDataSetChanged 因为如果我不放 不会显示错误 但也不会更新 recyclerview
  • 如何实现仅当可用内存较低时才将数据交换到磁盘的写缓存

    我想将应用程序生成的数据缓存在内存中 但如果内存变得稀缺 我想将数据交换到磁盘 理想情况下 我希望虚拟机通知它需要内存并将我的数据写入磁盘并以这种方式释放一些内存 但我没有看到任何方法以通知我的方式将自己挂接到虚拟机中before an O

随机推荐

  • 微信小程序接入支付功能并实现支付

    随着微信小程序越来越广泛的应用 现在小程序几乎无所不能 绝对啦 哈哈 那么就会有很多微信小程序需要有支付的需求 那么该文章将带领大家走一遍如何实现微信小程序的支付功能 第一步 微信小程序管理后台 gt 微信支付 gt 接入微信支付 及关联
  • 0基础也能看懂,熬夜7天肝出这一份3w字软件测试学习手册【建议收藏】

    随着互联网行业的发展迅速 很多人都想涌进来 近年来软件测试岗位也呈现出了前所未有的火爆趋势 尤其2021年国家实现教育 双减 政策 激起了很多教培从业者 幼师 机械加入软件测试行业学习 剑哥今天抽个时间简单的给大家说下 对于0基础的朋友到底
  • python3图像处理_Python3与OpenCV3.3 图像处理(二)--图像基本操作

    一 本节简述 本节主要讲解图像的一些基础知识 以及图像的加载和获得属性 最后将会学到 OpenCV 摄像头的简单使用 二 图像基本知识 1 图像是什么 图像是客观对象的一种相似性的 生动性的描述或写真 是人类社会活动中最常用的信息载体 或者
  • CST2020 安装包和安装步骤

    安装包和破解码的百度云链接 链接 https pan baidu com s 1RNSWxVxb DIu8dg8gkCzAw 提取码 dve7 如果失效可评论留言 谢谢 1 关闭防火墙和杀毒软件 2 解压后 以管理员模式运行setup文件
  • 使用inet_ntop转换IPv6地址时在macOS和linux上的行为不一样

    下面这段python代码在macOS和linux时运行的结果是不同的 import socket ip socket inet pton socket AF INET6 1 2 3 0 5 6 7 8 print socket inet n
  • ubuntu20.04 apt 安装报 E: Unable to correct problems, you have held broken packages.

    在安装软件的时候报错 root root sudo apt get install vim Reading package lists Done Building dependency tree Reading state informat
  • Leetcode刷题日志5.0

    目录 前言 1 两数相加 2 无重复字符的最长子串 3 整数反转 4 删除链表的倒数第 N 个结点 前言 今天我又来继续分享最近做的题了 现在开始进入我们快乐的刷题时间吧 编程语言Python3 0 难度 中等 1 两数相加 给你两个 非空
  • Redis工具类(缓存操作,Object转换成JSON数据)

    依赖spring data redis 2 4 1 jar Component Data public class RedisUtils Autowired private RedisTemplate
  • 双向链表详解

    目录 一 双向链表的概念及结构 二 双向链表的方法及其实现 2 1 双向链表 2 2 addFirst int data 头插法 2 3 addLast int data 尾插法 2 4 size 链表长度 2 5 display 打印链表
  • Centos6.4 用rpm方式安装MySql5.6

    1 查看系统是否安装了MySQL 使用命令 rpm qa grep mysql 2 卸载已安装的MySQL 卸载mysql命令如下 rpm e nodeps mysql libs 5 1 61 4 el6 x86 64 要将 var lib
  • sql局部变量和全局变量_有效使用SQL内置全局变量

    SQL内置全局变量是只读的 由IBM DB2 for i维护 并且是受信任且易于使用的资源 存在一些全局变量是为了与DB2系列兼容 并且包含在SYSIBM模式中 其他全局变量提供IBM i特定的值 并包含在QSYS2模式中 全局变量使应用程
  • 【实验二】【创建表并输入数据】

    文章目录 目的表 XSQK 学生情况 KC 课程 XS KC 学生 课程 T SQL创建表 1 新建查询 2 切换数据库 3 输入T SQL查询语句创建表 XSQK 学生情况 KC 课程 XS KC 学生 课程 4 执行命令 5 查看表 S
  • JVM笔记5:虚拟机栈

    目录 1 虚拟机主要特点 虚拟机栈出现的背景 初步印象 内存中的栈与堆 虚拟机栈基本内容 2 虚拟机栈的常见异常与如何设置栈大小 3 栈的存储结构和运行原理 栈中存储什么 栈运行原理 4 栈帧的内部结构 每个栈帧中存储着 5 局部变量表 6
  • 基于python的全球疫情数据分析及可视化系统

    源码获取 https www bilibili com video BV1Ne4y1g7dC 现如今 随着互联网的发展 人们获取信息的方式也各有不同 以前的传统方式的信息流与电视 报纸 书籍 信件 等等 因为互联网的使用 现在的互联网媒体已
  • C++ 判断文件是否被打开,防止重复打开

    如何判断文件是否已经被打开 在这里通过文件的一些属性实现判断文件是否被打开 通过QFile将文件尝试实现例如linux的move操作和rm r 的操作 就可以判断是否文件被占用 首先添加 include QFile 头文件 再设置全局的判断
  • 项目中:Json文件的读取

    项目中 Json文件的读取 读Json文件 取Json文件中内容 举例 举例 Json文件内容如下 Flickr8k images sentids 39300 39301 39302 39303 39304 imgid 7860 sente
  • C++11中 std::bind 的两种用法

    概述 std bind的头文件是
  • hadoop环境搭建之关闭防火墙和SELinux

    每一台服务器上都要做1 2 1 关闭防火墙 查看防火墙状态 systemctl status firewalld 关闭防火墙 systemctl disable firewalld systemctl stop firewalld 查看防火
  • iOS 获取系统键盘UIKeyboard方法

    公司项目需求 需要让弹窗显示在键盘所在的图层之上 而不是在弹窗出现的时候消失 如图1 系统弹窗出现的时候会使键盘暂时不显示 而这种效果显然不符合要求的 由于没想到更好的办法 只好从键盘自身的UIKeyboard做文章了 通过获取当前键盘的U
  • 【Java多线程批量数据导入的方法】

    前言 当遇到大量数据导入时 为了提高处理的速度 可以选择使用多线程来批量处理这些处理 常见的场景有 大文件导入数据库 这个文件不一定是标准的CSV可导入文件或者需要在内存中经过一定的处理 数据同步 从第三方接口拉取数据处理后写入自己的数据库