多线程处理并有序整合数据方案

2023-11-12

方案设想

  • 多线程异步“并行”处理待处理数据【for+线程池单例创实例和回收】
    • 防止处理过程中线程数过大,内存溢出,导致处理失败,例如持续for中new Thread
  • 保证并行的线程处理个数【CountDownLatch】
    • 防止线程池未全部结束就开始进行处理,这里设置线程都返回出具再进行处理
  • 异步处理线程安全key-Object数据结构【ConcurrentHashMap】
    • 防止处理过程中漏处理,例如HashMap实测线程不安全,处理数据会<=实际数据量
  • 业务耗时模拟【Thread.sleep】
    • 模拟业务处理耗时,注意单独异常捕获,防止造成中断退出

样例

  • 线程池

    import com.peng.common.util.LogUtils;
    
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.Future;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class BusinessThreadPool {
    
        private ThreadPoolExecutor threadPool;
        private volatile static BusinessThreadPool instance = null;
        private int corePoolSize = 100;
        private int maximumPoolSize = 200;
        private long keepAliveTime = 500L;
    
        private BusinessThreadPool() {
            initPool();
        }
    
        public static BusinessThreadPool getInstance() {
            if (null == instance) {
                synchronized (BusinessThreadPool.class) {
                    if (null == instance) {
                        instance = new BusinessThreadPool();
                    }
                }
            }
            return instance;
        }
    
        private void initPool() {
            try {
                this.threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
                        keepAliveTime, TimeUnit.SECONDS,
                        new ArrayBlockingQueue<Runnable>(maximumPoolSize),
                        new ThreadPoolExecutor.CallerRunsPolicy());
    
                Runtime.getRuntime().addShutdownHook(new Thread() {
                    @Override
                    public void run() {
                        threadPool.shutdown();
                    }
                });
            } catch (Exception e) {
                LogUtils.root.error("Exception:{}", e);
            }
        }
    
        public void execute(Runnable thread) {
            this.threadPool.execute(thread);
        }
    
        public Future<?> submit(Runnable thread) {
            return this.threadPool.submit(thread);
        }
    
        public int getActivityNum() {
            return this.threadPool.getActiveCount();
        }
    
    }
    
  • 业务处理

    import com.alibaba.fastjson.JSON;
    import lombok.SneakyThrows;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CountDownLatch;
    
    public class MoreThreadDealTest {
        public static void main(String[] args) {
            //模拟厂商并发处理
            for (int i = 1; i <= 4; i++) {
                int fiI = i;
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        deal(fiI + "处理===");
                    }
                }).start();
            }
    
        }
    
        public static void deal(String logHead) {
            final Map<Integer, String> resMap = new ConcurrentHashMap<>();//线程安全Map
            List<String> resList = new ArrayList<>();
            List<String> datas = new ArrayList<>();
            //模拟数据
            for (int q = 1; q <= 100; q++) {
                datas.add("" + q);
            }
            final CountDownLatch latch1 = new CountDownLatch(datas.size());
            long startTime = System.currentTimeMillis();
            try {
                for (int i = 0; i < datas.size(); i++) {
                    int finalI = i;
                    Runnable runnable = new Runnable() {
                        @SneakyThrows
                        @Override
                        public void run() {
                            try {
                                resMap.put(finalI, datas.get(finalI) + "处理了");
                                try {
                                    Thread.sleep(3000);//业务耗时模拟
                                } catch (Exception e) {
                                    System.out.println(logHead + "sleep异常" + e.getMessage());
                                }
                                System.out.println(logHead + "当前池线程个数==" + BusinessThreadPool.getInstance().getActivityNum());
                            } catch (Throwable e) {
                                System.out.println(logHead + "deal异常" + e.getMessage());
                            } finally {
                                latch1.countDown();
                            }
                        }
                    };
                    BusinessThreadPool.getInstance().execute(runnable);
                }
                System.out.println();
                latch1.await();
            } catch (Exception e) {
                System.out.print(logHead + "外层try异常了");
            } finally {
            }
            long endTime = System.currentTimeMillis();
            System.out.println();
            System.out.println(logHead + "---多线程异步处理耗时:" + (endTime - startTime));
    
            int size = resMap.size();
            for (int j = 0; j < size; j++) {
                resList.add(resMap.get(j));
            }
            System.out.println();
            System.out.println(logHead + "【处理后数据结构】" + JSON.toJSONString(resList));
            System.out.println();
            System.out.println(logHead + "===resList结果大小" + resList.size());
            long endTime2 = System.currentTimeMillis();
            System.out.println();
            System.out.println(logHead + "===总处理耗时:" + (endTime2 - startTime));
        }
    }
    

日志实例

  • 日志(部分)

    2处理===当前池线程个数==200
    2处理===当前池线程个数==200
    2处理===当前池线程个数==200
    2处理===当前池线程个数==200
    2处理===当前池线程个数==200
    2处理===当前池线程个数==200
    
    3处理===当前池线程个数==200
    
    4处理===当前池线程个数==139
    4处理===当前池线程个数==138
    4处理===当前池线程个数==137
    4处理===当前池线程个数==136
    4处理===当前池线程个数==135
    
    4处理===---多线程异步处理耗时:6053
    
    1处理===当前池线程个数==135
    1处理===当前池线程个数==134
    1处理===当前池线程个数==133
    1处理===当前池线程个数==32
    
    1处理===---多线程异步处理耗时:6069
    
    2处理===当前池线程个数==31
    2处理===当前池线程个数==30
    2处理===当前池线程个数==29
    2处理===当前池线程个数==28
    2处理===当前池线程个数==27
    2处理===当前池线程个数==26
    2处理===当前池线程个数==14
    2处理===当前池线程个数==13
    2处理===当前池线程个数==12
    2处理===当前池线程个数==11
    2处理===当前池线程个数==10
    2处理===当前池线程个数==9
    3处理===当前池线程个数==8
    
    3处理===---多线程异步处理耗时:6080
    
    2处理===当前池线程个数==7
    2处理===当前池线程个数==6
    2处理===当前池线程个数==5
    2处理===当前池线程个数==4
    2处理===当前池线程个数==3
    2处理===当前池线程个数==2
    2处理===当前池线程个数==1
    
    2处理===---多线程异步处理耗时:6080
    
    2处理===【处理后数据结构】["1处理了","2处理了","3处理了","4处理了","5处理了","6处理了","7处理了","8处理了","9处理了","10处理了","11处理了","12处理了","13处理了","14处理了","15处理了","16处理了","17处理了","18处理了","19处理了","20处理了","21处理了","22处理了","23处理了","24处理了","25处理了","26处理了","27处理了","28处理了","29处理了","30处理了","31处理了","32处理了","33处理了","34处理了","35处理了","36处理了","37处理了","38处理了","39处理了","40处理了","41处理了","42处理了","43处理了","44处理了","45处理了","46处理了","47处理了","48处理了","49处理了","50处理了","51处理了","52处理了","53处理了","54处理了","55处理了","56处理了","57处理了","58处理了","59处理了","60处理了","61处理了","62处理了","63处理了","64处理了","65处理了","66处理了","67处理了","68处理了","69处理了","70处理了","71处理了","72处理了","73处理了","74处理了","75处理了","76处理了","77处理了","78处理了","79处理了","80处理了","81处理了","82处理了","83处理了","84处理了","85处理了","86处理了","87处理了","88处理了","89处理了","90处理了","91处理了","92处理了","93处理了","94处理了","95处理了","96处理了","97处理了","98处理了","99处理了","100处理了"]
    
    2处理======resList结果大小100
    
    2处理======总处理耗时:6144
    3处理===【处理后数据结构】["1处理了","2处理了","3处理了","4处理了","5处理了","6处理了","7处理了","8处理了","9处理了","10处理了","11处理了","12处理了","13处理了","14处理了","15处理了","16处理了","17处理了","18处理了","19处理了","20处理了","21处理了","22处理了","23处理了","24处理了","25处理了","26处理了","27处理了","28处理了","29处理了","30处理了","31处理了","32处理了","33处理了","34处理了","35处理了","36处理了","37处理了","38处理了","39处理了","40处理了","41处理了","42处理了","43处理了","44处理了","45处理了","46处理了","47处理了","48处理了","49处理了","50处理了","51处理了","52处理了","53处理了","54处理了","55处理了","56处理了","57处理了","58处理了","59处理了","60处理了","61处理了","62处理了","63处理了","64处理了","65处理了","66处理了","67处理了","68处理了","69处理了","70处理了","71处理了","72处理了","73处理了","74处理了","75处理了","76处理了","77处理了","78处理了","79处理了","80处理了","81处理了","82处理了","83处理了","84处理了","85处理了","86处理了","87处理了","88处理了","89处理了","90处理了","91处理了","92处理了","93处理了","94处理了","95处理了","96处理了","97处理了","98处理了","99处理了","100处理了"]
    
    3处理======resList结果大小100
    
    3处理======总处理耗时:6145
    1处理===【处理后数据结构】["1处理了","2处理了","3处理了","4处理了","5处理了","6处理了","7处理了","8处理了","9处理了","10处理了","11处理了","12处理了","13处理了","14处理了","15处理了","16处理了","17处理了","18处理了","19处理了","20处理了","21处理了","22处理了","23处理了","24处理了","25处理了","26处理了","27处理了","28处理了","29处理了","30处理了","31处理了","32处理了","33处理了","34处理了","35处理了","36处理了","37处理了","38处理了","39处理了","40处理了","41处理了","42处理了","43处理了","44处理了","45处理了","46处理了","47处理了","48处理了","49处理了","50处理了","51处理了","52处理了","53处理了","54处理了","55处理了","56处理了","57处理了","58处理了","59处理了","60处理了","61处理了","62处理了","63处理了","64处理了","65处理了","66处理了","67处理了","68处理了","69处理了","70处理了","71处理了","72处理了","73处理了","74处理了","75处理了","76处理了","77处理了","78处理了","79处理了","80处理了","81处理了","82处理了","83处理了","84处理了","85处理了","86处理了","87处理了","88处理了","89处理了","90处理了","91处理了","92处理了","93处理了","94处理了","95处理了","96处理了","97处理了","98处理了","99处理了","100处理了"]
    
    1处理======resList结果大小100
    
    1处理======总处理耗时:6142
    4处理===【处理后数据结构】["1处理了","2处理了","3处理了","4处理了","5处理了","6处理了","7处理了","8处理了","9处理了","10处理了","11处理了","12处理了","13处理了","14处理了","15处理了","16处理了","17处理了","18处理了","19处理了","20处理了","21处理了","22处理了","23处理了","24处理了","25处理了","26处理了","27处理了","28处理了","29处理了","30处理了","31处理了","32处理了","33处理了","34处理了","35处理了","36处理了","37处理了","38处理了","39处理了","40处理了","41处理了","42处理了","43处理了","44处理了","45处理了","46处理了","47处理了","48处理了","49处理了","50处理了","51处理了","52处理了","53处理了","54处理了","55处理了","56处理了","57处理了","58处理了","59处理了","60处理了","61处理了","62处理了","63处理了","64处理了","65处理了","66处理了","67处理了","68处理了","69处理了","70处理了","71处理了","72处理了","73处理了","74处理了","75处理了","76处理了","77处理了","78处理了","79处理了","80处理了","81处理了","82处理了","83处理了","84处理了","85处理了","86处理了","87处理了","88处理了","89处理了","90处理了","91处理了","92处理了","93处理了","94处理了","95处理了","96处理了","97处理了","98处理了","99处理了","100处理了"]
    
    4处理======resList结果大小100
    
    4处理======总处理耗时:6150
    
  • 解读

    1. resList结果大小100 --ConcurrentHashMap 数据结构ok
    2. 总处理耗时 --模拟4个厂商,每个厂商100个,每个任务3秒,耗时6秒ok
    3. 前池线程个数 --动态重复使用和衰减,可行
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

多线程处理并有序整合数据方案 的相关文章

  • Grails 3.x bootRun 失败

    我正在尝试在 grails 3 1 11 中运行一个项目 但出现错误 失败 构建失败并出现异常 什么地方出了错 任务 bootRun 执行失败 进程 命令 C Program Files Java jdk1 8 0 111 bin java
  • 在 Java 中连接和使用 Cassandra

    我已经阅读了一些关于 Cassandra 是什么以及它可以做什么的教程 但我的问题是如何在 Java 中与 Cassandra 交互 教程会很好 如果可能的话 有人可以告诉我是否应该使用 Thrift 还是 Hector 哪一个更好以及为什
  • 在 java 类和 android 活动之间传输时音频不清晰

    我有一个android活动 它连接到一个java类并以套接字的形式向它发送数据包 该类接收声音数据包并将它们扔到 PC 扬声器 该代码运行良好 但在 PC 扬声器中播放声音时会出现持续的抖动 中断 安卓活动 public class Sen
  • Java JDBC:更改表

    我希望对此表进行以下修改 添加 状态列 varchar 20 日期列 时间戳 我不确定该怎么做 String createTable Create table aircraft aircraftNumber int airLineCompa
  • 如何找到给定字符串的最长重复子串

    我是java新手 我被分配寻找字符串的最长子字符串 我在网上研究 似乎解决这个问题的好方法是实现后缀树 请告诉我如何做到这一点或者您是否有任何其他解决方案 请记住 这应该是在 Java 知识水平较低的情况下完成的 提前致谢 附 测试仪字符串
  • Android:捕获的图像未显示在图库中(媒体扫描仪意图不起作用)

    我遇到以下问题 我正在开发一个应用程序 用户可以在其中拍照 附加到帖子中 并将图片保存到外部存储中 我希望这张照片也显示在图片库中 并且我正在使用媒体扫描仪意图 但它似乎不起作用 我在编写代码时遵循官方的Android开发人员指南 所以我不
  • INSERT..RETURNING 在 JOOQ 中不起作用

    我有一个 MariaDB 数据库 我正在尝试在表中插入一行users 它有一个生成的id我想在插入后得到它 我见过this http www jooq org doc 3 8 manual sql building sql statemen
  • 控制Android的前置LED灯

    我试图在用户按下某个按钮时在前面的 LED 上实现 1 秒红色闪烁 但我很难找到有关如何访问和使用前置 LED 的文档 教程甚至代码示例 我的意思是位于 自拍 相机和触摸屏附近的 LED 我已经看到了使用手电筒和相机类 已弃用 的示例 但我
  • JavaMail 只获取新邮件

    我想知道是否有一种方法可以在javamail中只获取新消息 例如 在初始加载时 获取收件箱中的所有消息并存储它们 然后 每当应用程序再次加载时 仅获取新消息 而不是再次重新加载它们 javamail 可以做到这一点吗 它是如何工作的 一些背
  • 无法解析插件 Java Spring

    我正在使用 IntelliJ IDEA 并且我尝试通过 maven 安装依赖项 但它给了我这些错误 Cannot resolve plugin org apache maven plugins maven clean plugin 3 0
  • 如何为俚语和表情符号构建正则表达式 (regex)

    我需要构建一个正则表达式来匹配俚语 即 lol lmao imo 等 和表情符号 即 P 等 我按照以下示例进行操作http www coderanch com t 497238 java java Regular Expression D
  • Java按日期升序对列表对象进行排序[重复]

    这个问题在这里已经有答案了 我想按一个参数对对象列表进行排序 其日期格式为 YYYY MM DD HH mm 按升序排列 我找不到正确的解决方案 在 python 中使用 lambda 很容易对其进行排序 但在 Java 中我遇到了问题 f
  • AWS 无法从 START_OBJECT 中反序列化 java.lang.String 实例

    我创建了一个 Lambda 函数 我想在 API 网关的帮助下通过 URL 访问它 我已经把一切都设置好了 我还创建了一个application jsonAPI Gateway 中的正文映射模板如下所示 input input params
  • 如何从泛型类调用静态方法?

    我有一个包含静态创建方法的类 public class TestClass public static
  • 获取 JVM 上所有引导类的列表?

    有一种方法叫做findBootstrapClass对于一个类加载器 如果它是引导的 则返回一个类 有没有办法找到类已经加载了 您可以尝试首先通过例如获取引导类加载器呼叫 ClassLoader bootstrapLoader ClassLo
  • 编译器抱怨“缺少返回语句”,即使不可能达到缺少返回语句的条件

    在下面的方法中 编译器抱怨缺少退货声明即使该方法只有一条路径 并且它包含一个return陈述 抑制错误需要另一个return陈述 public int foo if true return 5 鉴于Java编译器可以识别无限循环 https
  • 使用 JMF 创建 RTP 流时出现问题

    我正处于一个项目的早期阶段 需要使用 RTP 广播DataStream创建自MediaLocation 我正在遵循一些示例代码 该代码目前在rptManager initalize localAddress 出现错误 无法打开本地数据端口
  • 如何修复 JNLP 应用程序中的“缺少代码库、权限和应用程序名称清单属性”?

    随着最近的 Java 更新 许多人都遇到了缺少 Java Web Start 应用程序的问题Codebase Permissions and Application name体现属性 尽管有资源可以帮助您完成此任务 但我找不到任何资源综合的
  • 按日期对 RecyclerView 进行排序

    我正在尝试按日期对 RecyclerView 进行排序 但我尝试了太多的事情 我不知道现在该尝试什么 问题就出在这条线上适配器 notifyDataSetChanged 因为如果我不放 不会显示错误 但也不会更新 recyclerview
  • 使用 xpath 和 vtd-xml 以字符串形式获取元素的子节点和文本

    这是我的 XML 的一部分

随机推荐

  • JAVA开发(神乎其神的区块链技术之数据上链)

    这是我第二遍写关于区块链的博文 前一篇文章 神乎其神的区块链概念和技术 主要介绍区块链的由来和基本概念 因为博主最近在做一个区块链项目 所以有时候也遇到一些概念性的知识需要去理解 比如数据的上链 谈到数据上链 我们先了解一下现在都有什么链
  • Combine中类似Rxswift中的onNext

    其实Comibne中也有类似的方法 就是它handleEvents cancellable integers publisher handleEvents receiveSubscription subs in print Subscrip
  • tensorflow-1.14 版本更新

    使用TensorFlow训练文本筛选 错误提示 AttributeError module tensorflow python platform flags has no attribute mark flag as required 由于
  • VMware安装GHOST版XP教程

    VMware安装GHOST版XP教程 本来我是无法安装GHOST版的XP系统在VMware上 我很苦恼 到处找方法 最后找到了这里 可是每个关于这个问题的帖子里边都说改问题早就被处理 让搜索老帖子 可是我搜索出来的帖子里边的回复都说是改问题
  • 近期数据挖掘学习_计划安排及相关资料(定期更新)

    理论学习 学习主线 1 机器学习 统计学习方法 李航 机器学习 周志华 机器学习笔记 吴恩达 Scikit Learn文档 2 统计学复习 深入浅出统计学 statistics for business economics by ander
  • 零基础入门STM32编程——点灯(HAL库)(六)

    系列教程 定时器原理与配置 系列教程 GPIO原理与配置原则 前情回顾 通过前面几篇的学习 见目录 我们对STM32的基本架构以及原理有了一定了解 对GPIO的概念了有一定的认识 接下来通过一个简单的点灯项目 进步学习STM32编程 一 项
  • 使用linux主义的问题

    第一点 看看是否有服务 没有则apt get install 第二点 更改文件后更新文件 source 文件 第三点 权限 一定要看看权限 否则上传或其他操作则不被允许
  • Python基础知识(第二天)

    链式赋值 系列解包覆值 常量 链式赋值 x y 123 相当于 x 123 y 123 系列解包覆值 a b c 4 5 6 相当于 a 4 b 5 c 6 常量 Python 不支持常量 即没有语法规则限制改变一个常量的值 我们只能约定常
  • 雅虎、领英接连退出中国,开发者:GitHub 也会受到影响吗?

    继半个月前微软宣布关闭领英 即 LinkedIn 在华业务后 本周二 雅虎也宣布了最新消息 自 2021 年 11 月 1 日起 用户将无法从中国大陆使用 Yahoo 的产品与服务 一时之间 许多人将这两起事件结合在一起 也由此引发了开发人
  • Windows使用C++模拟鼠标点击----防止校园网掉线--登录校园网

    Linux模拟鼠标使用shell脚本就可以实现了 可以搜一下就可以解决 Windows模拟鼠标点击使用Python总会出现问题 所以使用C 来实现 1 使用gl c include
  • 电脑商城项目总结-01用户管理模块(注册,登录,修改密码,个人信息,上传头像)

    目录 部分图片展示 application properties 创建数据库并且验证是否静态资源能够正常访问 创建用户表 实体类 持久层 业务层 控制层 拦截器 单元测试 部分图片展示 以下是大体上的代码 application prope
  • Mac平台VMware Fusion虚拟机无网络连接与解决方法

    打开设置Network 点击下方锁子打开权限后点击 新增一个 把所有能打的对勾都打上 打开虚拟机后点击上面的 lt gt 然后把对勾打到新增的那个网络设置上 然后重启 不是挂起 而是重启
  • SpringBoot修改端口号不生效

    springboot中端口失效问题 idea中除了在配置文件中配置端口 还可在Edit Configurations中配置端口号 以往在这里配置端口号都可生效 此次失效是因为 当前模块依赖的模块中resource文件未指定为资源文件 上图中
  • C++并发与异步知识点最全汇总

    c 并发 文章目录 c 并发 1 thread 2 this thread命名空间 3 互斥 1 mutex 2 符合RAII标准的锁 lock guard 3 符合RAII标准并且更自由 unique lock 4 死锁 1 死锁的预防
  • OpenGLES跨平台glReadPixels API问题解决

    1 引言 在原始Windows端上 我们使用glReadPixels 方法实现OpenGL 纹理到内存图像的转换 其中其支持的色彩类型包括GL RGBA GL RGB GL BGRA及GL BGR等色彩空间 便于我们实现纹理到各个色彩空间的
  • VLC搭建RTSP服务器的过程 -测试通过

    第一步 打开VLC 第二步 在媒体下拉菜单下 有一个子菜单 串流 如图所示 点击 串流 子菜单 弹出一个窗口 如下图所示 添加一个你要串流的本地文件 我刚才传给你的那个长一点的文件 第三步 会出现如下的界面 第五 点击下一步 第六步 在下拉
  • android 插入耳机 使用自身mic录音_苹果iPhone 12携最新系统强势登场,10款主流TWS耳机兼容性测试...

    北京时间2020年10月14日凌晨 苹果第二次秋季发布会成功落幕 会上发布了旗下搭载最新 iOS14 系统的 iPhone 12 系列智能手机和最新一代 HomePod mini 智能音箱 为了环保理念 苹果在此次发布会之后 官方商店在售
  • java时间工具类

    参考文档 https blog csdn net java mdzy article details 100099922 java时间工具类 package com td util import java sql Timestamp imp
  • python版本是3.9.3,如何匹配相应的pip或pip3?

    在 Windows 中 可以通过以下步骤来安装匹配 Python 3 9 3 版本的 pip 在浏览器中打开 https bootstrap pypa io get pip py 并下载该文件 打开命令提示符 Command Prompt
  • 多线程处理并有序整合数据方案

    方案设想 多线程异步 并行 处理待处理数据 for 线程池单例创实例和回收 防止处理过程中线程数过大 内存溢出 导致处理失败 例如持续for中new Thread 保证并行的线程处理个数 CountDownLatch 防止线程池未全部结束就