Java并发编程系列8 - 多线程实战

2023-11-06

主要基于小米最近的多线程项目,抽离出里面的多线程实例。

前言

Java多线程的学习,也有大半个月了,从开始学习Java多线程时,就给自己定了一个小目标,希望能写一个多线程的Demo,今天主要是兑现这个小目标。

这个多线程的示例,其实是结合最近小米的一个多线程异步任务的项目,我把里面涉及到多线程的代码抽离出来,然后进行一定的改造,之所以不自己重写一个,一方面是自己能力还不够,另一方面是想学习现在项目中多线程的实现姿势,至少这个示例是实际项目中应用的。先学习别人怎么造轮子,后面就知道自己怎么去造轮子了。

业务需求

做这个多线程异步任务,主要是因为我们有很多永动的异步任务,什么是永动呢?就是任务跑起来后,需要一直跑下去,比如消息Push任务,因为一直有消息过来,所以需要一直去消费DB中的未推送消息,就需要整一个Push的永动异步任务。

我们的需求其实不难,简单总结一下:

  1. 能同时执行多个永动的异步任务;
  2. 每个异步任务,支持开多个线程去消费这个任务的数据;
  3. 支持永动异步任务的优雅关闭,即关闭后,需要把所有的数据消费完毕后,再关闭。

完成上面的需求,需要注意几个点:

  1. 每个永动任务,可以开一个线程去执行;
  2. 每个子任务,因为需要支持并发,需要用线程池控制;
  3. 永动任务的关闭,需要通知子任务的并发线程,并支持永动任务和并发子任务的优雅关闭。

项目示例

  1. 线程池

对于子任务,需要支持并发,如果每个并发都开一个线程,用完就关闭,对资源消耗太大,所以引入线程池:

public class TaskProcessUtil {
    // 每个任务,都有自己单独的线程池
    private static Map<String, ExecutorService> executors = new ConcurrentHashMap<>();

    // 初始化一个线程池
    private static ExecutorService init(String poolName, int poolSize) {
        return new ThreadPoolExecutor(poolSize, poolSize,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(),
                new ThreadFactoryBuilder().setNameFormat("Pool-" + poolName).setDaemon(false).build(),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    // 获取线程池
    public static ExecutorService getOrInitExecutors(String poolName,int poolSize) {
        ExecutorService executorService = executors.get(poolName);
        if (null == executorService) {
            synchronized (TaskProcessUtil.class) {
                executorService = executors.get(poolName);
                if (null == executorService) {
                    executorService = init(poolName, poolSize);
                    executors.put(poolName, executorService);
                }
            }
        }
        return executorService;
    }

    // 回收线程资源
    public static void releaseExecutors(String poolName) {
        ExecutorService executorService = executors.remove(poolName);
        if (executorService != null) {
            executorService.shutdown();
        }
    }
}

这是一个线程池的工具类,这里初始化线程池和回收线程资源很简单,我们主要讨论获取线程池。获取线程池可能会存在并发情况,所以需要加一个synchronized锁,然后锁住后,需要对executorService进行二次判空校验,这个和Java单例的实现很像,具体可参考《【设计模式系列5】单例模式》这篇文章。

单个任务

为了更好讲解单个任务的实现方式,我们的任务主要就是把Cat的数据打印出来,Cat定义如下:

@Data
@Service
public class Cat {
    private String catName;
    public Cat setCatName(String name) {
        this.catName = name;
        return this;
    }
}

单个任务主要包括以下功能:

  • 获取永动任务数据:这里一般都是扫描DB,我直接就简单用queryData()代替。
  • 多线程执行任务:需要把数据拆分成4份,然后分别由多线程并发执行,这里可以通过线程池支持;
  • 永动任务优雅停机:当外面通知任务需要停机,需要执行完剩余任务数据,并回收线程资源,退出任务;
  • 永动执行:如果未收到停机命令,任务需要一直执行下去。

直接看代码:

public class ChildTask {

    private final int POOL_SIZE = 3; // 线程池大小
    private final int SPLIT_SIZE = 4; // 数据拆分大小
    private String taskName;

    // 接收jvm关闭信号,实现优雅停机
    protected volatile boolean terminal = false;

    public ChildTask(String taskName) {
        this.taskName = taskName;
    }

    // 程序执行入口
    public void doExecute() {
        int i = 0;
        while(true) {
            System.out.println(taskName + ":Cycle-" + i + "-Begin");
            // 获取数据
            List<Cat> datas = queryData();
            // 处理数据
            taskExecute(datas);
            System.out.println(taskName + ":Cycle-" + i + "-End");
            if (terminal) {
                // 只有应用关闭,才会走到这里,用于实现优雅的下线
                break;
            }
            i++;
        }
        // 回收线程池资源
        TaskProcessUtil.releaseExecutors(taskName);
    }

    // 优雅停机
    public void terminal() {
        // 关机
        terminal = true;
        System.out.println(taskName + " shut down");
    }

    // 处理数据
    private void doProcessData(List<Cat> datas, CountDownLatch latch) {
        try {
            for (Cat cat : datas) {
                System.out.println(taskName + ":" + cat.toString() + ",ThreadName:" + Thread.currentThread().getName());
                Thread.sleep(1000L);
            }
        } catch (Exception e) {
            System.out.println(e.getStackTrace());
        } finally {
            if (latch != null) {
                latch.countDown();
            }
        }
    }

    // 处理单个任务数据
    private void taskExecute(List<Cat> sourceDatas) {
        if (CollectionUtils.isEmpty(sourceDatas)) {
            return;
        }
        // 将数据拆成4份
        List<List<Cat>> splitDatas = Lists.partition(sourceDatas, SPLIT_SIZE);
        final CountDownLatch latch = new CountDownLatch(splitDatas.size());

        // 并发处理拆分的数据,共用一个线程池
        for (final List<Cat> datas : splitDatas) {
            ExecutorService executorService = TaskProcessUtil.getOrInitExecutors(taskName, POOL_SIZE);
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    doProcessData(datas, latch);
                }
            });
        }

        try {
            latch.await();
        } catch (Exception e) {
            System.out.println(e.getStackTrace());
        }
    }

    // 获取永动任务数据
    private List<Cat> queryData() {
        List<Cat> datas = new ArrayList<>();
        for (int i = 0; i < 5; i ++) {
            datas.add(new Cat().setCatName("罗小黑" + i));
        }
        return datas;
    }
}

简单解释一下:

  • queryData:用于获取数据,实际应用中其实是需要把queryData定为抽象方法,然后由各个任务实现自己的方法。
  • doProcessData:数据处理逻辑,实际应用中其实是需要把doProcessData定为抽象方法,然后由各个任务实现自己的方法。
  • taskExecute:将数据拆分成4份,获取该任务的线程池,并交给线程池并发执行,然后通过latch.await()阻塞。当这4份数据都执行成功后,阻塞结束,该方法才返回。
  • terminal:仅用于接受停机命令,这里该变量定义为volatile,所以多线程内存可见,详见《【Java并发编程系列2】volatile》;
  • doExecute:程序执行入口,封装了每个任务执行的流程,当terminal=true时,先执行完任务数据,然后回收线程池,最后退出。

任务入口

直接上代码:

public class LoopTask {
    private List<ChildTask> childTasks;
    public void initLoopTask() {
        childTasks = new ArrayList();
        childTasks.add(new ChildTask("childTask1"));
        childTasks.add(new ChildTask("childTask2"));
        for (final ChildTask childTask : childTasks) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    childTask.doExecute();
                }
            }).start();
        }
    }
    public void shutdownLoopTask() {
        if (!CollectionUtils.isEmpty(childTasks)) {
            for (ChildTask childTask : childTasks) {
                childTask.terminal();
            }
        }
    }
    public static void main(String args[]) throws Exception{
        LoopTask loopTask = new LoopTask();
        loopTask.initLoopTask();
        Thread.sleep(5000L);
        loopTask.shutdownLoopTask();
    }
}

每个任务都开一个单独的Thread,这里我初始化了2个永动任务,分别为childTask1和childTask2,然后分别执行,后面Sleep了5秒后,再关闭任务,我们可以看看是否可以按照我们的预期优雅退出。

结果分析

执行结果如下:

childTask1:Cycle-0-Begin
childTask2:Cycle-0-Begin
childTask1:Cat(catName=罗小黑0),ThreadName:Pool-childTask1
childTask1:Cat(catName=罗小黑4),ThreadName:Pool-childTask1
childTask2:Cat(catName=罗小黑4),ThreadName:Pool-childTask2
childTask2:Cat(catName=罗小黑0),ThreadName:Pool-childTask2
childTask1:Cat(catName=罗小黑1),ThreadName:Pool-childTask1
childTask2:Cat(catName=罗小黑1),ThreadName:Pool-childTask2
childTask2:Cat(catName=罗小黑2),ThreadName:Pool-childTask2
childTask1:Cat(catName=罗小黑2),ThreadName:Pool-childTask1
childTask2:Cat(catName=罗小黑3),ThreadName:Pool-childTask2
childTask1:Cat(catName=罗小黑3),ThreadName:Pool-childTask1
childTask2:Cycle-0-End
childTask2:Cycle-1-Begin
childTask1:Cycle-0-End
childTask1:Cycle-1-Begin
childTask2:Cat(catName=罗小黑0),ThreadName:Pool-childTask2
childTask2:Cat(catName=罗小黑4),ThreadName:Pool-childTask2
childTask1:Cat(catName=罗小黑4),ThreadName:Pool-childTask1
childTask1:Cat(catName=罗小黑0),ThreadName:Pool-childTask1
childTask1 shut down
childTask2 shut down
childTask2:Cat(catName=罗小黑1),ThreadName:Pool-childTask2
childTask1:Cat(catName=罗小黑1),ThreadName:Pool-childTask1
childTask1:Cat(catName=罗小黑2),ThreadName:Pool-childTask1
childTask2:Cat(catName=罗小黑2),ThreadName:Pool-childTask2
childTask1:Cat(catName=罗小黑3),ThreadName:Pool-childTask1
childTask2:Cat(catName=罗小黑3),ThreadName:Pool-childTask2
childTask1:Cycle-1-End
childTask2:Cycle-1-End

输出数据中,“Pool-childTask”是线程池名称,“childTask”是任务名称,“Cat(catName=罗小黑)”是执行的结果,“childTask shut down”是关闭标记,“childTask:Cycle-X-Begin”和“childTask:Cycle-X-End”是每一轮循环的开始和结束标记。

我们分析一下执行结果:childTask1和childTask2分别执行,在第一轮循环中都正常输出了5条罗小黑数据,第二轮执行过程中,我启动了关闭指令,这次第二轮执行没有直接停止,而是先执行完任务中的数据,再执行退出,所以完全符合我们的优雅退出结论。

结语

这其实是一个比较经典的线程池使用示例,是我们公司的一位同事写的,感觉整个流程没有毛病,实现的也非常优雅,非常值得我学习的。

然后学习Java多线程的过程中,我感觉我目前的掌握速度还算是比较快的,从Java内存模型、到Java多线程的基本知识和常用工具,到最后的多线程实战,一共8篇文章,真的是可以让你从Java小白到能写出比较健壮的多线程程序。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Java并发编程系列8 - 多线程实战 的相关文章

  • 使用 Jersey Client 忽略自签名 ssl 证书 [重复]

    这个问题在这里已经有答案了 我正在使用 Jersey 客户端库对 jboss 上运行的其余服务运行测试 我使用自签名证书在服务器上正确设置了 https 在本地主机上运行 但是 每当我使用 https url 运行测试时 都会收到以下错误
  • Java 多头中的斐波那契计算显示负值

    我的斐波那契计算器工作正常 但当数字增加时 结果会出现负值 就像它是一个Integer超过其最大值 它正在使用缓存java util Map
  • 如何安装 C++ 的 VOCE?

    我正在尝试安装 VOCE api 它是为 C 和 Java 构建的语音识别 API 这是我第二次使用外部 C 库 也是第一次使用 Java C api 语音链接 http voce sourceforge net http voce sou
  • Swagger 3.0.0:如果没有 SwaggerConfig 和 @Profile,则无法在生产中禁用

    我正在从 2 x 升级到 SpringFox Swagger 3 0 0 它引入了 Spring Boot 启动器springfox boot starter消除了对基于 2 x 的需要的依赖性SwaggerConfig NO LONGER
  • 是否可以使用检测重新定义核心 JDK 类?

    我想重新定义字节码StackOverflowError构造函数 因此当堆栈溢出发生时我有一个 钩子 我想要做的就是在构造函数的开头插入对我选择的静态方法的单个方法调用 是否有可能做到这一点 您应该能够使用两种方法之一来完成此操作 除非在过去
  • Java 的 System.arraycopy() 对于小数组有效吗?

    是Java的System arraycopy 对于小数组来说是高效的 或者它是本机方法这一事实是否使其可能比简单的循环和函数调用效率低得多 本机方法是否会因跨越某种 Java 系统桥梁而产生额外的性能开销 稍微扩展一下 Sid 所写的内容
  • Infinispan 复制缓存不复制对象以供读取

    我们正在尝试在 Openshift 内的 Wildfly 11 上运行的两个 infinispan 节点上安装复制缓存 当我们在一个节点上写入一个对象时 它不会显示在另一节点上进行读取 启动时 节点在集群中连接 并且可以看到彼此 如日志中所
  • 使用 Java 检索 Window 进程的 CPU 使用率

    我正在寻找一个 Java 解决方案来查找 Windows 中正在运行的进程的 CPU 使用情况 查了一下网上 关于Java解决方案的信息似乎很少 请记住 我并不是要查找 JVM 的 CPU 使用情况 而是要查找当时在 Windows 中运行
  • 如何使用 Gradle 2.10 将 ANTLR 词法分析器语法导入到另一个语法中?

    我一直在和 Terence Parr 一起学习 ANTLR 4权威的 ANTLR 4 参考 到目前为止我一直在使用 Gradle 2 10 及其内置 ANTLR 插件进行跟踪 然而 我在获取一些我从第 4 章第 38 41 页改编的代码以使
  • UnsupportedOperationException:特权进程中不允许使用 WebView

    我在用android sharedUserId android uid system 在我的清单中获得一些不可避免的权利 从 HDMI 输入读取安卓盒子 http eweat manufacturer globalsources com s
  • AIX:IBM Java:java.net.SocketException:连接超时:可能是由于地址无效

    当尝试与我们的服务器建立 SSL 连接时 我们在 IBM AIX 上经常看到以下异常 java net SocketException Socket closed at com sun net ssl internal ssl SSLSoc
  • Java中通过FTP创建文件夹层次结构

    Java 是否有现成的功能可以在远程 FTP 服务器上创建文件夹层次结构 Apache Commons 确实提供了 FTP 客户端 但我找不到创建目录层次结构的方法 它确实允许创建单个目录 makeDirectory 但创建整个路径似乎并不
  • Apache HttpClient 执行时会在所有 HTTP 5XX 错误上抛出 IOException 吗?

    The Apache HttpClient 文档 http hc apache org httpcomponents client ga httpclient apidocs org apache http client HttpClien
  • 当另一个线程发生事情时从主线程获取数据?

    目前我有一个线程正在运行一个侦听连接的套接字 当它收到连接时 它需要上传在主线程中收集的数据 即从主线程获取数据 但是 我传递了对象的实例 但它从未使用等待连接时收集的数据进行更新 有没有正确的方法来做到这一点 我用谷歌搜索了一下 似乎找不
  • 在Java程序中计算zip文件的md5哈希值

    我有一个 zip 文件 在我的 Java 代码中我想计算 zip 文件的 md5 哈希值 有没有我可以用于此目的的 java 库 一些例子将非常感激 谢谢 几周前我通过这篇文章做到了这一点 http www javalobby org ja
  • 我们可以用java定制一个垃圾收集器吗?

    我们知道java的垃圾收集器是一个低优先级线程 在java中我们可以创建任何具有高优先级的线程 那么是否有可能拥有我们自己定制的具有可变优先级的垃圾收集器线程 我们可以根据内存管理的级别进行设置 有人尝试过吗 如果是的话 您能分享一些关于如
  • Java 8 Stream - 为什么过滤器方法不执行? [复制]

    这个问题在这里已经有答案了 我正在学习使用java流进行过滤 但是过滤后的流没有打印任何内容 我认为过滤器方法没有被执行 我的过滤代码如下 Stream of d2 a2 b1 b3 c filter s gt s startsWith b
  • servlet 如何获取 servlet 之外的文件的绝对路径?

    我们一直在使用 System getProperties user dir 来获取属性文件的位置 现在它已经部署在 Tomcat 上 通过 servlet 系统调用将位置指定为 tomcat 而不是属性文件所在的位置 我们如何动态调用属性文
  • Admob - 没有广告可显示

    你好 我尝试制作一些在 Android 手机上显示广告的示例程序 并尝试在 v2 2 的模拟器上测试它 代码中的一切似乎都很好 但调试器中的 AdListener 表示 响应消息为零或空 onFailedToReceiveAd 没有广告可显
  • 在测试期间调用预定方法[重复]

    这个问题在这里已经有答案了 我正在使用 Maven 开发 SpringBoot 应用程序 我有一个班级 Component有方法的注释m与 Scheduled initialDelay 1000 fixedDelay 5000 注解 这里f

随机推荐

  • linux中如何安装windows

    首先将windows镜像拷贝到根目录的ios下 这个文件可自行选择位置 我这里使用的是windows10 并且已经提前安装了虚拟机组件 然后用root用户的身份在命令行输入 virt manager 点击file下面的小电脑 然后选择刚刚拷
  • FBXSDK踩坑之——搭建环境

    1 创建一个C 控制台影用程序 2 C 常规设置里设置附加包含目录 3 设置连接器常规设置的附加库目录 4 连接器的输入设置里设置附加依赖项和忽略特定默认库 5 根据前面选择的附加依赖项来选择运行库 选择参考 注意 如果使用动态链接要设置预
  • Spring Boot集成持久化Quartz定时任务管理和界面展示

    前言 本文是对之前的一篇文章Spring SpringMVC mybatis Quartz整合代码部分做的一个修改和补充 其中最大的变化就是后台框架变成了Spring Boot 本工程所用到的技术或工具有 Spring Boot Mybat
  • Virtual DOM(虚拟 DOM)

    文章目录 什么是 Virtual DOM 为什么要使用 Virtual DOM 虚拟 DOM 的作用 虚拟 DOM 库 Snabbdom 基本使用 安装 parcel 配置 scripts 目录结构 导入 Snabbdom 基本使用 包含子
  • 一文看尽深度学习中的各种数据增强

    目录 引言 数据增强的定义 数据增强的作用 省钱 省时 省心 提升模型性能 数据增强的方式 基础数据增强方法 Image Manipulation Rotation Translation Shearing Flipping Croppin
  • unity 游戏开发过程中需要注意和后期优化的重点方向和难点分析(大白话理论篇)

    开发游戏也有近3年时间了 自己是做app移动开发出身 可能入手unity开发手机游戏 算是有点小优势 毕竟起码开始就有点unity它是怎么一步步转化为一个用户用的app 在用unity做游戏的时候 自己也尝试过使用cocos2d x 感觉下
  • 使用 Docker 搭建你自己的 RSS 服务(FreshRSS)

    本文使用 署名 4 0 国际 CC BY 4 0 许可协议 欢迎转载 或重新修改使用 但需要注明来源 署名 4 0 国际 CC BY 4 0 本文作者 苏洋 创建时间 2019年01月05日 统计字数 1983字 阅读时间 4分钟阅读 本文
  • sys.path.insert()用法

    注 sys path模块是动态的修改系统路径 模块要处于Python搜索路径中的目录里才能被导入 但我们不喜欢维护一个永久性的大目录 因为其他所有的Python脚本和应用程序导入模块的时候性能都会被拖累 本节代码动态地在该路径中添加了一个
  • 循环动态渲染img标签

    img alt listInfo icon buKa png type 补卡 id 1 icon xiuJia png type 休假 id 2
  • VS工程的3个基本文件

    VS的工程根目录下有3个文件 这3个文件都可以用记事本打开 其中 1 MyTest sln 工程主文件 含属性设置 工程从该文件打开 工程的主要属性设置在该文件中 2 MyTest filters 工程目录结构筛选器 工程的目录结构保存在这
  • leetCode热题52-57 解题代码,调试代码和思路

    前言 本文属于特定的六道题目题解和调试代码 1 剑指 Offer 22 链表中倒数第k个节点 Easy 2022 09 01 91 2 76 最小覆盖子串 Hard 2023 03 27 82 3 165 比较版本号 Medium 2023
  • Me and My Girlfriend靶机渗透

    背景描述 这个VM告诉我们 有一对恋人 即Alice和Bob 这对夫妻本来很浪漫 但是自从Alice在一家私人公司 Ceban Corp 工作以来 爱丽丝对鲍勃的态度发生了一些变化是 隐藏的 而鲍勃 Bob 寻求您的帮助 以获取爱丽丝 Al
  • Python 第四章 字典:当索引不好用时

    字典 是Python中唯一内建的映射数据结构类型 这里的映射是指通过名字引用值 字典中的值并没有特殊的顺序 值都是存储在一个特定的键里 键可以是数字 字符串甚至是元祖 什么是键 什么是值 怎么映射呢 请继续看下去 创建和使用字典 下面来创建
  • ‘vue-cli-service‘ 不是内部或外部命令,也不是可运行的程序

    运行 npm run serve 报以下错误 首先检查一下文件中是否有node modules文件夹 如果没有则执行以下代码 npm install 其中可能会有很多警告 不用搭理 如果文件中已经有node modules文件夹 那么先把n
  • 接口自动化测试总结

    前言 本文是我在公司总结的一点点个人建议 可能有非常多的遗漏 先记录下来这时候我的理解 公司是做共享单车业务的 所以场景基本上也可以复用 毕竟大家都骑过单车 注明 code是我司接口返回的标志 编写之前 接口相关 这块总结不全 了解接口的功
  • 【Springboot WebSocket STOMP使用 2】STOMP使用@SendToUser实现用户个人请求-响应

    背景 STOMP协议的模型是 订阅 发布 式的 所以一般场景是同一个主题的广播 而有些场景下用户既想保持当前的WebSocket Session复用 又想消息的响应只有我自己能收到 指定其他某一个用户也行 不过得用其他api方法 不需要别人
  • 九月十月百度人搜,阿里巴巴,腾讯华为笔试面试八十题(第331-410题)

    九月十月百度人搜 阿里巴巴 腾讯华为小米搜狗笔试面试八十题 引言 自发表上一篇文章至今 事实上 上篇文章更新了近3个月之久 blog已经停了3个多月 而在那之前 自开博以来的21个月每月都不曾断过 正如上一篇文章支持向量机通俗导论 理解SV
  • 嵌入式Linux下使用OpenCV

    By Toradex 秦海 1 简介 OpenCV的全称是Open Source Computer Vision Library 是一个跨平台的计算机视觉库 OpenCV是由英特尔公司发起并参与开发 以BSD许可证授权发行 可以在商业和研究
  • 机器学习之实战matlab神经网络工具箱

    上节在 机器学习之从logistic到神经网络算法 中 我们已经从原理上介绍了神经网络算法的来源与构造 并编程实战了简单神经网络对于线性与非线性数据的分类测试实验 看过上节的可能会发现 上节实现的算法对于非线性数据的分类效果并不是非常完美
  • Java并发编程系列8 - 多线程实战

    主要基于小米最近的多线程项目 抽离出里面的多线程实例 前言 Java多线程的学习 也有大半个月了 从开始学习Java多线程时 就给自己定了一个小目标 希望能写一个多线程的Demo 今天主要是兑现这个小目标 这个多线程的示例 其实是结合最近小