线程池为什么能维持线程不释放,随时运行各种任务?

2023-10-27

版权声明:本文为博主原创文章,未经博主允许不得转载。技术交流可邮:cjh94520@outlook.com https://blog.csdn.net/cjh94520/article/details/70545202

线程池

之前一直有这个疑问:我们平时使用线程都是各种new Thread(),然后直接在run()方法里面执行我们要做的各种操作,使用完后需要做什么管理吗?线程池为什么能维持住核心线程不释放,一直接收任务进行处理呢?

线程

线程无他,主要有两个方法,我们先看看start()方法介绍:

    /**
     * Causes this thread to begin execution; the Java Virtual Machine
     * calls the <code>run</code> method of this thread.
     * <p>
     * The result is that two threads are running concurrently: the
     * current thread (which returns from the call to the
     * <code>start</code> method) and the other thread (which executes its
     * <code>run</code> method).
     * <p>
     * It is never legal to start a thread more than once.
     * In particular, a thread may not be restarted once it has completed
     * execution.
     *
     * @exception  IllegalThreadStateException  if the thread was already
     *               started.
     * @see        #run()
     * @see        #stop()
     */

    public synchronized void start() {
        if (threadStatus != 0)
            throw new IllegalThreadStateException();

        /* Notify the group that this thread is about to be started
         * so that it can be added to the group's list of threads
         * and the group's unstarted count can be decremented. */
        group.add(this);

        started = false;
        try {
            nativeCreate(this, stackSize, daemon);
            started = true;
        } finally {
            try {
                if (!started) {
                    group.threadStartFailed(this);
                }
            } catch (Throwable ignore) {
                /* do nothing. If start0 threw a Throwable then
                  it will be passed up the call stack */
            }
        }
    }
  • 从这个方法解释上看,start()这个方法,最终会交给VM 去执行run()方法,所以一般情况下,我们在随便一个线程上执行start(),里面的run()操作都会交给VM 去执行。
  • 而且还说明,重复启用线程是不合法的,当一个线程完成的时候,may not be restarted once。

    那么这种情况下,线程池是怎么做的?他为什么就能够重复执行各种任务呢?


带着各种疑问,我们去看看线程池自己是怎么实现的。

线程池

线程池常用的创建方法有那么几种: 
1. newFixedThreadPool() 
2. newSingleThreadExecutor() 
3. newCachedThreadPool() 
4. newScheduledThreadPool()

这4个方法创建的线程池实例具体就不一一介绍,无非是创建线程的多少,以及回收等问题,因为其实这4个方法最后都会调用统一的构造方法:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

具体来说只是这几个值的不同决定了4个线程池的作用: 
1. corePoolSize 代表核心线程池的个数,当线程池当前的个数大于核心线程池的时候,线程池会回收多出来的线程 
2. maximumPoolSize 代表最大的线程池个数,当线程池需要执行的任务大于核心线程池的时候,会创建更多的线程,但是最大不能超过这个数 
3. keepAliveTime 代表空余的线程存活的时间,当多余的线程完成任务的时候,需要多长时间进行回收,时间单位是unit 去控制 
4. workQueue 非常重要,这个工作队列会存放所有待执行的Runnable对象

 @param workQueue the queue to use for holding tasks before they areexecuted. 
 This queue will hold only the {@code Runnable} tasks submitted by the {@code execute} method.

我们平时在使用线程池的时候,都是直接 实例.execute(Runnable),一起跟进去,看看这个方法具体做了什么

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */

        //结合上文的注释,我们得知,第一次,先判断当前的核心线程数,
        //如果小于初始化的值,马上创建;然后第二个if,将这个任务插入到工作线程,双重判断任务,
        //假定如果前面不能直接加入到线程池Worker集合里,则加入到workQueue队列等待执行。
        //里面的if else判断语句则是检查当前线程池的状态。如果线程池本身的状态是要关闭并清理了,
        //我们则不能提交线程进去了。这里我们就要reject他们。
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

所以其实主要起作用的还是addWorker()方法,我们继续跟踪进去:

private boolean addWorker(Runnable firstTask, boolean core) {
        ···多余代码


        try {
            w = new Worker(firstTask);  1.重点
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();   2. 重点
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

我们看重点部分,其实最重要的是firstTask这个Runnable,我们一直跟踪这个对象就可以了,这个对象会new Worker(),那么这个wroker()就是一个包装类,里面带着我们实际需要执行的任务,后面进行一系列的判断就会执行t.start(); 这个t 就是包装类worker类里面的Thread,所以整个逻辑又转化进去Worker内部。

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker. */
        public void run() {
            runWorker(this);
        }

        ...省略代码
    }
  • 这个Worker包装类,重要的属性两个,thread 就是刚才上面那个方法执行的start()对象,这个thread又是把这个worker对象本身作为一个Runnable对象构建出来的,那么当我们调用thread.start()方法时候,实际调用的就是Worker类的run()方法。现在又要追踪进去,看这个runWorker(this),做的是什么鬼东西
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

这个方法还是比较好懂的: 
1. 一个大循环,判断条件是task != null || (task = getTask()) != null,task自然就是我们要执行的任务了,当task空而且getTask()取不到任务的时候,这个while()就会结束,循环体里面进行的就是task.run(); 
2.这里我们其实可以打个心眼,那基本八九不离十了,肯定是这个循环一直没有退出,所以才能维持着这一个线程不断运行,当有外部任务进来的时候,循环体就能getTask()并且执行。 
3.下面最后放getTask()里面的代码,验证猜想

   private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }


            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
  • 真相大白了,里面进行的也是一个死循环,主要看 Runnable r = timed ? 
    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : 
    workQueue.take();

  • 工作队列workQueue会一直去拿任务,属于核心线程的会一直卡在 workQueue.take()方法,直到拿到Runnable 然后返回,非核心线程会 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ,如果超时还没有拿到,下一次循环判断compareAndDecrementWorkerCount就会返回null,Worker对象的run()方法循环体的判断为null,任务结束,然后线程被系统回收

总结

一句话可以概述了,线程池就是用一堆包装住Thread的Wroker类的集合,在里面有条件的进行着死循环,从而可以不断接受任务来进行。

 

 

https://blog.csdn.net/cjh94520/article/details/70545202

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

线程池为什么能维持线程不释放,随时运行各种任务? 的相关文章

  • 如何将 ThreadPool.QueueUserWorkItem 与非静态方法一起使用?

    当我尝试编译时它给了我 错误 1 非静态字段 方法或属性 ConsoleApplication1 Program print string 需要对象引用 ConsoleApplication1 ConsoleApplication1 Pro
  • ExecutorService 的多个 newSingleThreadExecutor 与 newFixedThreadPool

    在我的应用程序中 我有 4 个不同的进程 它们永久运行 但有一些小暂停 当前版本的代码在单独的老式线程中执行每个进程 Thread nlpAnalyzer new Thread gt infine lop for auto restore
  • IIS 工作线程与 Web 应用程序线程

    我正在维护一个ASP NET Core需要重复运行一些后台线程的 Web 应用程序 我知道这不是一个好的设计 但目前我必须以最小的努力解决它的主要问题 现在我想知道我是否应该担心网络服务器处理用户http请求 问题很简单 但我找不到任何明确
  • corePoolSize 0 的 ThreadPoolExecutor 在任务队列已满之前不应执行任务

    我正在经历Java 并发实践并被困在8 3 1 线程的创建和销毁话题 以下脚注警告不要保留corePoolSize为零 开发人员有时会试图将核心大小设置为零 以便工作线程 最终会被拆除 因此不会阻止 JVM 退出 但这可能会导致一些问题 不
  • MsgWaitForMultipleObjects 的 C# 等效项是什么?

    我有一个带有报表模式 ListView 的 Windows 窗体 对于视图中的每个项目 我需要执行长时间运行的操作 其结果是一个数字 我在本机 win32 中执行此操作的方法是为每个项目创建一个工作线程 天真地 当然我不会创建无限数量的线程
  • C++ 阻塞工作队列

    这个问题应该比我上几个简单一点 我在我的程序中实现了以下工作队列 Pool h tpool class It s always closed glasses ifndef POOL H define POOL H class tpool p
  • 如何让主线程等待ThreadPoolExecutor中其他线程完成

    我正在使用 ThreadPoolExecutor 在我的 Java 应用程序中实现线程 我有一个 XML 我需要解析它并将其每个节点添加到线程中以执行完成 我的实现是这样的 parse ip 是创建的线程池对象 ParseQuotes XM
  • Spring 的 ThreadPoolTask​​Executor 的默认队列大小是多少?

    我正在使用 Spring 4 3 8 RELEASE 和 Java 7 我想创建一个线程池来执行任务 所以我在 Spring contxet 中设置了以下内容
  • Java:ExecutorService 在达到一定队列大小后阻塞提交[重复]

    这个问题在这里已经有答案了 我正在尝试编写一个解决方案 其中单个线程生成可以并行执行的 I O 密集型任务 每个任务都有重要的内存数据 所以我希望能够限制当前待处理的任务数量 如果我像这样创建 ThreadPoolExecutor Thre
  • Tomcat 活动线程堆积并导致服务器停顿

    我们的生产服务器 apache tomcat 7 0 6 遇到了这个问题 该服务器正在运行使用 MySQL 作为数据库的 Spring JPA Hibernate 应用程序 在此问题期间 服务器变得缓慢 在一分钟内变得无响应 活动 tomc
  • 使用多处理来查找网络路径

    我目前正在使用 networkx 函数 all simple paths 来查找网络 G 中给定的一组源节点和目标节点的所有路径 在更大 更密集的网络上 这个过程非常密集 我想知道是否可以使用多处理来解决这个问题 以及是否有人对如何通过创建
  • 停止多线程 Windows 服务

    我在 Net 3 5 中有一个多线程 Windows 服务 当创建多个线程时 我在正确停止该服务时遇到了一些麻烦 该服务过去只创建一个线程来完成所有工作 我只是将其更改为多线程 它工作得很好 但是当服务停止时 如果有多个线程正在执行 它将挂
  • 带有 ArrayBlockingQueue 的 ThreadPoolExecutor

    当我在我的一个项目中使用 ThreadPoolExecutor 时 我开始从 Java Doc 中阅读更多有关 ThreadPoolExecutor 的内容 那么谁能解释一下这行代码实际上意味着什么 我知道每个参数代表什么 但我想从这里的一
  • 使用 TPL 时如何管理线程本地存储 (TLS)?

    我想将日志记录上下文信息存储在 TLS 中 以便可以在入口点设置一个值 并使该值在所有结果堆栈中可用 这工作得很好 但我还使用了 TPL 和 ThreadPool 那么问题就变成了如何将 TLS 数据迁移到其他线程 我可以自己完成这一切 但
  • 分析 Windbg 中 !threadpool 和 !threads 的输出

    我已经在四台服务器上生成了转储 并正在分析 threadpool 和 threads 的输出 我注意到以下输出大致一致 0 024 gt threadpool CPU utilization 0 Worker Thread Total 2
  • 如何使用java.util.concurrent包实现后台线程?

    这是我首先使用的代码 但在最新的 Android 版本中AsyncTask类已被弃用并且 因此它没有响应 然后我使用了Thread类 但该类也不起作用 我想要与我得到的结果相同的结果AsyncTask班级 我知道我必须使用 java uti
  • ThreadPool.SetMinThreads 不创建任何新线程

    我想弄清楚到底有什么影响ThreadPool SetMinThreads makes 根据官方文档 https learn microsoft com en us dotnet api system threading threadpool
  • 报告线程进度的最佳方式

    我有一个程序 它使用线程顺序执行耗时的进程 我希望能够监视每个线程的进度 类似于BackgroundWorker ReportProgress ProgressChanged模型确实如此 我不能使用ThreadPool or Backgro
  • C#中如何获取正在运行的线程列表?

    我在 C 中创建动态线程 并且需要获取这些正在运行的线程的状态 List
  • 为什么 Ruby 没有内置线程池?

    我有一个程序 它一次创建 10000 个线程 并同时运行 8 个线程 但是Ruby没有像Java那样内置ThreadPool 有充分的理由吗 可能是因为使用标准库 Queue 类很容易推出您自己的 q Queue new 3 times T

随机推荐

  • 第十四届蓝桥杯大赛软件赛省赛(C/C++ 大学B组)

    目录 试题 A 日期统计 1 题目描述 2 解题思路 3 模板代码 试题 B 01 串的熵 1 题目描述 2 解题思路 3 模板代码 试题 C 冶炼金属 1 题目描述 2 解题思路 3 模板代码 试题 D 飞机降落 1 题目描述 2 解题思
  • No implementation found for void com.wust.testjni10.CallJava.callPrintString()

    问题描述 当你看到这篇文章的时候 说明你会jni了 并且还在调用 so 库 可问题就出在调用的时候报了这么个错 No implementation found for void com wust testjni10 CallJava cal
  • uniapp-提现功能(demo)

    页面布局 提现页面 有一个输入框 一个提现按钮 一段提现全部的文字 首先用v model 和data内的数据双向绑定 输入框逻辑分析 输入框的逻辑 为了符合日常输出 所以要对输入框加一些条件限制 因为是提现 所以对输入的字符做筛选 只允许出
  • 由于某种原因,PowerPoint 无法加载MathType..... (亲测有效)

    网上找了较多的参考解决办法 最后发现如下博主提供的方法快捷有效 https blog csdn net dss875914213 article details 85873938 问题 PPT打开时弹出由于某种原因powerpoint无法加
  • 关于python

    1 关于python Python由荷兰数学和计算机科学研究学会的Guido van Rossum 于1990 年代初设计 作为一门叫做ABC语言的替代品 Python提供了高效的高级数据结构 还能简单有效地面向对象编程 Python语法和
  • Viva Workplace Analytics & Employee Feedback SU Viva Glint部署方案

    目录 一 Viva Workplace Analytics Employee Feedback SU Viva Glint介绍 二 Viva Glint和Viva Pulse特点和优势 1 简单易用
  • SLAM精度评定工具——EVO使用方法详解

    系统版本 Ubuntu20 04 ROS版本 Noetic EVO是用于处理 评估和比较里程计和SLAM算法的轨迹输出的工具 注意 本文的评测是在kitti数据集下进行评测 其他的数据集也支持评测 安装EVO 可以执行下面这条命令 pip
  • Pytorch 中如何对训练数据进行增强处理?

    假设我们的数据集是一个手写数字的图像数据集 其中每一张图像包含一个手写数字和对应的标签 我们可以通过随机旋转 平移 缩放和翻转等操作 对原始的图像进行变换增广 Data Augmentation 以增强模型的训练效果 举个例子 我们可以通过
  • JavaScript 实现数组中的字符串按长度排序,长度一样按字母顺序排序

    以下的newar数组里的val键值排序要求 字串按长度排序 长度一样按字母顺序排序 js实现数组中的字符串按长度排序 长度一样按字母顺序排序 function sortByLenByazAZVal array array sort a b
  • MYSQL之ON DUPLICATE KEY UPDATE使用

    创建表 DROP TABLE IF EXISTS user CREATE TABLE user id int 32 NOT NULL AUTO INCREMENT COMMENT 主键id userName varchar 32 NOT N
  • 【单片机毕业设计】【mcuclub-dz-055】基于单片机的智能手环控制系统设计

    最近设计了一个项目基于单片机的智能智能手环控制系统设计 与大家分享一下 一 基本介绍 项目名 智能手环 项目编号 mcuclub dz 055 单片机类型 STM32F103C8T6 具体功能 1 通过MAX30102测量心率 血氧 2 通
  • PFQ,适用于多核处理器系统中的网络监控框架

    PFQ 是一个支持多语言的网络框架 主要用于 Linux 操作系统下进行高效的包捕获和传输 适用于多核处理器系统中的网络监控框架 PFQ 专门为多核处理器而优化 包括对多个硬件队列的网络设备优化 支持任意网络设备驱动 并提供一个脚本用来加速
  • 动态内存与静态内存的区别

    1 静态内存 静态内存是指在程序开始运行时由编译器分配的内存 它的分配是在程序开始编译时完成的 不占用CPU资源 程序中的各种变量 在编译时系统已经为其分配了所需的内存空间 当该变量在作用域内使用完毕时 系统会 自动释放所占用的内存空间 变
  • Dropout层的个人理解和具体使用

    一 Dropout层的作用 dropout 能够避免过拟合 我们往往会在全连接层这类参数比较多的层中使用dropout 在训练包含dropout层的神经网络中 每个批次的训练数据都是随机选择 实质是训练了多个子神经网络 因为在不同的子网络中
  • 解决Google CoLab使用外部文件的问题

    有时 用Google CoLab时需要用到外部的数据集 这时 可以通过代码把文件上传到CoLab上 代码如下 from google colab import files uploaded files upload for fn in up
  • 2年python从业者整理的学习经验,手把手教你如何系统有效学习python

    我是从19年开始学python的 学了近半年 从业2年时间 看网课自己练 摸爬滚打后终于找到了一套不错的学习方法 以我个人经验给零基础入门的朋友讲下应该怎么系统有效学习python 首先 python不是洪湖猛兽 普通人只要认真学肯定是能学
  • python 尝试有道翻译

    一只小白的爬虫 写了一个简单 有道翻译 记录一下 如果大家有更好的方式 方法记得分享一下哦 coding utf 8 import urllib urllib2 json url http fanyi youdao com translat
  • iview引用自定义的图标

    iview引用自定义的图标 护花使者 博客园
  • C++图书管理系统(基于结构体数组)

    提示 文章写完后 目录可以自动生成 如何生成可参考右边的帮助文档 文章目录 前言 一 需求分析 二 代码实现 总结 前言 问题描述 要求以图书馆管理业务为背景 设计并实现一个 图书馆管理信息系统 软件 使用该系统可以方便查询图书的信息 借阅
  • 线程池为什么能维持线程不释放,随时运行各种任务?

    版权声明 本文为博主原创文章 未经博主允许不得转载 技术交流可邮 cjh94520 outlook com https blog csdn net cjh94520 article details 70545202 线程池 之前一直有这个疑