redisson究极爽文-手把手带你实现redisson的发布订阅,消息队列,延迟队列(死信队列),(模仿)分布式线程池

2023-11-10

参考资料 :分布式中间件实战:java版 (书籍), 多线程视频教程(视频)…

项目启动环境

导入依赖

  <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>2.0.7.graal</version>
    </dependency>
        
   <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
   </dependency>
        
   <dependency>
           <groupId>org.redisson</groupId>
           <artifactId>redisson</artifactId>
           <version>3.13.6</version>
   </dependency>

添加配置类

从spring容器中获取bean


import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

@Component
public class BeanContext implements ApplicationContextAware {

    @Autowired
    private static ApplicationContext applicationContext;

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        BeanContext.applicationContext = applicationContext;
    }

    public static ApplicationContext getApplicationContext(){
        return applicationContext;
    }

    @SuppressWarnings("unchecked")
    public static <T> T getBean(String name) throws BeansException {
        return (T)applicationContext.getBean(name);
    }

    public static <T> T getBean(Class<T> clz) throws BeansException {
        return (T)applicationContext.getBean(clz);
    }
}

redisson配置类,当然你也可以写在yml里面 或者 是配置集群啥的

@Configuration
public class RedissonConfig {

   @Autowired
    private Environment environment ;

   @Bean
   public RedissonClient config(){
       Config config = new Config();
       config.setCodec(new org.redisson.client.codec.StringCodec());


       config.useSingleServer().setConnectionPoolSize(50);//设置对于master节点的连接池中连接数最大为500
       config.useSingleServer().setIdleConnectionTimeout(100000);
       //如果当前连接池里的连接数量超过了最小空闲连接数,而同时有连接空闲时间超过了该数值,那么这些连接将会自动被关闭,并从连接池里去掉。时间单位是毫秒。
       config.useSingleServer().setConnectTimeout(300000);//同任何节点建立连接时的等待超时。时间单位是毫秒。
       config.useSingleServer().setTimeout(30000);//等待节点回复命令的时间。该时间从命令发送成功时开始计时。


    切记这里要序列化 ,不然 fastjson 
//  不能反序列化 阻塞队列中的string元素
       Codec codec = new JsonJacksonCodec();
       config.setCodec(codec);

// 你的IP
       config.useSingleServer().setAddress("redis://***.***.***.**:6379").setKeepAlive(true)   ;
       return Redisson.create(config) ;

   }

}

ok~~,项目环境搭建完了

发布订阅

生产者(消息发布者)

@Slf4j
@Component
public class UserLoginPublisher {
    public static final String TOPICKEYLOGINUSER = "UserLoginKey" ;

    @Autowired
    private RedissonClient redissonClient ;

    public void sendMsg(UserLoginDto user){
        try {
            log.info("准备发送消息 ~~");
            // TOPICKEYLOGINUSER : 是一个string ,是订阅的主题
            RTopic clientTopic = redissonClient.getTopic(TOPICKEYLOGINUSER,new SerializationCodec());
           // 消息发布(这里是异步的形式)
            clientTopic.publishAsync(user);
            long l = clientTopic.countSubscribers();
            System.out.println(l);
        } catch (Exception e) {
            log.info("消息发送失败 ~~:{}",e);
        }
    }
}

消费者(订阅者)

订阅者的 redissonClient.getTopic(“UserLoginKey”,new SerializationCodec()); 要和 发布者的 topic 一样

import com.alibaba.fastjson.JSON;
import com.example.redissiontest.dto.UserLoginDto;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener;
import org.redisson.codec.SerializationCodec;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;


@Slf4j
@Component
//implements ApplicationRunner, Ordered 是实现 线程的一种方式 当然你也可以 继承 runnable
// CommandLineRunner也可以在spring启动的时候进行执行
public class UserLoginSubscriber implements ApplicationRunner, Ordered {

    private static final String TOPICKEYLOGINUSER = UserLoginPublisher.TOPICKEYLOGINUSER;

    @Autowired
    private RedissonClient redissonClient ;


    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("线程正在运行 ~~");
        try {
            RTopic topic = redissonClient.getTopic("UserLoginKey",new SerializationCodec());
            topic.addListener(UserLoginDto.class, new MessageListener<UserLoginDto>() {
                @Override
                public void onMessage(CharSequence charSequence, UserLoginDto userLoginDto) {
                    String s = JSON.toJSONString(userLoginDto);
                    System.out.println("收到消息:"+s);
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    //开局运行
    @Override
    public int getOrder() {
        return 0;
    }
}

测试

   @Test
    void contextLoadsPublish() {
        UserLoginDto loginDto = new UserLoginDto("cunk", "109922", 1);
        userLoginPublisher.sendMsg(loginDto);

    }

消息队列

生产者

在这里插入图片描述

基于Redis的分布式队列Queue是Redisson提供的又一个功能组件,按照不同的特性,分布式队列Queue还可以分为双端队列Deque、阻塞队列Blocking Queue、有界阻塞队列(Bounded Blocking Queue)、阻塞双端队列(Blocking Deque)、阻塞公平队列(Blocking Fair Queue)、阻塞公平双端队列(Blocking Fair Deque)等功能组件,不同的功能组件其作用不尽相同,适用的业务场景也是不一样的。
在实际业务场景中,不管是采用哪一种功能组件作为“队列”,其底层核心的执行逻辑仍旧是借助“基于发布-订阅式的主题”来实现的
注意这里有个消息重试机制 , 消息的发送和接收需要 是同一个队列里面

import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class QueuePublisher {
   @Autowired
    private RedissonClient redissonClient ;

   String key = QueueConsumer.queueName ;

   public void sendMSG(String msg){
       try {
           RQueue<Object> queue = redissonClient.getQueue(key);
           queue.add(msg) ;
           log.info("消息队列发送消息成功~~");
       } catch (Exception e) {
           log.info("消息队列发送消息失败~~");
           // 消息重试
           RQueue<Object> queue = redissonClient.getQueue(key);
           queue.add(msg) ;
       }
   }

}

消费者

@Slf4j
@Component
public class QueueConsumer implements ApplicationRunner, Ordered {

   public static final String queueName = "redssionName" ;


    @Autowired
    private RedissonClient redissonClient ;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        RQueue<String> queue = redissonClient.getQueue(queueName);

        while (true){
            String msg = queue.poll();
            if (msg!=null){
             log.info("消息队列监听到消息:{}",msg);
            }

        }

    }

    //
    @Override
    public int getOrder() {
        return -1;
    }
}

测试

 @GetMapping("/test/{msg}")
    public String queue(@PathVariable("msg") String msg) {
        try {
            queuePublisher.sendMSG(msg);
        } catch (Exception e) {
            //发送失败 重试
            queuePublisher.sendMSG(msg);

        }
        return "ok" ;
    }

延迟队列

用户的需求是多样化的,永远不会按照程序员的思路走!在实际的生产环境中,仍旧存在着需要处理不同 TTL(即过期时间/存活时间)的业务数据的场景,为了解决此种业务场景,Redisson提供了“延迟队列”这个强大的功能组件,它可以解决RabbitMQ死信队列出现的缺陷,即不管在什么时候,消息将按照 TTL从小到大的顺序先后被真正的队列监听、消费,其在实际项目中的执行流程如图

在这里插入图片描述

生产者


import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

@Component
@Slf4j
public class RedissonDelayQueuePublisher {

   @Autowired
   RedissonClient redissonClient ;

   public static final String QUEUE_DELAY_KEY = "delayqueueKey";

   public void sendDelayMsg(String msg,Long ttl){

      //阻塞队列
       RBlockingQueue<Object> blockingQueue = redissonClient.getBlockingQueue(QUEUE_DELAY_KEY);

       //延迟队列
       RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);

       delayedQueue.offer(msg,ttl, TimeUnit.SECONDS);

       log.info("延迟队列 , 阻塞队列生成");
   }

}

消费者


import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;



@EnableScheduling
@Component
@Slf4j
public class RedissonDelayQueueConsumer {

   @Autowired
    private RedissonClient redissonClient ;

   private String delayqueueKey = RedissonDelayQueuePublisher.QUEUE_DELAY_KEY;

    @Scheduled(cron = "0/1 * * * * ?")
   public void consumerMsg() throws InterruptedException {

       RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(delayqueueKey);
       Object msg = blockingDeque.take();

       if (msg!=null){
           log.info("从消息队列中取出消息:{}",(String)msg);
       }
   }

}

测试

 @Autowired
    private RedissonDelayQueuePublisher redissonDelayQueuePublisher ;

    @Autowired
    RedissonDelayQueueConsumer redissonDelayQueueConsumer ;

    @GetMapping("/delaytest")
    public void queue() throws InterruptedException {

        for (int i = 0; i < 10; i++) {
            Random random = new Random();
            int ttl = random.nextInt(10);
            Long ttlTime = Long.valueOf(ttl);
            String  msg ="这是一条消息,他的延迟时间是:"+ttl ;
            redissonDelayQueuePublisher.sendDelayMsg(msg,ttlTime);
        }
    }

在这里插入图片描述

分布式线程池

我的思路 1. redisson 延迟队列实现线程池(将普通线程池中的阻塞队列换成redisson的阻塞队列就行)
2.把普通线程池 改造成立 阻塞队列基于 redisson的分布式阻塞队列 , 线程池变成了一个线程不停监听 redisson 。虽然中间有很 多插曲 ,大体设计就是这样

线程池代码

建议先看看我的这篇文章 手搓线程池
在这里插入图片描述

import com.alibaba.fastjson.JSON;
import com.example.redissiontest.config.BeanContext;
import com.sun.org.apache.xpath.internal.operations.Bool;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;


import java.util.HashSet;
import java.util.concurrent.BlockingQueue;

@Component
@Slf4j
public class ThreadPool implements Runnable  {

    RedissonClient redissonClient = BeanContext.getBean(RedissonClient.class);

    private Boolean flage =false ;
   private int coreSize ;
   private long timeOut ;
   private BlockingQueue<String> blockQueue ;
   private BlockingQueue<String> blockQueuework ;
   private HashSet<Worker> workers = new HashSet<>() ;

    public ThreadPool() {
        RBlockingQueue<String> blockQueue = redissonClient.getBlockingQueue("ThreadPool");
        RBlockingQueue<String> blockQueuework = redissonClient.getBlockingQueue("workQueue");
        this.coreSize = 4;
        this.blockQueue =blockQueue ;
        this.blockQueuework = blockQueuework ;
    }



    //执行任务
    public void execute(Task task) throws InterruptedException {
        //任务数量没有超过线程数量 ,交给 work执行
        //else
        //任务数超过coreSize的时候 加入任务队列暂存
        synchronized (workers){
            if (workers.size() < coreSize){
                Worker worker = new Worker(task);
                log.debug("新增work",worker);
                worker.start();
                workers.add(worker) ;
            }else {
                log.debug("加入任务队列",task);
                String taskStr = JSON.toJSONString(task);
                blockQueuework.put(taskStr);
            }
        }

    }



    //work :线程 ,(用来执行任务的)
    class Worker extends Thread {
        private Task task ;

        public Worker(Task task) {
            this.task = task;
        }

        @SneakyThrows
        @Override
        public void run() {
            //执行任务
            //当task不为空 ,执行完毕
            //当task执行完毕,接着从任务队列获取任务

            while (task!=null ||( task = JSON.parseObject(blockQueuework.take(), Task.class))!=null){
                try{
                    log.debug("正在执行...{}",task);
                    task.run();

                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    task = null ;
                    if (blockQueuework.size() == 0){
                        log.info("线程池没有任务 ,阻塞中.....");
                    }

                }
            }

            synchronized (workers){
                log.debug("work被移除...{}",task);
                workers.remove(this) ;
            }
        }
    }


    @SneakyThrows
    @Override
    public void run() {
        while (true){

            String take = blockQueue.take();
            Task task = JSON.parseObject(take, Task.class);
            this.execute(task);
        }
    }


}

测试代码


    @GetMapping("/threadPool")
    public void queuePool() throws InterruptedException {
        RBlockingQueue<String> blockQueue = redissonClient.getBlockingQueue("ThreadPool");

        for (int i = 0; i < 15; i++) {
            Task task = new Task();
            String taskS = JSON.toJSONString(task);
            blockQueue.put(taskS);
        }
    }


在这里插入图片描述

当然 你也可以使用策略模式增加他的 功能 ,你也实现一下把~~~
而且他的拓展性非常好, 你可以创建更多不同类型的线程池进行粘合进去,反正 所有线程池都是监听的一条阻塞队列 ,当然你还可以
拓展 将 不同类型的任务对象 放入不同类型的 线程池中。
在这里插入图片描述

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

redisson究极爽文-手把手带你实现redisson的发布订阅,消息队列,延迟队列(死信队列),(模仿)分布式线程池 的相关文章

随机推荐

  • Android Studio代码提示自动补全设置

    最近学习Android开发课程 用的是Android studio开发工具 刚开始用发现竟然没有代码提示补全功能 我自己去看了一下设置 又设置了匹配补全提示 code起来还是不行 后来上网搜索 都是一些关于如何设置个性化自动补全提示的内容
  • BC1.2快充协议介绍

    BC1 2定义 BC1 2 Battery Charging v1 2 是USB IF下属的BC Battery Charging 小组制定的协议 主要用于规范电池充电的需求 该协议最早基于USB2 0协议来实现 BC1 2充电端口 USB
  • 有无监督,上下游任务,高斯分布,BN总结

    1 无监督和有监督的区别 有无标号 label与预测结果做损失loss transformer有监督的 BERT 在预训练中在没有标号的数据集上训练 在微调的时候同样是用一个BERT模型 但是它的权重被初始化成在预训练中得到的权重 有标号的
  • 华为OD机试 - 字符串加密(Java)

    题目描述 给你一串未加密的字符串str 通过对字符串的每一个字母进行改变来实现加密 加密方式是在每一个字母str i 偏移特定数组元素a i 的量 数组a前三位已经赋值 a 0 1 a 1 2 a 2 4 当i gt 3时 数组元素a i
  • 软件工程学习过程中工具、资料汇总与心得

    因为在上了半年课以后 发现学的课程太杂 要的工具太多 回顾当初找工具找到病毒工具的苦不堪言的黑历史 在此整理学习用到的所有工具 保持更新 因为文件已经被别人上传了 还要积分什么鬼的 信息化时代共享不好嘛 因此将文件均上传至百度网盘 下载缓慢
  • vue+nodejs 搭建网站全过程

    Vue js Node js MongoDB 的网站搭建示例 安装和初始化项目 使用 Vue Cli 初始化前端项目 vue create my site 使用 Express 初始化后端项目 npx express generator m
  • 如何解决局域网广播风暴

    晚唐诗人许浑曾写过一首诗 咸阳城东楼 其中有一句名句 被传诵千古 山雨欲来风满楼 山雨欲来风满楼 是全诗的警句 周围的群山 雨意越来越浓 大雨即将到来 城楼上 已是满楼的狂风 全句只有寥寥七个字 却十分形象地写出了山城暴雨即将来临时的情景
  • 华为OD机试 - 阿里巴巴找黄金宝箱(I)(Python)

    题目描述 一贫如洗的樵夫阿里巴巴在去砍柴的路上 无意中发现了强盗集团的藏宝地 藏宝地有编号从0 N的箱子 每个箱子上面贴有一个数字 箱子中可能有一个黄金宝箱 黄金宝箱满足排在它之前的所有箱子数字和等于排在它之后的所有箱子数字之和 第一个箱子
  • SQL SERVER行列不同分类的展示---PIVOT

    行列不同分类的SQL SERVER展示 由于工作的需求 需要对行不同分类 列也不同分类 可以将行分类之后对列进行每列CASE WHEN 进行展示 但是这种方法太蠢了 而且代码不够简洁 因此网上寻找了行转列函数 上最基础的做法 select
  • 关于javascript中number类型与string类型的比较

    javascript中number类型与string类型的比较 应该是根据number类型的数值情况 将string转换为与number数值相对应的值再比较 var numVal 10 00 if numVal 10 0000 consol
  • 搭建QNX开发环境-qnx系统环境开发

    锋影 e mail 174176320 qq com QNX是可以提供试用30天 目前发布最新的是qnx7 0版本 申请也是只能7 0 做好白老鼠的准备 老版本不再申请试用 其实多数时候 老版本的650 650sp1 和较新稳定的660版本
  • caffe 查看caffemodel中的参数与数据

    在用caffe训练完一个模型之后 我们想更加直观的查看这个模型该怎么做呢 caffe框架训练出来的caffemodel是一个类似于黑盒的东西 我们无法直接看到它的本质 需要借助caffe所定义的接口来协助我们 详细的文档在caffe官网上都
  • gcc链接脚本和启动文件详解

    C代码生成可执行程序分为 预编译 编译 汇编 链接四个阶段 预处理器把源程序聚合在一起 并把宏定义转换为源语言 编译器根据预处理的源程序生成汇编程序 汇编器处理汇编程序 生成可重定位的机器代码 连接器将可重定位的目标代码和库文件连接到一起
  • 基于51单片机直流电机PID调速PWM输出LCD1602液晶显示设计

    视频演示地址 https www bilibili com video BV1LK4y1R7ju 该设计是由AT89C51单片机为主控芯片显示为1602液晶构成直流电机调速 开机默认不转按下启动后电机开始运行 PID控制PWM进行调速 按键
  • Arduino使用ESP8266模块联网

    ESP8266模块准备 1 透传程序烧写 2 Arduino与ESP8266接线 Arduino模块程序 测试 总结 上一篇文章已经介绍了 利用 ArduinoIDE开发ESP8266模块 这篇文章介绍一下arduino怎么通过ESP826
  • unity鼠标事件

    鼠标事件 鼠标事件 都是当鼠标和gui或者碰撞体 Collider 交互时候触发 需要说明的是drag其实就是鼠标down后up之前持续每帧都会发送此消息 OnMouseDown 当鼠标上的按钮被按下时触发的事件 OnMouseDrag 当
  • LLVM IR格式的基本介绍

    LLVM IR以两种格式存储在磁盘上 1 位码 bc文件 2 汇编文本 ll文件 以sum c源代码为例 int sum int a int b return a b 使用Clang生成位码 命令如下 clang sum c emit ll
  • 单片机数码管从00到99C语言_51单片机数码管实现1到99显示

    在 51 单片机上实现用数码管显示 1 到 99 的数字 并且时间间隔为 1 秒 全部代码如下 include define uchar unsigned char define uint unsigned int sbit dula P2
  • C语言之tentative definition

    参考链接 What Are Tentative Symbols
  • redisson究极爽文-手把手带你实现redisson的发布订阅,消息队列,延迟队列(死信队列),(模仿)分布式线程池

    参考资料 分布式中间件实战 java版 书籍 多线程视频教程 视频 项目启动环境 导入依赖