代码技巧——如何关闭订单?延迟任务的实现方案【建议收藏】

2023-11-15

先思考个问题:为什么要关闭订单

业务上:

1. 提供待付款时间,而不是简单的"一次付款机会",提高业务指标之一的成单率;成单率=成功下单的人数/发起支付的人数;

2. 下单成功意味着这个商品被当前订单占用,库存已经预扣减,如果迟迟不支付则需要回收库存;

用户体验上:

1. 增加紧迫感,这个紧迫感是指这个商品热销,我好不容易锁定后待支付,有一个时间倒计时的提醒“我再不付款,这个商品就被释放了”;

2. 提供容错方案,如支付过程存在问题,网络系统原因,或者当前卡没有钱/扣款失败,或者我想再看看别的最终价进行一个对比;退出支付页面给的倒计时,是为了告诉用户我为你保留多久,你随时可以再次发起支付;

3. 简化操作流程,用户因为一些支付、犹豫等原因,暂时放弃支付了,但事后又突然想支付呢?当然不能让用户重新选择商品,选择商品属性,然后再到支付页面;所以保留订单一段时间,是给用户提供便利;

待付款时间长短差异&位置与形式的差异,主要跟商家/平台售卖效率与收益相关,但也要考虑用户体验,因为会影响长远的效益;

本篇介绍交易系统下,关闭订单的几种实现方式;

1. 业务背景

以下均为曾负责的商城业务的真实业务场景:

1. 商城用户下单,15分钟催付,30分钟订单未付款自动取消;

2. 商城活动系统,活动开始前15分钟对设置了秒杀提醒的用户进行push推送;

3. 商城活动系统,拼团活动开奖时,计算成功拼团队伍得分;

4. 秒杀系统,用户成功秒杀到商品,5分钟未提交订单退还库存;

2. 方案一:定时任务轮询

上述场景传统的解决方案为定时任务扫表执行,一般情况下,我们的实现都是通过一个job,定时扫一段时间内的数据,对满足执行条件数据的进行相应的业务操作;如果数据量比较大,业务操作耗时,可能要对业务操作进行异步处理,或者通过其他job框架对数据进行拆分;

优点:未引入第三方框架,实现简单;未引入额外的系统交互,可靠性高;

缺点

1. 定时任务频率不好把握;频率过小,会对数据库产生较大压力且如果任务执行时间过长,会导致任务积压;频率过大,任务的及时性无法得到有效的保障,带来一定的时间延迟;

2. 空转导致的数据库资源浪费;如场景2,为了保障秒杀提醒的及时性,定时任务每分钟会执行一次;但秒杀活动是不定期举行的,大部分定时任务查询不到任何数据;造成网络IO和磁盘IO的消耗;

3. 数据量大的时候扫描数据的时间开销会很大,实现方案需要调整,并且可能更加复杂;

定时任务的方案对于数据量大、实时性要求高的应用场景不太适合,但是对于数据量小(索引设置的合理)、实时性要求不高的场景(如一些B端运营后台场景)完全是可以快速支持的,由于是基于定时扫库,因此天然的保证了延迟任务不会丢失;

这种方案不能说是完全不能用,也不要因为没使用框架/算法就说这种方案很low没法用;例如《领导:谁再用定时任务实现关闭订单,立马滚蛋! - 掘金》这篇文章的标题,就略显狭隘;

方案本就是针对解决不同场景的问题的,没有绝对的通用方案,也没有绝对不能用的方案;引入一种方案一定是结合具体问题的,任何方案都会带来接入成本和或多或少的弊端;

3. 方案二:用JDK实现延迟任务

JDK中自带了一些API可以支持延迟任务,缺点是单机执行,并且存放消息的队列/线程任务存放在JVM内存中,异常中断/重启时会丢失任务;下面分别给出两种实现方式的代码示例;

  • ScheduledExecutorService

JDK自带线程池,它能调度一些命令在一段时间之后执行,或者周期性的执行,下面给出了示例,其中 ScheduledExecutorService#schedule 支持延迟指定时间后执行一次任务;

package com.internet.demo.service.testschedule;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @author Akira
 * @description
 * @date 2022/9/30
 */
public class TestScheduledExecutorService {

    public static void main(String[] args) {
        // 线程池(核心池)的大小会影响多个任务的执行 单线程下任务执行会阻塞
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);

        // #schedule 延迟指定时间后执行一次
        executorService.schedule(() -> {
            System.out.println("runD1 start " + System.currentTimeMillis());
            try {
                Thread.sleep(3000);
            } catch (InterruptedException ignore) {
            }
            System.out.println("runD1 " + System.currentTimeMillis());
        }, 1000L, TimeUnit.MILLISECONDS);
        executorService.schedule(() -> {
            System.out.println("runD2 start " + System.currentTimeMillis());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException ignore) {
            }
            System.out.println("runD2 " + System.currentTimeMillis());
        }, 2000L, TimeUnit.MILLISECONDS);

        // #scheduleAtFixedRate 按照固定周期频率执行 若任务执行时间大于间隔周期 则实际间隔为任务执行时间
        executorService.scheduleAtFixedRate(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException ignore) {
            }
            System.out.println("runA " + System.currentTimeMillis());
        }, 0, 1000, TimeUnit.MILLISECONDS);


        // #scheduleWithFixedDelay 按照固定延迟时间周期执行 执行结束后再开始计算延迟时间
        executorService.scheduleWithFixedDelay(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException ignore) {
            }
            System.out.println("runB " + System.currentTimeMillis());
        }, 0, 1000, TimeUnit.MILLISECONDS);

    }

}
  • DelayQueue

Java中的DelayQueue位于java.util.concurrent包下,作为单机实现,它很好的实现了延迟一段时间后触发事件的需求;由于是线程安全的,它可以有多个消费者和多个生产者,从而在某些情况下可以提升性能;使用DelayQueue需要考虑程序挂掉之后,内存里面未处理消息的丢失带来的影响;

原理上,DelayQueue本质是封装了一个优先级队列PriorityQueue,使之线程安全,加上Delay功能,也就是说,消费者线程只能在队列中的消息“过期”之后才能返回数据获取到消息,不然只能获取到null;关于优先级排序,使用了最小堆让队列在数据量较大的时候比较有优势,插入和获取时间复杂度相对都比较好,都是O(logN);

下面是代码示例:

package com.internet.demo.service.testschedule;

import lombok.Data;

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * @author Akira
 * @description
 * @date 2022/9/30
 */
public class TestDelayQueue {

    public static void main(String[] args) {
        final DelayQueue<MyDelayTask> delayQueue = new DelayQueue<>();
        delayQueue.put(new MyDelayTask(5000, "taskA"));
        delayQueue.put(new MyDelayTask(2000, "taskB"));
        delayQueue.put(new MyDelayTask(8000, "taskC"));

        while (delayQueue.size() != 0) {

            // 如果没到时间,该方法会返回null
            final MyDelayTask task = delayQueue.poll();
            System.out.println("尝试取出任务 task=" + ((task == null) ? "null" : task.toString()) + " nowTime:" + System.currentTimeMillis());

            if (task != null) {
                System.out.println(task.runTask());
            }

            try {
                // 1000ms轮询一次 尝试从队列中拿到任务
                TimeUnit.MILLISECONDS.sleep(1000);
            } catch (InterruptedException ignore) {
            }
        }

    }

}

@Data
class MyDelayTask implements Delayed {

    /**
     * 延迟时间 单位ms
     */
    private int delayTime;

    /**
     * 执行时间 时间戳格式 单位ms
     */
    private long execTime;

    /**
     * 任务参数
     */
    private String taskParams;

    public MyDelayTask(int delayTime, String taskParams) {
        this.delayTime = delayTime;
        this.taskParams = taskParams;
        // 结合当前时间 计算任务的目标执行时间
        this.execTime = System.currentTimeMillis() + delayTime;
    }

    /**
     * Delayed接口的抽象方法 定义当前距离目标时间的延迟时间
     *
     * @param unit
     * @return
     */
    @Override
    public long getDelay(TimeUnit unit) {
        // 执行目标时间 - 当前时间
        return unit.convert(execTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Comparable接口的抽象方法 比较两个Delayed对象的大小 会用于确定任务在优先级队列中的排序 使用Delayed#getDelay来计算
     *
     * @param o
     * @return
     */
    @Override
    public int compareTo(Delayed o) {
        // 精度到ms即可
        long delta = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
        return (int) delta;
    }

    /**
     * 自定义的任务执行逻辑
     *
     * @return
     */
    public Object runTask() {
        return String.join("-", "执行任务", this.taskParams, String.valueOf(System.currentTimeMillis()));
    }
}

 4. 方案三:监听Redis过期时间不推荐

Redis的key是支持设置TTL的,通过修改Redis的配置文件开启过期事件,然后在项目工程中注册监听redis过期事件的监听器,对监听的key(如订单以order开头)做匹配完成后续的处理;

注意,使用该功能需要下载2.8.0及以上的版本,这一部分详细内容可以访问redis官网:Redis keyspace notifications | Redis

先看测试代码:

(1)开启Redis过期事件,需要在集群中的每个redis的配置文件中写上一下代码:

notify-keyspace-events Ex

notify-keyspace-events默认的配置是空值"",表示不接收任何通知,修改配置后需要重启redis实例以生效配置;

(2)配置Redis监听器

先注册RedisMessageListenerContainer,为Redis消息监听器提供异步行为的容器;

@Configuration
public class RedisConfig {

    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
        //Redis消息监听器
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        //设置Redis链接工厂
        container.setConnectionFactory(connectionFactory);
        return container;
    }
}

然后定义key过期的监听器,通过继承KeyExpirationEventMessageListener来完成;

//消息过期监听器
@Component
public class RedisExpireListener extends KeyExpirationEventMessageListener {
    // 构造器注入RedisMessageListenerContainer依赖
    public RedisExpireListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }

    //当消息过期,触发方法
    @Override
    public void onMessage(Message message, byte[] pattern) {
        String expiredKey = message.toString();
        System.out.println("Key -> "+expiredKey +"过期了...");
    }
}

接下来可以简单的通过命令行设置一个较短的过期时间的key来验证监听器配置成功;

但是请注意一点:Redis的过期事件不一定准时!

这篇《请勿过度依赖Redis的过期监听!》文章做了一个类似压力测试的实验,作者分别增加Redis的key数量规模,从1W到5W,观察redis过期事件的准时情况,结论如下:

测试结果:

(1)当key数量小于1万的时候,基本上都可以在10s内完成过期通知;

(2)当key数量到3万的时候,就有部分key会延迟120s;

(3)当key数量到5万的时候 , 大部分都已经滞后了两分钟 , 对于业务方来说已经完全无法忍受了;

分析问题:为什么会出现过期通知不及时的情况?

我们首先得了解redis底层是如何知道key过期的,它有三种方案:

Redis的3种过期策略
策略 描述 优点 缺点
定时删除 设置key的时候同时为它创建一个定时器 准时删除 CPU消耗大,存在大量设置了过期时间的key时每个key都维护一个定时器
定期删除 开启定时任务在某个时间点扫描有哪些key过期了 CPU消耗小 删除不及时,精度取决于定时任务执行频率;CPU消耗介于定时删除和惰性删除之间
惰性删除 操作key的时候判断是否过期 CPU消耗小 删除不及时;当过期key未被访问时,这些已过期的key将一直存在于内存中,占用内存空间

redis默认采用的策略是:定期删除+惰性删除;

定期删除的过程:

(1)每隔一段时间,Redis会分别去各个库随机拿20个非永久Key,判断它们是否过期,过期则删除,如果这一次拿的key中有超过1/4的数据过期,则再执行一遍过程1,直到过期数据不超过当次拿出来的20条记录的1/4;可以通过配置redis.conf中的hz修改Redis执行定期删除的频率,默认hz=10,即每100ms执行一次,1/4与每次拿的数量20暂时未找到配置项;

(2)如果当前数据库没有非永久key,则跳过当前数据库;

(3)如果key已过期,但没有被定期删除,由于惰性删除策略,在下次请求获取该数据时会将该数据删除;

因此,当key数量较多时,定时随机获取"非永久key"的时候不一定能拿到已经过期key,所以就可能造成key过期没有及时通知

其实也有解决思路:(1)让各个库中存放的"非永久key"的数量尽量少,能更大概率的被一次扫描出来;(2)调整redis过期删除任务的执行频率和单次扫描的数量(增大redis的CPU性能开销);

不过,无论是哪种方案,都会为了"实现延迟任务"而影响现有的redis配置或性能开销,都不太可取;实际上,Redis的官方文档明确的说明了"Basically ​​expired​​ events are generated when the Redis server deletes the key and not when the time to live theoretically reaches the value of zero.",也就是说"当Redis删除key时产生过期事件,这个时间基本上不会刚好是key到达TTL的时间",因此不要太过依赖中间件的这些看似好用的"特性",需要自己仔细先研究下官方文档;此外,键空间通知采用的是发送即忘(fire and forget)策略,这意味着它并不像消息队列一样保证送达,订阅了过期事件的客户端会丢失所有在其断线期间所有分发给它的事件;

下面这两篇官方文章讲的很清楚:

总结就是:利用redis的过期事件来实现延迟任务,这是一种比定时扫描数据库更 “LOW” 的解决方案,请不要使用!!!

本节参考:

Redis监听Key的过期事件

请勿过度依赖Redis的过期监听

Redis监听过期key不及时问题

5. 方案四:用Redis ZSet实现延迟任务

上面讲了基于JDK的延迟任务的实现方式,缺点是基于单体应用的内存的方式运行,未持久化,一旦出现单点故障,可能出现延时任务数据的丢失;这里介绍Redis ZSet实现延时任务的方式,Redis的持久化、多节点部署天然的可以解决单点故障的问题;

先介绍一下ZSet的应用特性,ZSet作为Redis的有序集合数据结构存在,排序的依据就是score;有点类似上面介绍的JDK中的DelayQueue的原理,DelayQueue使用优先级队列(实现Comparator接口来定义优先关系)来维护不同延迟时间任务的顺序;而ZSet就是通过分数score来排列顺序的,支持通过score的范围来获取元素的列表,具体是通过跳表和哈希表共同完成,这里不再赘述底层实现;

原理很简单,类似JDK DelayQueue的方案,将score定义为目标执行时间,即任务生成时间 + 延时时间,ZSet按照score进行排序,最先执行的任务会排在前面,接下来只需要开启Redis扫描任务,获取"当前时间 > score"的延时任务并执行即可;因为是基于Redis命令操作,因此性能可以保证,定时任务的周期可以适当短一点;

下面给一个简单的订单场景的代码示例:

@Component
public class OrderDelayService  implements InitializingBean {
  //redis zset key
  public static final String ORDER_DELAY_TASK_KEY = "delaytask:order";

  @Resource
  private StringRedisTemplate stringRedisTemplate;

  //生成订单-order为订单信息,可以是订单流水号,用于延时任务达到时效后关闭订单
  public void produce(String orderSerialNo){
    stringRedisTemplate.opsForZSet().add(
            ORDER_DELAY_TASK_KEY,     // redis key
            orderSerialNo,    // zset  member
            System.currentTimeMillis() + (30 * 60 * 1000)    //zset score 30分钟延时
    );
  }

  //延时任务,也是异步任务,延时任务达到时效之后关闭订单,并将延时任务从redis zset删除
  @Async("test")
  public void consuming(){
       
      Set<ZSetOperations.TypedTuple<String>> orderSerialNos = stringRedisTemplate.opsForZSet().rangeByScoreWithScores(
              ORDER_DELAY_TASK_KEY,
              System.currentTimeMillis() - (30 * 60 * 1000),  //延时任务score最小值 最小为30分钟前的记录
              System.currentTimeMillis() //延时任务score最大值(当前时间)
      );
      if (!CollectionUtils.isEmpty(orderSerialNos)) {
        for (ZSetOperations.TypedTuple<String> orderSerialNo : orderSerialNos) {
          //这里根据orderSerialNo去检查用户是否完成了订单支付
          //如果用户没有支付订单,去执行订单关闭的操作
          System.out.println("订单" + orderSerialNo.getValue() + "超时被自动关闭");
          //订单关闭之后,将订单延时任务从队列中删除
          stringRedisTemplate.opsForZSet().remove(ORDER_DELAY_TASK_KEY, orderSerialNo.getValue());
        }
      }
  }

  //该类对象Bean实例化之后,就开启while扫描任务
  @Override
  public void afterPropertiesSet() throws Exception {
    new Thread(() -> {  //开启新的线程,否则SpringBoot应用初始化无法启动
      while(true){
        try {
          Thread.sleep(5 * 1000);   //每5秒扫描一次redis库获取延时数据,不用太频繁没必要
        } catch (InterruptedException e) {
          e.printStackTrace();  //本文只是示例,生产环境请做好相关的异常处理
        }
        consuming();
      }
    }).start();
  }
}

这种方案以及前面提及的JDK中的延迟队列,其本质都是优先级队列 + 定时任务扫描的方案,可见定时任务方案其实一点也不low,主要看怎么去实现,以及权衡方案实现的成本及带来的问题;总之,适合自己的业务场景即可;

6. 方案五:使用Redisson DelayQueue 延迟队列

先简单介绍下Redisson,将我们常用的Jedis与其做一个对比:Jedis是Redis的Java实现的客户端,其API提供了比较全面的Redis命令的支持;而Redisson实现了分布式和可扩展的Java数据结构,和Jedis相比,功能较为简单,不支持字符串操作,不支持排序、事务、管道、分区等Redis特性;

Redisson的宗旨是促进使用者对Redis的关注分离,从而让使用者能够将精力更集中地放在处理业务逻辑上;

换句话说,就是Jedis中的Java方法基本和Redis的API保持着一致,了解Redis的API,也就能熟练的使用Jedis;而Redisson中的方法则是进行比较高的抽象,每个方法调用可能进行了一个或多个Redis方法调用;它还提供了一系列的分布式Java常用对象,基本可以与Java的基本数据结构通用,此外还提供了许多分布式服务;

Redisson使用非阻塞的I/O和基于Netty框架的事件驱动的通信层,相对于使用阻塞的I/O的Jedis,其方法调用是异步的;支持Redis 2.8以上版本,支持Java1.6+以上版本;

先给个代码示例:

1. 定义延迟队列、阻塞队列以及添加延迟任务(消息)的方法;

可以看到这里有2个队列:监听的目标队列RBlockingQueue和中转队列RDelayedQueue;RDelayedQueue会把过期的消息放入到我们的目标队列中,我们只要从RBlockingQueue队列中取数据即可;

@Slf4j
@Component
public class RedissonDelayQueueClient implements InitializingBean {

    @Resource
    private RedissonClient redissonClient;

    private final Map<String, RDelayedQueue<DelayMessage>> delayQueueMap = new ConcurrentHashMap<>(16);

    public void addDelayMessage(DelayMessage delayMessage) {
        log.info("delayMessage={}", delayMessage);
        if (delayQueueMap.get(delayMessage.getQueueName()) == null) {
            log.warn("queueName={},该延迟队列不存在,请确认后再试...", delayMessage.getQueueName());
            return;
        }
        delayMessage.setCreateTime(DateUtil.getNowFormatStr());
        RDelayedQueue<DelayMessage> rDelayedQueue = delayQueueMap.get(delayMessage.getQueueName());
        rDelayedQueue.offer(delayMessage, delayMessage.getDelayTime(), delayMessage.getTimeUnit() == null ? TimeUnit.SECONDS : delayMessage.getTimeUnit());
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        // 有新的延迟队列在这里添加,队列消费类需要继承DelayQueueConsumer,并且service名称为 ${queueName}Consumer
        List<String> queueNameList = Lists.newArrayList("orderAutoCancelDelayQueue");

        // 加载延迟队列
        for (String queueName : queueNameList) {
            DelayQueueConsumer delayQueueConsumer = SpringUtil.getBean(queueName + "Consumer");
            if (delayQueueConsumer == null) {
                throw new RuntimeException("queueName=" + queueName + ",delayQueueConsumer=null,请检查配置...");
            }
            // Redisson的延时队列是对另一个队列的再包装,使用时要先将延时消息添加到延时队列中,当延时队列中的消息达到设定的延时时间后,
            // 该延时消息才会进行进入到被包装队列中,因此,我们只需要对被包装队列进行监听即可。
            RBlockingQueue<DelayMessage> rBlockingQueue = redissonClient.getBlockingDeque(queueName);
            RDelayedQueue<DelayMessage> rDelayedQueue = redissonClient.getDelayedQueue(rBlockingQueue);
            delayQueueMap.put(queueName, rDelayedQueue);

            // 订阅新元素的到来,调用的是takeAsync(),异步执行
            rBlockingQueue.subscribeOnElements(delayQueueConsumer::execute);
        }
    }
}

2. 定义消息的消费者及处理消息的方法

public interface DelayQueueConsumer {

    /**
     * 执行延迟消息
     *
     * @param delayMessage delayMessage
     */
    void execute(DelayMessage delayMessage);

}

// ----- //

@Service("orderAutoCancelDelayQueueConsumer")
@Slf4j
public class OrderAutoCancelDelayQueueConsumer implements DelayQueueConsumer {

    @Override
    public void execute(DelayMessage delayMessage) {
        log.info("====OrderAutoCancelConsumer=====delayMessage={}", delayMessage);
    }
}

做个简单的原理分析,Redisson实现延迟队列的思路与上述的Redis ZSet方案部分类似,实际上,Redisson使用了两个list + 一个sorted-set + pub/sub来实现延时队列,而不是单一的仅监听sort-set

  • sorted-set:存放未到期的消息,score为到期时间,提供消息延时排序功能;
  • list-0:存放未到期消息,作为消息的原始顺序视图,提供如查询、删除指定第几条消息的功能(分析源码得出的,查看哪些地方有使用这个list);
  • list-q:消费队列(阻塞队列),存放到期后的消息,提供消费;

 总结下来步骤如下:

第一步:客户端启动,Redisson先订阅一个key;同时 BLPOP key 0,无限监听一个阻塞队列(等里面有数据了就返回);
第二步:当有延迟任务数据生产时,Redisson先把数据放到一个ZSet集合(按延时到期时间的时间戳为分数排序),同时发布上面订阅的key,发布内容为数据到期的timeout;

第三步:客户端收到订阅的队列的通知后,就在自己进程里面开启延时任务(基于Netty的时间轮HashedWheelTimer),延时时间为发布内容timeout;
第四步:客户端进程的延时任务到了时间执行,从ZSet分页取出已过期(timeout小于当前时间)的数据,然后将数据rpush到第一步的阻塞队列里,然后将当前数据从ZSet移除,取完之后,又执行 BLPOP key 0 继续无限监听阻塞队列;
第五步:上一步客户端监听的阻塞队列返回取到数据,回调到 RBlockingQueue 的 take方法;于是,我们就收到了数据;

可见,Redisson不是通过轮询ZSet的,而是将延时任务执行放到进程里面实现,只有到了timeout时间才会从Redis ZSet取数据;实际上,Redisson里面还有很多异常,重试机制,这里不再展开;源码分析可以参考Redisson延迟队列实现原理探究基于redisson的延迟队列实践这两篇文章;

本节参考:

Redisson使用手册 - BookStack

Jedis与Redisson选型对比

基于Redisson的延迟队列实现

7. 方案六:RabbitMQ实现延时队列

RabbitMQ队列本身没有直接支持延迟队列功能,但是可以通过以下特性模拟出延迟队列的功能;

  • 特性1 Time To Live (TTL) 

消息处理支持过期时间,RabbitMQ针对队列中的消息处理过期时间有两种方法可以设置;

A. 通过队列属性设置(对Queue设置x-expires),队列中所有消息都有相同的过期时间;
B. 对消息进行单独设置(对Message设置x-message-ttl),每条消息TTL可以不同;

如果同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准;消息在队列的生存时间一旦超过设置的TTL值,就成为死信deadletter

注意两种设置过期时间的区别

A. 如果设置了队列的TTL属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列则会被丢到死信队列中);
B. 而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间

  • 特性2 Dead Letter Exchanges(DLX)

死信队列的设计目的是为了存储没有被正常消费的消息,便于排查和重新投递;

RabbitMQ的Queue可以配置x-dead-letter-exchange和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了死信deadletter,则按照这两个参数重新路由转发到指定的队列;

x-dead-letter-exchange:出现deadletter之后将deadletter重新发送到指定exchange;
x-dead-letter-routing-key:出现deadletter之后将deadletter重新按照指定的routing-key发送

队列出现死信deadletter的情况有:

  • 消息或者队列的TTL过期
  • 队列达到最大长度
  • 消息被消费端拒绝(basic.rejectorbasic.nack)并且requeue=false

综合上述两个特性,实现延迟消息的一种方案为:

(1)对队列设置了TTL规则;因此需要预先设定好多个不同TTL档位的队列用来生产死信,队列不配置消费节点,相当于队列是暂存消息的"容器";注意:对队列而非消息设置TTL的原因是保证消息一过期,就会被丢到死信队列中;
(2)将以上多个生产死信的不同档位的队列绑定同一个死信交换机exchange,消息按照routingKey投递到死信消费队列;延迟消息的消费者监听这个队列;

以上方案存在的问题:

延迟时间TTL不能任意指定,二是提前固定几种延迟时间档位;如果要增加一个新的时间需求,就要新增一个队列;如果不能实现消息粒度上的TTL,并使其在任意设置的TTL时间及时死亡,就无法设计成一个通用的延时队列;

好在rabbitmq 3.5.7及以上的版本提供了一个插件(rabbitmq-delayed-message-exchange)来配合实现延迟队列功能;同时插件依赖Erlang/OPT 18.0及以上;

该插件下,会新增一种新的交换机类型,该类型下的消息支持延迟投递机制,消息传递后并不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中;

相关源码可参考:

RabbitMQ 延迟队列详解

Rabbitmq 实现延迟队列的两种方式

一文带你搞定RabbitMQ延迟队列

此外,为了减小MQ的压力,可能需要结合定时任务一起实现;因为例如订单,订单在不同的流转状态中都会产生这种超时处理,消息会堆积很长的时间,每天百万单,这种消息堆积轻松上千万,对MQ本身会造成很大的压力;因此,可以通过定时任务提前将一定时间段内的记录投递到MQ而非直接丢入,可以一定程度环节MQ的压力;

基于RabbitMQ实现延迟队列的方案存在的缺点:需要公司的消息中间件团队支持以上的配置,并且公司采用的消息中间件可能是更加主流的RocketMQ,这种情况下不建议自己接入原生的RabbitMQ;

其他消息中间件延迟消息

来看下目前业界主要开源的消息中间件对定时消息/延迟消息的支持情况;

上图是阿里云上对业界MQ功能的对比,其中开源产品中只有阿里的RocketMQ支持延迟消息,且是固定的18个Level;固定Level的含义是延迟是特定级别的,比如支持3秒、5秒的Level,那么用户只能发送3秒延迟或者5秒延迟,不能发送8秒延迟的消息;消息队列RocketMQ的阿里云版本(收费版本)才支持到精确到秒级别的延迟消息(没有特定Level的限制);

题外话,使用Redis过期监听或者RabbitMQ死信队列做延时任务,都是以中间件设计者预想之外的方式使用中间件,这种"骚操作"通常会存在某些隐患,比如缺乏可靠性保证、带来更大CPU消耗、资源泄漏等;

比较出名的一个事例是很多人使用Redis的List作为消息队列,但实际上存在诸多问题:1.消费者下线,数据会丢失;2.不支持数据持久化,Redis 宕机,数据也会丢失;3.消息堆积,缓冲区溢出,消费者会被强制踢下线,数据也会丢失;以致于最后作者看不下去写了Disque并最后演变为Redis消息队列——Redis Stream工作中还是尽量不要滥用中间件,用专业的中间件做专业的事,少玩一些过度自信的操作,如果要用请先仔细阅读官方文档;

8. 方案七:时间轮算法

时间轮是一种很优秀的定时任务的数据结构,先来简单了解一下netty时间轮算法的原理;

顾名思义,从上图看,时间轮确实是一个"轮子",实际上是个环形数组的数据结构;

举个例子,如图它将时间轮分成8个bucket,假设每个时间轮轮片的分隔时间段tickDuration=1s(即:指针经过每个格子花费时间是1s),若当前的时刻指向bucket=3,那么在18秒后需要被执行的任务需要落到((3+18)%8=5取余运算)的5号bucket上,且还要需要经过2圈((3+18)/8=2除法运算);假如有多个需要在该时间段内执行的任务,就会组成一个双向链表;

时间轮算法的精确度取决于每个时间轮轮片的分隔时间段tickDuration;时间轮指针是一个Worker线程,在时间轮整点的时候执行双向链表中的任务;Worker线程是单线程,一个bucket、一个bucket的顺序处理任务;所以我们一般将延时任务做成异步执行,避免阻塞后续的任务执行;

使用时间轮算法在Kafka、Netty中都有实现和对应的API,下面以Netty中的时间轮算法给一个代码示例;

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.45.Final</version>
</dependency>

1. 定义一个包含512个bucket的时间轮,每个时间轮的轮片时间间隔是100毫秒;

@Bean("hashedWheelTimer")
public HashedWheelTimer hashedWheelTimer(){
    return new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 512);
}

2. 生成创建延迟任务,延时任务将在30分钟之后被执行,下文的lambda表达式实现了一个TimerTask延时任务;

//订单下单操作
void order(String orderInfo) {
  //下单的时候,向时间轮中添加一个30分钟的延时任务
  hashedWheelTimer.newTimeout(task -> {
    //注意这里使用异步任务线程池或者开启线程进行订单取消任务的处理
    cancelOrder(orderInfo);
  }, 30, TimeUnit.MINUTES);
}

相对于使用JDK的DelayQueue,时间轮算法实现延时任务其算法上具有优势,执行性能相对好一些;其缺点就是所有的延时任务以及延时触发的管理,都是在单个应用服务的内存中进行的,一旦该应用服务发生故障重启服务,时间轮任务数据将全部丢失;这一缺点和DelayQueue是一样的;

由于绝大多数时间轮实现是纯内存没有持久化的,因此在涉及延迟任务方案时,一定要考虑任务数据的持久化以及时间轮进程崩溃之后的恢复方案;

本章参考:

延时任务-基于netty时间轮算法实现

Netty时间轮延时任务-腾讯云

秒懂 Kafka 时间轮(TimingWheel) - 知乎

浅谈时间轮算法

9. 小结

  • 首先推荐使用RocketMQ等现成的具备延迟消息功能的消息队列,最好是公司的中间件天然支持,尽量避免自己去尝试"造轮子";
  • 可以考虑使用Redisson DelayQueue等基于Redis的延时队列方案,但要为Redis节点崩溃等情况设计补偿保护机制;同时需要考虑延迟任务的数据量大时带来的延迟问题(这篇文章《Redisson延时队列详解》里面有说明);
  • 可以考虑使用时间轮算法方案,由于时间轮重启远比Redis重启要频繁,定因此时需要考虑扫库等保护机制;
  • 最后一点,不要使用Redis过期监听实现定时任务,因为时间消息不准时且过期时间的投递不可靠;

参考:

今日讨论:为什么会有待付款时间_腾讯新闻

领导:谁再用定时任务实现关闭订单,立马滚蛋! - 掘金

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

代码技巧——如何关闭订单?延迟任务的实现方案【建议收藏】 的相关文章

  • 有没有办法让特定的key在集群模式下定位到特定的redis实例上?

    我想让我的多锁位于不同的redis实例上 我发现redission可以指定一个实例来执行命令 但是如果该命令与key相关 则指定的实例会将命令传输到另一个实例 你能给我一些建议吗 你可以 但这并不是微不足道的 首先 Redis 在键中使用大
  • StackExchange.Redis的正确使用方法

    这个想法是使用更少的连接和更好的性能 连接会随时过期吗 对于另一个问题 redis GetDatabase 打开新连接 private static ConnectionMultiplexer redis private static ID
  • 想要在后台不间断地运行redis-server

    我已经下载了 redis 2 6 16 tar gz 文件并安装成功 安装后我运行 src redis server 它工作正常 但我不想每次都手动运行 src redis server 而是希望 redis server 作为后台进程持续
  • Redis+Docker+Django - 错误 111 连接被拒绝

    我正在尝试使用 Redis 作为使用 Docker Compose 的 Django 项目的 Celery 代理 我无法弄清楚我到底做错了什么 但尽管控制台日志消息告诉我 Redis 正在运行并接受连接 事实上 当我这样做时 docker
  • 如何将“.csv”数据文件导入Redis数据库

    如何将 csv 数据文件导入 Redis 数据库 csv 文件中包含 id 时间 纬度 经度 列 您能否向我建议导入 CSV 文件并能够执行空间查询的最佳方法 这是一个非常广泛的问题 因为我们不知道您想要什么数据结构 您期望什么查询等等 为
  • 节点应用程序之间共享会话?

    我目前有两个独立的节点应用程序在两个不同的端口上运行 但共享相同的后端数据存储 我需要在两个应用程序之间共享用户会话 以便当用户通过一个应用程序登录时 他们的会话可用 并且他们似乎已登录到另一个应用程序 在本例中 它是一个面向公众的网站和一
  • 当 Jedis 与 Spring Data 一起使用时,为什么数据会以奇怪的键存储在 Redis 中?

    我将 Spring Data Redis 与 Jedis 一起使用 我正在尝试存储带有密钥的哈希值vc list id 我能够成功插入到redis 但是 当我使用 redis cli 检查密钥时 我没有看到密钥vc 501381 相反我看到
  • 由于配置文件错误,无法启动 Redis 服务器

    我刚刚按照此处的说明安装了 Redis http redis io download http redis io download 当我运行 redis server redis conf 时出现以下错误 FATAL CONFIG FILE
  • Redis 中存储整数和字符串的区别

    这两个命令有什么区别吗 LPUSH myset 123 LPUSH myset 123 我想存储大约 500 万个整数 并且我想以最有效的方式做到这一点 不 没有什么区别 两者都存储为字符串 从redis io http redis io
  • Web API 缓存 - 如何使用分布式缓存实现失效

    我有一个 API 目前不使用任何缓存 我确实有一个正在使用的中间件 它可以生成缓存标头 Cache Control Expires ETag Last Modified 使用https github com KevinDockx HttpC
  • ServiceStack PooledRedisClientManager 故障转移如何工作?

    根据 git commit 消息 ServiceStack 最近添加了故障转移支持 我最初认为这意味着我可以关闭我的一个 Redis 实例 并且我的池客户端管理器将优雅地处理故障转移并尝试与我的备用 Redis 实例之一连接 不幸的是 我的
  • 在 Rails 应用程序上将 HASH 保存到 Redis

    我刚刚开始使用 Redis 和 Rails 所以这可能是一个愚蠢的问题 我试图将哈希值保存到 Redis 服务器 但是当我检索它时 它只是一个字符串 IE hash field gt value field2 gt value2 redis
  • 使用通配符查找键

    我已经使用分号保存了数据 redis gt keys party 1 party congress president 2 party bjp president 3 party bjp 4 party sena 是否有任何命令可以列出所有
  • Redis - 错误:值不是有效的浮点数

    我在 Redis 中有一个排序集 我试图通过在Python代码中使用zincrby来更新特定元素的计数器值 例如 conn zincrby usersSet float 1 user1 但它显示错误为 错误 值不是有效的浮点数 我在 cli
  • 执行 SET {Key} 超时,inst: 0,mgr: Inactive,queue: 2, qu=1, qs=1, qc=0, wr=1/1, in=0/0

    我正在尝试使用 StackExchange Redis 客户端将 90 KB pdf 文件保存到 Azure Redis 缓存中 我已将该文件转换为字节数组并尝试使用 stringSet 方法保存它并收到错误 Code byte bytes
  • Flask-SocketIO redis 订阅

    我在用着https github com miguelgrinberg Flask SocketIO https github com miguelgrinberg Flask SocketIO实现 WebSocket 服务器 我需要从另一
  • Spring-boot中将redis-cache反序列化为对象的问题

    我在 Client 类中使用 JsonNode 来处理 MySQL 8 数据库中 JSON 类型的字段 即使对于 API 请求 它也能很好地工作 但是当我使用 Redis 启用缓存 我确实需要它 时 我注意到 Redis 无法序列化 Jso
  • 如何在Redis中存储聚合目录树搜索结果

    我有一个很大的产品目录树 目前包含约 36000 个类别和约 100 万个产品 即叶子 它的结构如下 最大深度为 5 Cat1 Cat11 Cat111 Cat1111 Product1 Cat1112 Product1 Cat1113 P
  • 无法通过节点应用程序连接到redis,两者都在docker中

    我正在尝试将我的应用程序连接到 redis 但我得到 ioredis Unhandled error event Error connect ECONNREFUSED 127 0 0 1 6379 当我做 docker exec it ed
  • 将redis数据移至MySQL的更快方法

    我们拥有庞大的购物和产品交易系统 我们在 MySQL 方面遇到了很多问题 因此经过几次研发后 我们计划使用 Redis 并开始将 Redis 集成到我们的系统中 继之前直接访问数据库之后 现在我们已经移动了Redis系统 用户购物车详情 关

随机推荐

  • spring系列文章(一) 关于IDEA中 add framework support没有web application选项的问题

    今天在回顾smm框架时发现一个问题 也就是在add framework support没有web application选项的问题 问题原因 说明你未完全添加web支持 但是系统认定你有web支持 为什么你没有web application
  • 云安全技术——Hyper-V虚拟化技术

    目录 5 1 了解Hyper V 5 2 安装Hyper V 5 3创建虚拟机 Hyper V虚拟化技术 实验目的 了解 Windows 虚拟化的概念 了解 Hyper V的背景及发展 了解 Hyper V 的功能特性 了解 Hyper V
  • 虚幻4学习笔记(5)开关门、使用蓝图开关门、按键和鼠标点击开关门

    开关门 开关门 前置准备 单独设置开关门 设置时间轴 定义蓝图类 完成开关门 针对中心轴 在物体中心 不在右下角的调整方法 世界坐标轴 绝对坐标轴 不会改变 设置按键开关门 鼠标开关门 B站UP谌嘉诚课程 https www bilibil
  • 华为招公关总监:接触近10位路透记者 年薪20万美元

    原标题 华为招聘公关总监 接触近10位路透资深记者 年薪高达20万美元 TechWeb 3月7日消息 据国外媒体报道 华为正在招聘公关总监 开出的年薪最高达20万美元 最近华为招聘人员接触了近10位路透资深记者 为公关总监职位招人 给其中几
  • openwrt x86 版安装纪实

    1 下载源码 已有编译环境 直接在ubuntu 中 git openwrt 源码 https dev openwrt org wiki GetSource git clone b chaos calmer git github com op
  • 成功解决 error: reference to ‘xx‘ is ambiguous

    解决问题 error reference to xx is ambiguous 解决思路 1 错误代码 list int malloc sizeof int n 2 错误原因 翻译 错误 对 xx 的引用不明确 这句话翻译出来后应该已经差不
  • 巅峰对决之Swarm、Kubernetes、Mesos

    转载自 http dockone io article 1138 感谢作者和编者的分享 编者的话 这篇文章对比了三大主流调度框架 Swarm Kubernetes和Mesos 文章不仅从理论上讨论了各个框架的优缺点 还从两个实际的案例出发
  • 20210429# Python解释器的下载和安装

    backTo 20210428 工具使用 https blog csdn net qq 17079255 article details 115357016 目录 目标 一 解释器的作用 二 下载Python解释器 三 安装Python解释
  • 华为OD机试 - 数字字符串组合倒序(Java)

    题目描述 对数字 字符 数字串 字符串 以及数字与字符串组合进行倒序排列 字符范围 由 a 到 z A 到 Z 数字范围 由 0 到 9 符号的定义 作为连接符使用时作为字符串的一部分 例如 20 years 作为一个整体字符串呈现 连续出
  • 精选案例

    顺应 十四五 规划中关于 加快金融机构数字化转型 要求 中国人民银行印发了 金融科技发展规划 2022 2025年 近几年来 金融行业牢牢占据着国内产业数字化转型市场投入的榜首位置 IDC调查显示 2022上半年 中国金融云市场规模达到34
  • 实现内存的整页分配

    1 位图和内存池 位图 位图中的一位表示物理内存中的一页是否被分配 API见blog 位图API 内存池 建立内存池对位图进行操作 分配页内存 在分页机制下有虚拟和物理两种地址 分别为了管理 需要创建虚拟内存地址池和物理内存地址池 内存池
  • 2022蓝桥杯省赛b组补题[九进制转十进制],[顺子日期],[刷题统计],[ 修剪灌木]

    九进制转十进制 九进制转十进制 蓝桥云课 lanqiao cn 代码 include
  • RTMP/RTP/RTSP/RTCP的区别

    简介 用一句简单的话总结 RTSP发起 终结流媒体 应用层 RTP传输流媒体数据 传输层 RTCP对RTP进行控制 同步 传输层 之所以以前对这几个有点分不清 是因为CTC标准里没有对RTCP进行要求 因此在标准RTSP的代码中没有看到相关
  • lapack c语言,Visual C ++ 2010和Lapack,Blas库(Visual C++ 2010 and Lapack, Blas libraries)

    Visual C 2010和Lapack Blas库 Visual C 2010 and Lapack Blas libraries 我想使用Blas和Lapack库来使用一些rutines 但我不知道如何在Visual C 2010使用它
  • word2016如何插入目录以及页码

    不废话 直接写入步骤 具体步骤如下 插入目录 第一步 切换到视图 在视图页面点击大纲视图 第二步 左上角设置各个标题的级别 如下 分别点击引用 目录 选择一个即可设置好目录 第二步的图片 从第二页插入页码 双击调出页眉页脚 设置页码格式 起
  • 刃边法计算MTF(ESF、LSF、PSF)

    MTF 调制传递函数 评价一个成像系统目前主流的办法主要有三种TV line检测 MTF检测 和SFR检测 MTF是Modulation Transfer Function的英文简称 中文为调制传递函数 是指调制度随空间频率变化的函数称为调
  • 自学网络安全究竟该从何学起?

    一 为什么选择网络安全 这几年随着我国 国家网络空间安全战略 网络安全法 网络安全等级保护2 0 等一系列政策 法规 标准的持续落地 网络安全行业地位 薪资随之水涨船高 未来3 5年 是安全行业的黄金发展期 提前踏入行业 能享受行业发展红利
  • IDEA旗舰版安装与概述

    1 IDEA介绍 IDEA 全称 IntelliJ IDEA 是java编程语言开发的集成环境 IntelliJ在业界被公认为最好的java开发工具 尤其在智能代码助手 代码自动提示 重构 JavaEE支持 各类版本工具 git svn等
  • Cobalt Strike Malleable C2

    郑重声明 本笔记编写目的只用于安全知识提升 并与更多人共享安全知识 切勿使用笔记中的技术进行违法活动 利用笔记中的技术造成的后果与作者本人无关 倡导维护网络安全人人有责 共同维护网络文明和谐 Cobalt Strike Malleable
  • 代码技巧——如何关闭订单?延迟任务的实现方案【建议收藏】

    先思考个问题 为什么要关闭订单 业务上 1 提供待付款时间 而不是简单的 一次付款机会 提高业务指标之一的成单率 成单率 成功下单的人数 发起支付的人数 2 下单成功意味着这个商品被当前订单占用 库存已经预扣减 如果迟迟不支付则需要回收库存