利用ThreadPoolTaskExecutor创建线程池,并实现数据透传

2023-11-06

1.线程池配置

@Configuration
public class ThreadPoolConfig {

    /**
     * cpu内核 暂时默认8核
     */
    private static final int CORE_SIZE = 8;

    /**
     * 核心线程数 暂定为I/O密集型
     */
    private static final int CORE_POOL_SIZE = 2 * CORE_SIZE + 1;

    /**
     * 最大线程数
     */
    private static final int MAX_POOL_SIZE = 4 * CORE_SIZE + 1;

    /**
     * 线程队列容量
     */
    private static final int QUEUE_CAPACITY = 1000;

    @Bean
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        //看情况决定是否自定义线程池
        CustomThreadPoolExecutor threadPoolTaskExecutor = new CustomThreadPoolExecutor();
        threadPoolTaskExecutor.setCorePoolSize(CORE_POOL_SIZE);
        threadPoolTaskExecutor.setMaxPoolSize(MAX_POOL_SIZE);
        threadPoolTaskExecutor.setQueueCapacity(QUEUE_CAPACITY);
        threadPoolTaskExecutor.setThreadNamePrefix("shiny-thread-");
        //交由调用方线程运行
        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        threadPoolTaskExecutor.setDaemon(true);
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }

}

2.使用线程池

@Service
@Slf4j
public class IndexServiceImpl implements IndexService {

    @Resource
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    /**
     * @return void
     * @desc 循环执行任务
     * @author dengkongze
     * @date 2022/3/30 16:09
     */
    @Override
    public void cycle() {
        log.info("开始执行任务!");
        for (int i = 0; i < 20; i++) {
            threadPoolTaskExecutor.execute(() -> {
                try {
                    Thread.sleep(5000);
                    //执行任务
                    log.info("执行完毕!");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }

}

3.调用方法,打印日志(tips:已配置打印REQUEST_ID)
console1
通过观察日志,不难看出:
a.线程池已配置成功(最后3个线程在16:54:11前还在队列中);
b.新开线程的数据未透传(REQUEST_ID打印为空),接下来处理这部分问题。

4.定义数据透传处理器父接口

public interface ThreadLocalFlowHandler {

    /**
     * 放入threadLocal
     * @param obj
     */
    void put(Object obj);

    /**
     * 取出数据
     * @return
     */
    Object get();

    /**
     * 清除数据
     */
    void clear();

}

5.实现该处理器 - MDC日志数据透传

public class MdcInfoFlowHandler implements ThreadLocalFlowHandler {

    @Override
    public void put(Object obj) {
        // MDC 透传
        if (!ObjectUtils.isEmpty(obj)) {
            MDC.setContextMap((Map<String, String>) obj);
        }
    }

    @Override
    public Object get() {
        return MDC.getCopyOfContextMap();
    }

    @Override
    public void clear() {
        MDC.clear();
    }

}

6.自定义线程池

@Slf4j
public class CustomThreadPoolExecutor extends ThreadPoolTaskExecutor {

    private static final List<ThreadLocalFlowHandler> threadLocalFlowHandlers = new ArrayList<>();

    static {
        threadLocalFlowHandlers.add(new MdcInfoFlowHandler());
    }

    @Override
    public void execute(Runnable task) {
        Map<ThreadLocalFlowHandler, Object> parentThreadLocal = getParentThreadLocal();
        super.execute(() -> agent(task, parentThreadLocal));
    }

    @Override
    public Future<?> submit(Runnable task) {
        Map<ThreadLocalFlowHandler, Object> parentThreadLocal = getParentThreadLocal();
        return super.submit(() -> agent(task, parentThreadLocal));
    }

    private Map<ThreadLocalFlowHandler, Object> getParentThreadLocal() {
        Map<ThreadLocalFlowHandler, Object> parentThreadLocal = new HashMap<>();
        threadLocalFlowHandlers.forEach(handler -> parentThreadLocal.put(handler, handler.get()));
        return parentThreadLocal;
    }

    /**
     * 透传ThreadLocal即可
     *
     * @param task
     * @param parentThreadLocal
     */
    private void agent(Runnable task, Map<ThreadLocalFlowHandler, Object> parentThreadLocal) {
        try {
            threadLocalFlowHandlers.forEach(handler -> {
                if (parentThreadLocal.containsKey(handler)) {
                    Object obj = parentThreadLocal.get(handler);
                    handler.put(obj);
                }
            });
            task.run();
        } catch (Throwable e) {
            log.error("thread error", e);
            throw e;
        } finally {
            //清空
            threadLocalFlowHandlers.forEach(ThreadLocalFlowHandler::clear);
        }
    }

}

7.再次调用方法,打印日志
console2

8.github源码:https://github.com/shiny1day/thread-pool-demo.git

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

利用ThreadPoolTaskExecutor创建线程池,并实现数据透传 的相关文章

  • 微服务架构超详细解析,写得太好了!

    本文将介绍微服务架构和相关的组件 介绍他们是什么以及为什么要使用微服务架构和这些组件 本文侧重于简明地表达微服务架构的全局图景 因此不会涉及具体如何使用组件等细节 为了防止不提供原网址的转载 特在这里加上原文链接 https www cnb
  • 什么是IPU?

    在图像识别的SOC中 有一个很重要的单元 IPU Image Processing Unit 图像处理单元 图像处理单元的目标是提供从图像输入 摄像头传感器 电视信号输入等 到显示设备 LCD显示屏 TV输出 外部图像处理单元等 端到端的数

随机推荐

  • 探索OLED拼接屏的特点及在莱山的场景化应用

    涞山oled拼接屏是一种高清晰度的显示屏 由多个oled屏幕拼接而成 它可以用于各种场合 如商业展示 广告宣传 会议演示等 涞山oled拼接屏具有以下特点 1 高清晰度 oled屏幕具有高对比度 高亮度 高色彩饱和度等特点 可以呈现出非常清
  • go-kit grpc调用及中间件封装

    存在问题 grpc 调用问题 通常我们向业务返回会定义如下的结构 code 20000 msg Success data 但是如果我们定义如下的proro grpc的返回值可以在客户端不能直接使用 还需要使用json进行解析 message
  • 计算机网络1:Tcp三次握手和四次挥手

    一 TCP传输的过程 三次握手 1 建立连接并确认连接 三次握手 过程 1 客户端向服务端发出连接请求SYN 1 seq x 等待服务端响应 状态由CLOSED转为SYN SENT 2 服务端做出响应ACK和连接信号SYN 1 seq y
  • linux在dockers安装rides教程

    在Docker中安装Rider是一项非常有用的技能 因为它可以让您在Linux操作系统上进行开发和调试 本文将介绍如何在Docker上安装Rider 步骤1 安装Docker 首先 您需要在Linux操作系统上安装Docker 您可以使用以
  • 清空文件夹下的SVN文件BAT脚本

    清空文件夹下的SVN文件BAT脚本 1 脚本功能 清空文件夹及其子文件下下的所有 svn文件 避免svn提交时冲突 2 脚本内容 echo on color 2f mode con cols 80 lines 25 REM echo 正在清
  • 智能合约相关设计

    1 运行环境 以太坊采用以太坊虚拟机作为智能合约的运行环境 以太坊虚拟机是一个隔离的轻量级虚拟机环境 运行在其中的智能合约代码无法访问本地网络 文件系统或其他进程 对同一个智能合约 查看什么是智能合约 来说 往往需要在多个以太坊虚拟机中同时
  • 详细实现最短路径(迪杰斯特拉算法)

    最短路径 说白了 就是图里从一个顶点到另一个顶点的最小权值之和 今天 小编带大家一起用迪杰斯特拉 Dijkstra 算法实现它吧 目录 一 实现原理 二 代码实现 一 思路 二 代码 一 实现原理 其实 在小编看来 迪杰斯特拉算法与普里姆算
  • ws协议与http协议的异同

    http协议 识别数据内容 与webSocket协议 同 建立在TCP之上 同http一样通过TCP来传输数据 不同 HTTP协议为单向协议 即浏览器只能向服务器请求资源 服务器才能将数据传送给浏览器 而服务器不能主动向浏览器传递数据 分为
  • Selenium及chromedriver安装教程

    文章目录 安装Python环境及Selenium工具包 使用命令行安装 使用Pycharm安装 安装chromedriver驱动 验证 安装Python环境及Selenium工具包 首先 我们需要安装Python环境 安装好了之后需要安装S
  • keras IMDB数据集 LSTM分类

    在keras提供的IMDB数据集中 word被映射为一个大于0的整数 表示该单词出现频率的排名 这样处理的目的是为了方便按照词频过滤单词 其中0用于表示unknown word 载入数据 x train shape 25000 是一个250
  • 如果IBM再给我一次实习机会

    2014年 我拿到了IBM斯图加特R D的实习机会 在连续被索尼和博世拒掉之后 这个实习对我来说弥足珍贵 我学的是通信专业 在这之前与编程相关的活动只有一学期的安卓Lab 还是靠抱队友大腿才及格 在申请时 我的编程能力可以说几乎为0 连我自
  • java: 找不到符号 符号: 类 ResourceVO 位置: 类 com.

    一 java找不到符号 如果你的代码里没有任何问题 但是java报错找不到符号 如下 解决方法
  • 《Pytorch深度学习和图神经网络(卷 2)》学习笔记——第二章

    基于图片内容的处理任务 主要包括目标检测 图片分割两大任务 目标检测 精度相对较高 主要是以检测框的方式 找出图片中目标物体所在坐标 模型运算量相对较小 相对较快 图片分割 精度相对较低 主要是以像素点的集合方式 找出图片中目标物体边缘的具
  • Prometheus 监控之 kafka

    初探 默认情况下 Kafka metrics 所有的 metric 都可以通过 JMX 获取 暴露kafka metrics 支持两种方式 1 在 Kafka Broker 外部 作为一个独立进程 通过 JMX 的 RMI 接口读取数据 这
  • linux:需要注意docker和aws的rds的mysql默认是UTC而不是中国时区

    问题 如题 解决办法 docker参考 mysql时间不对 修改时区 set global time zone 无效 小书生 的博客 CSDN博客 aws参考 https www youtube com watch v B NaqV A1B
  • 数字IC手撕代码--联发科(总线访问仲裁)

    题目描述 当A B两组的信号请求访问某个模块时 为了保证正确的访问 需要对这些信号进行仲裁 请用Verilog实现一个仲裁器 对两组请求信号进行仲后 要求 协议如图所示 请求方发送req request 信号1表示有请求给仲裁器 仲裁器响应
  • es6扩展运算符 (...)

    es6的扩展运算符就是取出参数对象中的所有可遍历属性 拷贝到当前对象之中 let bar a 1 b 2 let baz bar a 1 b 2 实际上是通过Object assign方法实现的 let baz Object assign
  • 【React】路由懒加载 React.lazy()

    React路由懒加载lazy 文章目录 React路由懒加载lazy React lazy 懒加载概念 React lazy 使用 React lazy 懒加载概念 我们在使用网站时 如果不对路由使用懒加载 则会导致刚打开网站就加载全部路由
  • spring security oauth2源码解析

    spring security oauth2源码解析 EnableResourceServer 启用资源服务配置 注入配置 ResourceServerConfiguration ResourceServerConfiguration 资源
  • 利用ThreadPoolTaskExecutor创建线程池,并实现数据透传

    1 线程池配置 Configuration public class ThreadPoolConfig cpu内核 暂时默认8核 private static final int CORE SIZE 8 核心线程数 暂定为I O密集型 pr