SpringBoot中使用ThreadPoolExecutor和ThreadPoolTaskExecutor线程池的方法和区别

2023-11-17

        Java中经常用到多线程来处理业务。在多线程的使用中,非常的不建议使用单纯的Thread或者实现Runnable接口的方式来创建线程,因为这样的线程创建及销毁势必会造成耗费资源、线程上下文切换问题,同时创建过多的线程也可能会引发资源耗尽的风险,对线程的管理非常的不方便。因此在使用多线程的时候,日常开发中我们经常引入的是线程池,利用线程池十分方便的对线程任务进行管理。

        这里主要对线程池ThreadPoolExecutor和ThreadPoolTaskExecutor进行对比与使用见解。

一、ThreadPoolExecutor

该图是它的继承关系

 它的构造方法为

public ThreadPoolExecutor(int coreSize,
        int maxSize,
        long keepAliveTime,
        TimeUnit unit,
        BlockingQueue<Runnable> workQueue,
        ThreadFactory threadFactory,
        RejectedExectionHandler handler);

几个参数的含义分别是:

coreSize:核心线程数,也是线程池中常驻的线程数
maxSize:最大线程数,在核心线程数的基础上可能会额外增加一些非核心线程,需要注意的是只有当workQueue队列填满时才会创建多于核心线程数的线程
keepAliveTime:非核心线程的空闲时间超过keepAliveTime就会被自动终止回收掉
unit:keepAliveTime的时间单位
workQueue:用于保存任务的队列,可以为直接提交队列、无界任务队列、有界任务队列、优先任务队列类型之一,当池子里的工作线程数大于核心线程数时,这时新进来的任务会被放到队列中
threadFactory:执行程序创建新线程时使用的工厂
handler:线程池无法继续接收任务是的拒绝策略

workQueue任务队列

        workQueue任务队列可以为直接提交队列、无界任务队列、有界任务队列、优先任务队列类型之一,示例如下

例1:直接提交队列

SynchronousQueue它没有容量,每执行一个插入操作就会阻塞,需要再执行一个删除操作才会被唤醒,反之每一个删除操作也都要等待对应的插入操作

        当创建的线程数大于最大线程数时,会直接执行设置好的拒绝策略

new ThreadPoolExecutor(1,
        2,
        1000,
        TimeUnit.MILLISECONDS,
        new SynchronousQueue<Runnable>(),
        Executors.defaultThreadFactory(),
        new ThreadPoolExecutor.AbortPolicy()
    );

例2:有界的任务队列

ArrayBlockingQueue有界的任务队列。如果有新的任务需要执行时,线程池会创建新的线程,知道创建的线程数量达到核心线程数时,则会将新的任务加入到等待的队列中。如果等待的队列已满,则会继续创建线程,直到线程数量达到设定的最大线程数,如果创建的线程数大于了最大线程数,则执行拒绝策略。

new ThreadPoolExecutor(
        1, 
        2, 
        1000, 
        TimeUnit.MILLISECONDS, 
        new ArrayBlockingQueue<Runnable>(10), 
        Executors.defaultThreadFactory(), 
        new ThreadPoolExecutor.AbortPolicy()
    );

 例3:无界的任务队列

LinkedBlockingQueue无界的任务队列,线程池的任务队列可以无限制的添加新的任务,而线程池创建的最大线程数就是设定的核心线程数量,也就是说在这种情况下,就算你设置了最大线程数也是无效的,哪怕你的任务队列中缓存了很多未执行的任务,当线程池的线程数达到corePoolSize后,就不会再增加了;若后续有新的任务加入,则直接进入队列等待,当使用这种任务队列模式时,一定要注意你任务提交与处理之间的协调与控制,不然会出现队列中的任务由于无法及时处理导致一直增长,直到最后资源耗尽的问题。

new ThreadPoolExecutor(
        1, 
        2, 
        1000, 
        TimeUnit.MILLISECONDS, 
        new LinkedBlockingQueue<Runnable>(),
        Executors.defaultThreadFactory(),
        new ThreadPoolExecutor.AbortPolicy()
    );

 例4:优先任务队列

        PriorityBlockingQueue优先任务队列,线程池的线程数一直为设定的核心线程数个,无论添加多少个任务,线程池创建的线程数也不会超过你设定的核心线程数,只不过PriorityBlockingQueue队列内的任务可以自定义队则根据任务的优先级顺序进行执行,不同于其它队列是按照先进先出的规则处理的

new ThreadPoolExecutor(1,
        2,
        1000,
        TimeUnit.MILLISECONDS,
        new PriorityBlockingQueue<Runnable>(),
        Executors.defaultThreadFactory(),
        new ThreadPoolExecutor.AbortPolicy()
    );

线程池拒绝策略

AbortPolicy

       直接抛出异常阻止系统正常工作

CallerRunsPolicy

       只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务

DiscardOldestPolicy

       丢弃最老的一个请求,尝试再次提交当前任务

DiscardPolicy

       丢弃无法处理的任务,不给予任何处理

除上述拒绝策略外,可以实现RejectedExecutionHandler接口,自定义拒绝策略

new ThreadPoolExecutor(
                1,
                2,
                1000,
                TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(5),
                Executors.defaultThreadFactory(),
                new RejectedExecutionHandler() {
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)                             
                    {
                        System.out.println(r.toString() + "执行了拒绝策略");
                    }
                });

ThreadPoolExecutor工作流程

        当一个新的任务提交给线程池时,线程池的处理步奏:

1、首先判断核心线程数是否已满,如果没满则调用一个线程处理Task任务,如果已满则执行步奏2;

2、这时会判断阻塞队列是否已满,如果阻塞队列没满,就将Task任务加入到阻塞队列中等待执行,如果阻塞队列已满,则执行步奏3;

3、判断是否大于最大线程数,如果小于最大线程数,则创建线程执行Task任务,如果大于最大线程数,则执行步骤4;

4、这时会使用淘汰策略来处理无法执行的Task任务

ThreadpoolExecutor线程池的使用

书写一个配置类,在配置类中定义一个bean,如下

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;


@Configuration
@Slf4j
@EnableAsync
public class ExecutorConfig {

    @Bean
    public ThreadPoolExecutor asyncExecutor(){
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                10,
                20,
                1000,
                TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(5),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        System.out.println("线程"+r.hashCode()+"创建");
                        //线程命名
                        Thread th = new Thread(r,"threadPool"+r.hashCode());
                        return th;
                    }
                },
                new ThreadPoolExecutor.CallerRunsPolicy()
        ){
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("准备执行:"+ ((ThreadTask)r).getTaskName());
                super.beforeExecute(t, r);
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                System.out.println("执行完毕:"+((ThreadTask)r).getTaskName());
                super.afterExecute(r, t);
            }

            @Override
            protected void terminated() {
                System.out.println("线程池退出");
                super.terminated();
            }
        };
        return executor;
    }

}

说明:

  1. beforeExecute:线程池中任务运行前执行
  2. afterExecute:线程池中任务运行完毕后执行
  3. terminated:线程池退出后执行

代码中的ThreadTask如下,此处可根据自己需求进行代码编写

public class ThreadTask implements Runnable {

    private String taskName;

    public String getTaskName() {
        return taskName;
    }

    public void setTaskName(String taskName) {
        this.taskName = taskName;
    }

    public ThreadTask(String name) {
        this.setTaskName(name);
    }

    public void run() {
        //输出执行线程的名称
        System.out.println("TaskName"+this.getTaskName()+"---ThreadName:"+Thread.currentThread().getName());
    }
}

二、ThreadPoolTaskExecutor

ThreadPoolTaskExecutor这个类是Spring-Context支持的一个,专门用于Spring环境的线程池。其底层是在ThreadPoolExecutor的基础上包装的一层,使得Spring的整合更加方便

继承关系如下

 其成员变量如ThreadPoolExecutor,有核心线程数、最大线程数、keepAliveTIme、超时时间单位、队列、线程创建工厂、拒绝策略

查看它的源码如下

 

 可以看出,它依赖的还是ThreadPoolExecutor,并且注意它直接设定了keepAliveTime的时间单位

它的队列、拒绝策略通ThreadPoolExecutor一致

ThreadPoolTaskExecutor的使用

书写一个配置类,在配置类中对线程池ThreadPoolTaskExecutor进行配置

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;


@Configuration
@Slf4j
@EnableAsync
public class ExecutorConfig {

    @Bean
    public ThreadPoolTaskExecutor asyncExecutor() {
        log.info("start asyncServiceExecutor");
        ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
        //配置核心线程数
        executor.setCorePoolSize(10);
        //配置最大线程数
        executor.setMaxPoolSize(20);
        //配置队列大小
        executor.setQueueCapacity(100);
        //配置keepAliveTime
        executor.setKeepAliveSeconds(10);
        //配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix("async-service-");
        //拒绝策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //执行初始化
        executor.initialize();
        return executor;
    }

}

配置类中的VisiableThreadPoolTaskExecutor()类扩展了ThreadPoolTaskExecutor,对线程执行前后各阶段做了补充操作,类似于上面ThreadPoolExecutor中的beforeExecute、afterExecute等操作,具体代码如下

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

@Slf4j
public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {

    private void showThreadPoolInfo(String prefix) {
        ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();

        if (null == threadPoolExecutor) {
            return;
        }

        log.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]",
                this.getThreadNamePrefix(),
                prefix,
                threadPoolExecutor.getTaskCount(),
                threadPoolExecutor.getCompletedTaskCount(),
                threadPoolExecutor.getActiveCount(),
                threadPoolExecutor.getQueue().size());
    }

    @Override
    public void execute(Runnable task) {
        showThreadPoolInfo("1. do execute");
        super.execute(task);
    }

    @Override
    public void execute(Runnable task, long startTimeout) {
        showThreadPoolInfo("2. do execute");
        super.execute(task, startTimeout);
    }

    @Override
    public Future<?> submit(Runnable task) {
        showThreadPoolInfo("1. do submit");
        return super.submit(task);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        showThreadPoolInfo("2. do submit");
        return super.submit(task);
    }

    @Override
    public ListenableFuture<?> submitListenable(Runnable task) {
        showThreadPoolInfo("1. do submitListenable");
        return super.submitListenable(task);
    }

    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
        showThreadPoolInfo("2. do submitListenable");
        return super.submitListenable(task);
    }
}

三、线程池在接口中的具体使用

 上述描述中,最终书写了一个配置类,对线程池进行了配置,定义了一个bean对象,那么在具体接口中该怎么使用,如下所示

1、创建controller层,书写接口入口,调用server层代码

import com.smile.syncproject.service.AsyncService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("async")
@Slf4j
public class AsyncController {

    @Autowired
    private AsyncService asyncService;

    @RequestMapping("test")
    public String test() {
        log.info("start submit");

        //调用service层的任务
        asyncService.executeAsync();

        log.info("end submit");

        return "success";
    }

}

 2、在service层实现层进行线程池的使用

        通过注解@Async

@Async("asyncServiceExecutor")

注解内的值就是上面定义好的配置类中的bean的名称。如果有多个线程池,就需要在定义不同bean的时候指定其name了

import com.smile.syncproject.service.AsyncService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class AsyncServiceImpl implements AsyncService {

    @Override
    @Async("asyncServiceExecutor")
    public void executeAsync() {
        log.info("start executeAsync");
        try{
            Thread.sleep(1000);
        }catch(Exception e){
            e.printStackTrace();
        }
        log.info("end executeAsync");
    }

}

四、其它

1、线程池大小的设置

        针对这个问题,我们首先要确认的是我们的需求是计算密集型还是IO密集型。

        如果是计算密集型,比较理想的方案是:线程数 = CPU核数 + 1,也可以设置成CPU核数*2,一般设置CPU*2

        如果是IO密集型,线程数 = CPU核心数/(1-阻塞系数),这个组赛系数一般为0.8~0.9之间,也可以取0.8或者0.9.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

SpringBoot中使用ThreadPoolExecutor和ThreadPoolTaskExecutor线程池的方法和区别 的相关文章

随机推荐

  • Python求1-100所有奇数和的方法!

    在之前的文章中 老男孩IT教育小编为大家介绍过Python的特点 优势 用途以及薪资待遇等知识 而为了帮助大家更好的掌握Python 小编将为大家讲解一些实战案例 比如 Python中如何求1 100的奇数和 接下来我们来看看吧 Pytho
  • Stable Diffusion安装教程、model导入教程以及精品promt指令

    文章目录 引言 原理 图片感知压缩 潜在扩散模型 安装 插件 插件与模型下载 常用promt关键字 交流讨论 引言 最近大火的AI作画吸引了很多人的目光 AI作画近期取得如此巨大进展的原因个人认为有很大的功劳归属于Stable Diffus
  • 读取sftp服务器上的文件内容到指定的数据库表内

    引入sftp jar依赖
  • 一些常用的公共js方法

    读者可能会觉得节流与防抖有点像 其实仔细斟酌就能发现他们的不同 节流是指对于连续触发的事件 每隔一段固定时间执行一次 只要事件持续出发就可以执行很多次 在节流里涉及的时间主要是指事件执行的间隔时间 防抖则是对连续触发的事件 只会执行一次 不
  • 从操作系统层面理解同步、异步、阻塞、非阻塞

    同步和异步描述调用者会不会主动等待函数的返回值 举个例子 public void method int result otherMethod 像上面这种形式就叫同步 result 会一直等待 otherMethod 方法执行完毕并拿到返回值
  • BMVC2022

    原文标题 Hierarchical Residual Learning Based Vector Quantized Variational Autoencoder for Image Reconstruction and Generati
  • 运行 Triton 示例

    安装 Triton Docker 镜像 在拉取镜像前 需要安装 Docker 和 NVIDIA Container Toolkit 用以下命令拉取镜像 docker pull nvcr io nvidia tritonserver
  • VTK编译方法

    VTK编译方法 VTK Group Imaging ON VTK Group MPI ON VTK Group QT ON VTK Group TK ON VTK Group Views ON VTK RENDERING BACKEND O
  • 使用layui/layuiAdmin的总结

    layui是一个前端UI框架 主要是配合JQuery使用 开始使用 首先是下载文件 然后引入css和js文件 引入之后就需要在
  • 以太坊开发入门,完整入门

    翻译自 https medium com mattcondon getting up to speed on ethereum 63ed28821bbe 从入门到精通 干货篇 必读 如果你 是一个专业的程序员 如果你想了解以太坊当前可以做到
  • QT QTabWidget 、布局控件 动态添加窗口(控件)、删除窗口(控件)方案

    new 一个窗口或者控件 QTabWidget addTab 将新建的控件放到一个容器中 比如 QMap
  • mybatis查询

    以后返回统一用对象 resultMap 查询 基本查询 select from person where person id id 条件查询 分页 select from cobra apply store yjs user id yjsU
  • 递归的一种应用

    有些问题 涉及两个对象 比如两个数 像个长度不同的数组 链表之类的 必须考虑是前者大还是后者大的情况 分别处理 其实可以只处理一种情况 比如前者小 后者大的情况 另一种情况 前者大后者小 可以通过交换参数 递归调用本函数来处理
  • ArrayList与顺序表

    文章目录 一 顺序表是什么 二 ArrayList是什么 三 ArrayList的构造方法 四 ArrayList的常见方法 4 1 add 4 2 size 4 3 remove 4 4 get 4 5 set 4 6 contains
  • kali linux中如何安装中文输入法

    前言 在使用kali linux中 我们可能用到中文输入法 那么我们该如何安装中文输入法呢 正文 一 首先 我们需要检查更新源是否可用 如果可用我们就进行第二步 如果不可用 我们则需要手动添加更新源 手动添加更新源 我们需要到网上找到最新的
  • 云原生安全性:构建可信任的云应用的最佳实践

    文章目录 云原生安全性的重要性 1 数据隐私 2 恶意攻击 3 合规性要求 4 业务连续性 构建可信任的云应用的最佳实践 1 安全开发 2 身份验证与授权 3 容器安全性 4 监控与审计 5 持续集成与持续交付 CI CD 6 安全培训和教
  • 制作树莓派img镜像文件

    想做个树莓派的img镜像 然而对SD卡进行全盘复制很浪费空间 且不能恢复到比现有SD卡容量小的卡上 因此探索制作小img的方法 网上看了大神制作的脚本 比如https github com conanwhf RaspberryPi scri
  • MySQL索引原理详解

    目录 一 数据结构 1 1 二叉树 为什么索引的数据结构不用二叉树 1 2 红黑树 自平衡二叉查找树 为什么索引的数据结构不用红黑树 1 3 B树 多路平衡搜索树 为什么索引的数据结构不用B树 1 4 B 树 1 5 MySQL B 树 1
  • QML的Label实现Tooltip提示效果

    在用QML进行界面设计时 往往需要用到Label 但是由于界面宽度的限制 Label会显示不全 需要进行Tooltip进行提示 而QML中的Label本身还不支持Tooltip的提示功能 所以给开发带来了一定的困难 那么 遇到这种问题 该怎
  • SpringBoot中使用ThreadPoolExecutor和ThreadPoolTaskExecutor线程池的方法和区别

    Java中经常用到多线程来处理业务 在多线程的使用中 非常的不建议使用单纯的Thread或者实现Runnable接口的方式来创建线程 因为这样的线程创建及销毁势必会造成耗费资源 线程上下文切换问题 同时创建过多的线程也可能会引发资源耗尽的风