【Java】你还在使用单线程处理大量数据么?

2023-11-12

业务场景

现有一个数据拼装入库的接口,总数据量大约几万条,之前使用单线程同步处理,需要处理几分钟,这接口速度在生产上是不允许的,针对这一问题,需要对此接口进行速度优化。

优化方案

使用多线程异步处理的方式,技能保证接口很快的响应,也能提高数据的拼装入库操作的效率。

多线程的实现–线程池

为什么要使用线程池

1.在Java中,如果每个请求到达就创建一个新线程,开销是相当大的。在实际使用中,服务器在创建和销毁线程上花费的时间和消耗的系统资源都相当大,甚至可能要比在处理实际的用户请求的时间和资源要多的多。

2.除了创建和销毁线程的开销之外,活动的线程也需要消耗系统资源。如果在一个jvm里创建太多的线程,可能会是系统由于过度消耗内存或“切换过度”而导致系统资源不足。

3.为了防止资源不足,服务器应用程序需要采取一些办法来限制任何给定时刻处理的请求数目,尽可能减少创建和销毁的线程的次数,特别是一个些资源耗费比较大但是线程的创建和销毁尽量利用已有对象来进行服务,这就是“池化资源”技术产生的原因。

线程池的创建

ThreadPoolExecutor是线程池的核心实现类,在JDK1.5引入,位于java.util.concurrent包

在这里插入图片描述

  • Executor线程池相关顶级接口,它将任务的提交与任务的执行分离开来
  • ExecutorService继承并扩展了Executor接口,提供了Runnable、FutureTask等主要线程实现接口扩展
  • ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务

1、Spring配置类

@EnableConfigurationProperties(ThreadPoolConfigProperties.class)
@Configuration
public class MyThreadConfig {
    @Bean("myExceutor")
    public Executor myfoExceutor(ThreadPoolConfigProperties pool) {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 设置核心线程数
        executor.setCorePoolSize(pool.getCoreSize());
        // 设置最大线程数
        executor.setMaxPoolSize(pool.getMaxSize());
        //配置队列大小
        executor.setQueueCapacity(pool.getQueueSize());
        // 设置线程活跃时间(秒)
        executor.setKeepAliveSeconds(pool.getKeepAliveTime());
        // 设置默认线程名称
        executor.setThreadNamePrefix("pageInfoExceutor-");
        // 等待所有任务结束后再关闭线程池
        executor.setWaitForTasksToCompleteOnShutdown(true);
        //执行初始化
        executor.initialize();
        return executor;
    }
}

2、手动创建

/**
corePoolSize:核心线程池的线程数量
maximumPoolSize:最大的线程池线程数量
keepAliveTime:线程活动保持时间,线程池的工作线程空闲后,保持存活的时间。
unit:线程活动保持时间的单位。
workQueue:指定任务队列所使用的阻塞队列
threadFactory:线程工厂提供线程的创建方式,默认使用Executors.defaultThreadFactory()
handler:当线程池所处理的任务数超过其承载容量或关闭后继续有任务提交时,所调用的拒绝策略
**/
ExecutorService exec = new ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                          	  RejectedExecutionHandler handler);

Executors是java线程池的工厂类,通过它可以快速初始化一个符合业务需求的线程池,如Executors.newFixedThreadPool方法可以生成一个拥有固定线程数的线程池。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

其本质是初始化一个ThreadPoolExecutor对象。

提交任务

1、execute()

通过Executor.execute()方法提交的任务,必须实现Runnable接口,该方式提交的任务不能获取返回值,因此无法判断任务是否执行成功。

/**
* 源码 
**/
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         分 3 个步骤进行:
         1. 如果正在运行的线程少于 corePoolSize,请尝试使用给定命令作为其第一个任务来启动新线程。对 addWorker 的调用以原子方式检查 runState 和 workerCount,因此通过返回 false 来防止错误警报,这些警报会在不应该添加线程时添加线程。
         2. 如果一个任务可以成功排队,那么我们仍然需要仔细检查我们是否应该添加一个线程(因为现有线程自上次检查以来就死了)或者池在进入此方法后关闭了。因此,我们重新检查状态,如有必要,如果停止,则回滚排队,如果没有,则启动一个新线程。
         3.如果我们不能排队任务,那么我们尝试添加一个新线程。如果失败了,我们知道我们已经关闭或饱和,因此拒绝这项任务。

         */
        int c = ctl.get();
    //判断工作线程数小于核心线程数
        if (workerCountOf(c) < corePoolSize) {
            //执行addworker,创建一个核心线程,创建失败重新获取ctl
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
    //如果工作线程数大于核心线程数,判断线程池的状态是否为running,并且可以添加进队列
//如果线程池不是running状态,则执行拒绝策略,(还是会调用一次addworker)
        if (isRunning(c) && workQueue.offer(command)) {
              //再次获取ctl,进行双重检索
            int recheck = ctl.get();
            //如果线程池是不是处于RUNNING的状态,那么就会将任务从队列中移除, 
        	//如果移除失败,则会判断工作线程是否为0 ,如果过为0 就创建一个非核心线程 
        	//如果移除成功,就执行拒绝策略,因为线程池已经不可用了;
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
      //线程池挂了或者大于最大线程数
        else if (!addWorker(command, false))
            reject(command);
    }
exec.execute(() -> gooodsService.copyData(a,b));

2、submit()

通过ExecutorService.submit()方法提交的任务,可以获取任务执行完的返回值。

/**
提交可运行任务以执行,并返回表示该任务的未来。未来的方法get将在成功完成后返回null。
参数:任务 – 要提交的任务
返回:a 未来代表任务完成前
抛出:RejectedExecutionException – 如果无法安排任务执行
	 NullPointerException – 如果任务为空
**/
Future<?> submit(Runnable task);
FutureTask<T> ft = new FutureTask<T>(Callable<V> callable);
exec.submit(ft);

案例伪代码

/**
*拼装数据并插表(无返回值)
@param dataList 一个大量数据的集合
**/
public void copyData(List<po> dataList) {
    // 创建线程池
        ExecutorService exec = new ThreadPoolExecutor(8,20,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
    //先对数据进行分割
	List<List<po>> divideList = CollectionDataUtils.divide(dataList, 500);
	for (List<po> list : divideList){
        //提交任务
		exec.execute(() ->  dataMapper.saveBatchData(list));
  }
    
}


/**
*拼装数据并插表(有返回值)
@param dataList 一个大量数据的集合
**/
public void copyData(List<po> dataList) {
   	//创建一个集合,来接收返回值
     List<CompletableFuture<ResultDTO>> returnList = new ArrayList<>();
    // 创建线程池
        ExecutorService exec = new ThreadPoolExecutor(8,20,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
    //先对数据进行分割
	List<List<po>> divideList = CollectionDataUtils.divide(dataList, 500);
	for (List<po> list : divideList){
        //提交任务
		 CompletableFuture<ResultDTO> cf = CompletableFuture.supplyAsync(() -> {
                        try {
                            return dataMapper.saveBatchData(list);
                        } catch (Exception e) {
                            return null;
                        }
                    },exec).exceptionally( ex -> null);
        returnList.add(cf)
				
  }
    /*之后可以对拿到的返回值进行自己的业务操作....................*/
    
    //阻塞主线程-->等待子线程都执行完后执行,(这里我想等所有线程都执行完操作数据库后,删除redis锁)
    CompletableFuture.allOf(returnList.toArray(new CompletableFuture[returnList.size()])).join();
    redisTemplate.delete(key);
}

后续优化

经过多线程处理后,接口响应速度提升了10倍,并且可以做异步处理,接口先给出响应,数据在后台异步处理。

但是这也是处理几万的数据量的。虽然使用了多线程,但是依旧只是一个节点处理,依靠单个Jvm,如果数据量再大,且并发量提高,会造成OOM内存溢出。所以后续在不改变需求的情况下,使用分布式计算,依靠MQ任务分发,进行多节点处理的方式,性能和稳定性会大大提高。

引用

深入理解ThreadPoolExecutor中execute()方法原理_mr.zjx的博客-CSDN博客

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

【Java】你还在使用单线程处理大量数据么? 的相关文章

  • Java 7 中的 Beans Binding 将被什么取代?

    我在某处读到 我忘记了链接 Beans Binding 将不会成为 Java 7 的一部分 有人知道什么会取代它吗 另外 当前版本的 Java 中是否有 Bean 绑定的替代方案 我建议JGoodies 绑定 https binding d
  • 在循环中使用 if 语句? - 加工

    假设我必须在 for 循环中使用 if 语句 并且 for 循环在特定条件下触发 而 if 语句仅在 for 循环达到特定阶段时触发 例如 条件是一个计数器 当发生特定事件 例如球从屏幕上掉下来 时 该计数器会进行计数 每次球穿过屏幕时 都
  • Java - 直观地拖动摆动元素

    有没有类似的解决方案http allen sauer com com allen sauer gwt dnd demo DragDropDemo DragDropDemo html PaletteExample http allen sau
  • “错误:无法找到或加载主类 org.apache.hadoop.util.RunJar”是什么意思?

    我正在尝试运行一个示例 因为它指出 Hadoop 实践 一书 http www manning com lam 第 15 页 这是需要运行的命令 bin hadoop jar hadoop examples jar 但我收到这个错误 Err
  • Java 应用程序可以检测到调试器已连接吗?

    我知道 jvm 启动选项可以让 jvm 等待附加调试器 这不是我在这里的意思 是否有可能从 Java 代码中也检测调试器的附件 以便我可以例如编写一个正在执行某些操作的 脚本 然后在某个时刻让我的应用程序等待调试器 不会 这些选项是 JVM
  • Java 正则表达式中 \w 和 \b 的 Unicode 等效项?

    许多现代正则表达式实现解释 w字符类简写为 任何字母 数字或连接标点符号 通常 下划线 这样 正则表达式就像 w 匹配像这样的词hello l ve GO 432 or gefr ig 不幸的是 Java 没有 在爪哇 w仅限于 A Za
  • 此代码签名证书对于签名小程序有效吗?

    我们购买了代码签名证书来签名小程序 但在签名小程序时出现以下错误 C CM WEB INF gt jarsigner keystore code signing keystore C CM SweetApplet jar code sign
  • Java SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'") 给出时区作为 IST

    我有 SimpleDateFormat 构造函数作为 SimpleDateFormat yyyy MM dd T HH mm ss Z 我正在解析字符串 2013 09 29T18 46 19Z 我读到这里 Z 代表GMT UTC时区 但是
  • Spring REST 控制器返回带有空数据的 JSON [关闭]

    Closed 这个问题需要调试细节 help minimal reproducible example 目前不接受答案 我有一个简单的 Spring Boot Web 应用程序 我正在尝试从服务器接收一些数据 控制器返回一个集合 但浏览器收
  • 返回 Consumer 表达式内的 Method 值

    我试图在方法中返回一个布尔值 并且我正在使用消费者函数 有什么方法可以直接在 Consumer 表达式中返回该值吗 这是代码 private static boolean uuidExists UUID uuid MySQL getResu
  • 在 Java/Android 中查找 UTF-8 字符串中的字符数

    我试图找出字符串以 UTF 8 存储时的长度 我尝试了以下方法 String str Charset UTF8 CHARSET Charset forName UTF 8 byte abc str getBytes UTF8 CHARSET
  • Preg_match PHP 到 java 的翻译

    我在将 php preg match 转换为 java 时遇到一些问题 我以为我的一切都是正确的 但它似乎不起作用 这是代码 原始PHP Pattern for 44 Character UUID pattern 0 9A F 44 if
  • 使 @Schedule 在集群环境中仅运行一次

    我有两个 tomee 实例集群 每个都有一个方法注释如下 Schedule dayOfWeek public void runMeDaily 我只想每天运行一次这个方法 每天不两次 每个实例一次 我可以使用此处描述的标志仅在一个WebLog
  • 从流中过滤/删除无效的 xml 字符

    首先 我无法更改 xml 的输出 它是由第三方生成的 他们在 xml 中插入无效字符 我得到了 xml 字节流表示形式的 InputStream 除了将流消耗到字符串中并对其进行处理之外 是否有一种更干净的方法来过滤掉有问题的字符 我找到了
  • JavaFX:在 WebView img 标签中未加载本地图像

    以下是我的代码 一切安好 我可以加载远程页面 我可以放置 HTML 内容 但我的img标签显示一个X标志表示无法加载图像 Note 我的图像与类位于同一个包中JavaFX在 Smiley 文件夹中 我可以列出所有图像 这意味着路径没有问题
  • 更新 Maven 项目模块中的父版本

    我有一个奇怪的场景 我有一个项目 Y 它有一个模块 X 和一些其他模块 X 是项目 Y 的一部分 但它不作为该项目的模块链接 因此 每次发布 Y 的新版本时 都需要有人手动更新 X 中的父版本 我需要以这样的方式更新 Y 项目 a 每次发布
  • 我们还需要迭代器设计模式吗? [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi
  • 使用 Maven 将值附加到文件中

    我想在文件末尾附加一个值 但我无法确定要使用哪个插件 例子 我要附加的值 myValue file value1 value2 myValue 追加后 我知道我可以使用 antrun plugin 来做到这一点 但是可以使用 Maven 插
  • Jsplitpane 自动调整大小

    我有一个 JSPlitPane 它们之间有 50 的分隔线 这工作正常 但是 当我在右侧添加一些 JLabels 时 jsplitpane 会忽略我的 50 分隔符 左侧窗格会增加其大小 并会挤压右侧窗格 为什么会发生这种情况以及如何解决
  • 如何使用 Spring Security 手动注销用户?

    也许答案很简单 如何在 Spring Security 中手动注销当前登录的用户 拨打电话是否足够 SecurityContextHolder getContext getAuthentication setAuthenticated fa

随机推荐

  • Github下载任意版本的VsCode

    下载历史版本VsCode zip 下载链接由三部分组成 固定部分 commit id VSCode win32 x64 版本号 zip 固定部分 https vscode cdn azure cn stable Commit id 打开 v
  • 嵌入式linux通过systemd自启动一个python代码

    一直想实现一段自启动的代码 今天尝试了下 成功了 做个记录 首先 我用的是imx6ull处理器 嵌入式linux内核版本为4 9 88 然后 上位机用的虚拟机ubuntu22 04 01 先在ubuntu上面试了试 能够自启动 然后再下载到
  • linux系统调用(持续更新....)

    随着自己接触越来越多的linux的系统函数发现自己在linux系统调用方面有很多不足 每次遇到系统调用函数都要百度一遍看一下用法 所以我打算写一篇博客来记录在开发过程遇到的系统调用函数 方便自己查阅 本文持续更新 1 popen 函数 2
  • Unity-协同程序

    知识点一 Unity是否支持多线程 1 首先要明确一点Unity是支持多线程的 只是新开线程无法访问Unity相关对象的内容 Unity中的多线程 要记住关闭 Unity中去使用 如果说 我们一开始在Start内创建一个多线程 那么我们无法
  • 【算法】最近点对问题(暴力破解法)

    简单的画了一张图 通过暴力方式 进行一次比较获取两个点之间的最短距离 点对最近问题 暴力破解法 include
  • Maven Dependency设置,详解!

    come from http www javaeye com topic 240424 用了Maven 所需的JAR包就不能再像往常一样 自己找到并下载下来 用IDE导进去就完事了 Maven用了一个项目依赖 Dependency 的概念
  • 赶紧来修炼内功~字符串函数详解大全(二)

    目录 1 strncpy 重点 模拟实现 2 strncat 重点 模拟实现 3 strncmp 重点 模拟实现 写在最后 1 strncpy 该函数包含三个参数 前两个参数与上一篇文章中讲解的strcpy函数一样 一个目的地 一个源 第三
  • 【算法】分治、动态规划和贪心算法

    这三种算法非常相似 但是又有一些区别 理解如下 分治 把一个问题划分为若干子问题 求出子问题的最优解 再把子问题的最优解进行merge 最终得到原问题的最优解 动态规划 原问题的最优解包含子问题的最优解 即 拥有最优子结构 同时 求子问题的
  • React Router 从v3升级到v4的踩坑之旅

    React 应用很少不用react router这个包的 marknoteapp com之前一直用v3 看到v4出来后一直心痒 最近 抱着 用新不用旧 的理念 手贱升了一下级 这一升级 差不多2天功夫花掉啦 概述 和 Angular 那改朝
  • 通过java api提交自定义hadoop 作业

    通过API操作之前要先了解几个基本知识 一 hadoop的基本数据类型和java的基本数据类型是不一样的 但是都存在对应的关系 如下图 如果需要定义自己的数据类型 则必须实现Writable hadoop的数据类型可以通过get方法获得对应
  • 未来科技计算机作文600字,未来的科技作文600字

    未来的科技是什么样的呢 也许大家都很好奇 未来可能有像人一样的机器人 未来的汽车能在天上飞 电脑也会有思维会说话谈感情 现在让我们一起去畅想未来的世界吧 清晨 我正懒洋洋地睡在床上 蒙眬中听见一阵音乐把我给弄醒了 小主人 起床的时间到了 家
  • Elasticsearch 入门到精通-ES可视化查询工具kibana重启

    ps ef grep kibana ps ef grep 5601 都找不到 尝试 使用 fuser n tcp 5601 kill 9 端口 ps ef grep node 或 netstat anltp grep 5601 启动即可 k
  • LeetCode - 旋转数组类总结(二分法)

    搜索旋转排序数组 假设按照升序排序的数组在预先未知的某个点上进行了旋转 例如 数组 0 1 2 4 5 6 7 可能变为 4 5 6 7 0 1 2 搜索一个给定的目标值 如果数组中存在这个目标值 则返回它的索引 否则返回 1 你可以假设数
  • linux下解压命令大全

    tar 解包 tar xvf FileName tar打包 tar cvf FileName tar DirName 注 tar是打包 不是压缩 gz解压1 gunzip FileName gz解压2 gzip d FileName gz压
  • 前端中DOM是什么,怎样理解dom

    前端中DOM是什么 怎样理解 首先我们来说一下DOM是什么 文档对象模型 Document Object Model 简称DOM 我个人认为他就是将 通过浏览器的一些规则解析后 在渲染成我们能够看得见的页面 这整个过程就是DOM 它的过程分
  • vue2引入Element UI组件去创建新页面的详细步骤--项目阶段2

    目录 一 Element UI介绍 Element UI的特点 二 下载配置Element UI 零 创建vue项目 一 下载Element UI依赖 二 全局文件main js中引入Element UI 三 删除多余的东西 一 删除App
  • ctf.show

    web21 自定义迭代器 先抓个包 发现authorization 授权 后面的basic认证很奇怪 用base64解码看看就是输入的用户名和密码 返回包里出现乱码 但可以看到admin 猜测用户名就叫admin 知道了爆破登录的形式是xx
  • spring与mybatis集成

    Spring 集成 MyBatis 将 MyBatis与 Spring 进行整合 主要解决的问题就是将 SqlSessionFactory 对象交由 Spring来管理 所以 该整合 只需要将 SqlSessionFactory 的对象生成
  • Nodejs+Express中页面控制器及脚本自动加载设计

    Express自身带强大的路由功能 这让我们可以详细拆分项目的需求 设计出优美的restful风格对外API 为了方便实现上述功能 我加入了页面控制器及脚本自动加载设计 比如针对会员模块 我们在app js中指定了模块的路由文件 app u
  • 【Java】你还在使用单线程处理大量数据么?

    Java 结合实际业务场景 使用多线程异步处理大量数据 业务场景 优化方案 多线程的实现 线程池 为什么要使用线程池 线程池的创建 1 Spring配置类 2 手动创建 提交任务 1 execute 2 submit 案例伪代码 后续优化