一、背景
开发过程中或多或少会遇到某些场景,要求数据在规定的时间内如果没处理就要失效掉;
如:用户下单,订单在30分钟内没支付就要自动取消,防止长时间占用库存等;
面对这种情况我们来扒拉一下:
- 系统启一个定时任务,定时扫库,取出超过30分钟的数据,取消释放库存
- 用户下单后将订单放入MQ的延迟队列(比如RocketMQ的延迟消息),依赖于MQ的通知实时触发取消
- 将数据放入Redis,定时扫描Redis
- 将数据放入收尾相连的环形链路中,依次触发超时
二、实现思路
1、定时任务扫库
在分布式定时任务系统中启动一个定时任务,定时任务时间可根据实际业务定义(1分钟、5分钟)
存在问题:
- 如果是单库:通常数据量比较多,如果每秒扫描可能比较耗资源,往往是每分钟或、、、超时有误差
- 如果是分库分表:需要扫描全量库表,
好处:
- 实现相对简单,就是一次普通的查询
数据量少的情况下可以使用,注意查询条件要配以索引,否则会非常慢
2、依赖MQ的延迟队列实现
以RocketMQ为例:
当Producer端发送延迟消息时,Broker会创建一个SCHEDULE_TOPIC*开头的Topic,然后以该topic持久化
然后给每个topic启动定时任务扫描,将数据放入真正的Topic推送到Consumer端消费
存在问题:有些消息队列不支持延迟队列比如rabbit,需要依赖插件;rocketmq延迟时间只支持配置的枚举时间,有一定局限性
3、基于redis定时--全量扫描实现
这种实现跟第一种基于数据库基本类似,就是将数据库换成redis
存在问题:每次都是全量数据拉取,如果数据量过大会这种资源占用,再说redis单线程处理,要防止过大数据拉取
4、基于redis--将数据放入List数据结构
放数据的时候使用rightPush函数从右测逐个放入
启动定时任务(每秒)取(index(0)函数)List第一个,判断是否超时
如果没超时:结束本次
如果超时:获取前100个(按业务自定义数量)判断是否超时,如果全部超时再取100判断,如果循环遇到未超时的结束本次,并将超时的从reids删除leftPop()
好处:相对上种来说,避免了全量拉取数据
存在问题:可能出现多次网络交互,每次拉取数量要按照自己业务来调整
5、基于redis--将数据存入map
本地启动一个定时任务(每秒),redis维护一个变量
订单入库时:先取reids里的变量,然后以该变量命名新建map,将数据放入map
每跑一次定时任务变量++,然后拉取该map下的订单,判断超时的对应删除
变量==到达(超时时间秒数)个数时,将变量赋值1
这样就生成了一个收尾相连的环形List<Map>
好处:数据精确到秒级失效,每次从redis拉取的数据最多是2秒内的数据,数据量可空
存在问题:如果超时时间相对较长,Redis里将创建过多的map
三、代码实现
针对第5种实现
1、订单进入时的操作
private static final String ANNULAR_NUM ="ORDER:INDEX_NUM";
public void annularAdd(String orderNo){
//ANNULAR_NUM,自增变量(如:1-120)
Object obj = redisTemplate.opsForValue().get(ANNULAR_NUM);
if (obj!=null){
//num 当前的自增变量值(1-120)
Integer num = Integer.valueOf(obj.toString());
redisTemplate.opsForHash().put("ORDER:MAP:"+num,
orderNo,
LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli()+"");
}
}
查询reids自增变量,如果不为空就将订单放入对应的map
2、启动定时任务(每秒)
private static final String ANNULAR_NUM ="ORDER:INDEX_NUM";
public void annularAddSchedule() {
Object obj = redisTemplate.opsForValue().get(ANNULAR_NUM);
int num=obj!=null ? Integer.valueOf(obj.toString()) : 0;
Long increment=1L;
//如果当前redis自增数值大于等于超时时间秒数,就置为初始值
if (num >= 120) {
redisTemplate.opsForValue().set(ANNULAR_NUM,increment+"");
}else {
increment = redisTemplate.opsForValue().increment(ANNULAR_NUM);
}
System.out.println("当前map"+increment);
Map map = redisTemplate.opsForHash().entries("ORDER:MAP:" + increment);
List<String> orders = new ArrayList<>();
long timeout = LocalDateTime.now().plusSeconds(-119).toInstant(ZoneOffset.of("+8")).toEpochMilli();
map.forEach((key, value) -> {
Long aLong = Long.valueOf(value.toString());
//如果当前时间-超时时间>订单的超时时间,说明订单已超时
if (timeout > aLong) {
orders.add(key.toString());
}
});
System.out.println("超时的orders=========" + orders.size());
if (orders != null && orders.size() > 0) {
Long delete = redisTemplate.opsForHash().delete("ORDER:MAP:" + increment, orders.toArray());
//使用线程池处理自己业务逻辑
executorService.execute(() -> {
userInterface.getUser(JSON.toJSONString(orders));
});
System.out.println("delete=========" + delete);
}
}
公众号主要记录各种源码、面试题、微服务技术栈,帮忙关注一波,非常感谢