redis分布式锁

2023-05-16

1. Redis分布式锁最简单的实现

想要实现分布式锁,必须要求 Redis 有「互斥」的能力,我们可以使用 SETNX 命令,这个命令表示SET if Not Exists,即如果 key 不存在,才会设置它的值,否则什么也不做。

1.1redis加锁的流程

1.加锁set key uuid ex time nx

2.操作 共享资源

3.释放锁:lua脚本(原子性),先get判断锁是否属于自己,在del锁

1.2两个客户端进程可以执行这个命令,达到互斥,就可以实现一个分布式锁。

客户端 1 申请加锁,加锁成功:

客户端 2 申请加锁,因为它后到达,加锁失败:

此时,加锁成功的客户端,就可以去操作「共享资源」,例如,修改 MySQL 的某一行数据,或者调用一个 API 请求。

操作完成后,还要及时释放锁,给后来者让出操作共享资源的机会。如何释放锁呢?

也很简单,直接使用 DEL 命令删除这个 key 即可,这个逻辑非常简单。

但是,它存在一个很大的问题,当客户端 1 拿到锁后,如果发生下面的场景,就会造成「死锁」:

1、程序处理业务逻辑异常,没及时释放锁

2、进程挂了,没机会释放锁

这时,这个客户端就会一直占用这个锁,而其它客户端就「永远」拿不到这把锁了。怎么解决这个问题呢?

如何避免死锁?

我们很容易想到的方案是,在申请锁时,给这把锁设置一个「租期」。

在 Redis 中实现时,就是给这个 key 设置一个「过期时间」。这里我们假设,操作共享资源的时间不会超过 10s,那么在加锁时,给这个 key 设置 10s 过期即可:

SETNX lock 1    // 加锁
EXPIRE lock 10  // 10s后自动过期

这样一来,无论客户端是否异常,这个锁都可以在 10s 后被「自动释放」,其它客户端依旧可以拿到锁。

但现在还是有问题:

现在的操作,加锁、设置过期是 2 条命令,有没有可能只执行了第一条,第二条却「来不及」执行的情况发生呢?例如:

* SETNX 执行成功,执行EXPIRE  时由于网络问题,执行失败
* SETNX 执行成功,Redis 异常宕机,EXPIRE 没有机会执行
* SETNX 执行成功,客户端异常崩溃,EXPIRE也没有机会执行

总之,这两条命令不能保证是原子操作(一起成功),就有潜在的风险导致过期时间设置失败,依旧发生「死锁」问题。

在 Redis 2.6.12 之后,Redis 扩展了 SET 命令的参数,用这一条命令就可以了:

SET lock 1 EX 10 NX

2锁被别人释放怎么办?

上面的命令执行时,每个客户端在释放锁时,都是「无脑」操作,并没有检查这把锁是否还「归自己持有」,所以就会发生释放别人锁的风险,这样的解锁流程,很不「严谨」!如何解决这个问题呢?

解决办法是:客户端在加锁时,设置一个只有自己知道的「唯一标识」进去。

例如,可以是自己的线程 ID,也可以是一个 UUID(随机且唯一),这里我们以UUID 举例:

 SET lock $uuid EX 20 NX

 之后,在释放锁时,要先判断这把锁是否还归自己持有,伪代码可以这么写:

if redis.get("lock") == $uuid:
    redis.del("lock")

 这里释放锁使用的是 GET + DEL 两条命令,这时,又会遇到我们前面讲的原子性问题了。这里可以使用lua脚本来解决。

安全释放锁的 Lua 脚本如下:

if redis.call("GET",KEYS[1]) == ARGV[1]
then
    return redis.call("DEL",KEYS[1])
else
    return 0
end

好了,这样一路优化,整个的加锁、解锁的流程就更「严谨」了。

这里我们先小结一下,基于 Redis 实现的分布式锁,一个严谨的的流程如下:

1、加锁

SET lock_key $unique_id EX $expire_time NX

2、操作共享资源

3、释放锁:Lua 脚本,先 GET 判断锁是否归属自己,再DEL 释放锁

Java代码实现分布式锁

package com.msb.redis.lock;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.params.SetParams;

import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * 分布式锁的实现
 */
@Component
public class RedisDistLock implements Lock {

    private final static int LOCK_TIME = 5*1000;
    private final static String RS_DISTLOCK_NS = "tdln:";
    /*
     if redis.call('get',KEYS[1])==ARGV[1] then
        return redis.call('del', KEYS[1])
    else return 0 end
     */
    private final static String RELEASE_LOCK_LUA =
            "if redis.call('get',KEYS[1])==ARGV[1] then\n" +
                    "        return redis.call('del', KEYS[1])\n" +
                    "    else return 0 end";
    /*保存每个线程的独有的ID值*/
    private ThreadLocal<String> lockerId = new ThreadLocal<>();

    /*解决锁的重入*/
    private Thread ownerThread;
    private String lockName = "lock";

    @Autowired
    private JedisPool jedisPool;

    public String getLockName() {
        return lockName;
    }

    public void setLockName(String lockName) {
        this.lockName = lockName;
    }

    public Thread getOwnerThread() {
        return ownerThread;
    }

    public void setOwnerThread(Thread ownerThread) {
        this.ownerThread = ownerThread;
    }

    @Override
    public void lock() {
        while(!tryLock()){
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        throw new UnsupportedOperationException("不支持可中断获取锁!");
    }

    @Override
    public boolean tryLock() {
        Thread t = Thread.currentThread();
        if(ownerThread==t){/*说明本线程持有锁*/
            return true;
        }else if(ownerThread!=null){/*本进程里有其他线程持有分布式锁*/
            return false;
        }
        Jedis jedis = null;
        try {
            String id = UUID.randomUUID().toString();
            SetParams params = new SetParams();
            params.px(LOCK_TIME);
            params.nx();
            synchronized (this){/*线程们,本地抢锁*/
                if((ownerThread==null)&&
                "OK".equals(jedis.set(RS_DISTLOCK_NS+lockName,id,params))){
                    lockerId.set(id);
                    setOwnerThread(t);
                    return true;
                }else{
                    return false;
                }
            }
        } catch (Exception e) {
            throw new RuntimeException("分布式锁尝试加锁失败!");
        } finally {
            jedis.close();
        }
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        throw new UnsupportedOperationException("不支持等待尝试获取锁!");
    }

    @Override
    public void unlock() {
        if(ownerThread!=Thread.currentThread()) {
            throw new RuntimeException("试图释放无所有权的锁!");
        }
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            Long result = (Long)jedis.eval(RELEASE_LOCK_LUA,
                    Arrays.asList(RS_DISTLOCK_NS+lockName),
                    Arrays.asList(lockerId.get()));
            if(result.longValue()!=0L){
                System.out.println("Redis上的锁已释放!");
            }else{
                System.out.println("Redis上的锁释放失败!");
            }
        } catch (Exception e) {
            throw new RuntimeException("释放锁失败!",e);
        } finally {
            if(jedis!=null) jedis.close();
            lockerId.remove();
            setOwnerThread(null);
            System.out.println("本地锁所有权已释放!");
        }
    }

    @Override
    public Condition newCondition() {
        throw new UnsupportedOperationException("不支持等待通知操作!");
    }

}

3.锁过期时间不好评估怎么办

 3.1看门狗模式

 <!--引入Redis-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-redis</artifactId>
            <version>1.4.2.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.12.3</version>
        </dependency>

        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.3</version>
        </dependency>

 

package com.mashibing.lock.rdl;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.params.SetParams;

import javax.annotation.PreDestroy;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * 分布式锁,附带看门狗线程的实现:加锁,保持锁1秒
 */
@Component
public class RedisDistLockWithDog implements Lock {

    private final static int LOCK_TIME = 1*1000;
    private final static String LOCK_TIME_STR = String.valueOf(LOCK_TIME);
    private final static String RS_DISTLOCK_NS = "tdln2:";
    /*
     if redis.call('get',KEYS[1])==ARGV[1] then
        return redis.call('del', KEYS[1])
    else return 0 end
     */
    private final static String RELEASE_LOCK_LUA =
            "if redis.call('get',KEYS[1])==ARGV[1] then\n" +
                    "        return redis.call('del', KEYS[1])\n" +
                    "    else return 0 end";
    /*还有并发问题,考虑ThreadLocal*/
    private ThreadLocal<String> lockerId = new ThreadLocal<>();

    private Thread ownerThread;
    private String lockName = "lock";

    @Autowired
    private JedisPool jedisPool;

    public String getLockName() {
        return lockName;
    }

    public void setLockName(String lockName) {
        this.lockName = lockName;
    }

    public Thread getOwnerThread() {
        return ownerThread;
    }

    public void setOwnerThread(Thread ownerThread) {
        this.ownerThread = ownerThread;
    }

    @Override
    public void lock() {
        while(!tryLock()){
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        throw new UnsupportedOperationException("不支持可中断获取锁!");
    }

    @Override
    public boolean tryLock() {
        Thread t=Thread.currentThread();
        /*说明本线程正在持有锁*/
        if(ownerThread==t) {
            return true;
        }else if(ownerThread!=null){/*说明本进程中有别的线程正在持有分布式锁*/
            return false;
        }
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            /*每一个锁的持有人都分配一个唯一的id,也可采用snowflake算法*/
            String id = UUID.randomUUID().toString();

            SetParams params = new SetParams();
            params.px(LOCK_TIME); //加锁时间1s
            params.nx();
            synchronized (this){
                if ((ownerThread==null)&&
                        "OK".equals(jedis.set(RS_DISTLOCK_NS+lockName,id,params))) {
                    lockerId.set(id);
                    setOwnerThread(t);
                    if(expireThread == null){//看门狗线程启动
                        expireThread = new Thread(new ExpireTask(),"expireThread");
                        expireThread.setDaemon(true);
                        expireThread.start();
                    }
                    //往延迟阻塞队列中加入元素(让看门口可以在过期之前一点点的时间去做锁的续期)
                    delayDog.add(new ItemVo<>((int)LOCK_TIME,new LockItem(lockName,id)));
                    System.out.println(Thread.currentThread().getName()+"已获得锁----");
                    return true;
                }else{
                    System.out.println(Thread.currentThread().getName()+"无法获得锁----");
                    return false;
                }
            }
        } catch (Exception e) {
            throw new RuntimeException("分布式锁尝试加锁失败!",e);
        } finally {
            jedis.close();
        }
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        throw new UnsupportedOperationException("不支持等待尝试获取锁!");
    }

    @Override
    public void unlock() {
        if(ownerThread!=Thread.currentThread()) {
            throw new RuntimeException("试图释放无所有权的锁!");
        }
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            Long result = (Long)jedis.eval(RELEASE_LOCK_LUA,
                    Arrays.asList(RS_DISTLOCK_NS+lockName),
                    Arrays.asList(lockerId.get()));
            System.out.println(result);
            if(result.longValue()!=0L){
                System.out.println("Redis上的锁已释放!");
            }else{
                System.out.println("Redis上的锁释放失败!");
            }
        } catch (Exception e) {
            throw new RuntimeException("释放锁失败!",e);
        } finally {
            if(jedis!=null) jedis.close();
            lockerId.remove();
            setOwnerThread(null);
        }
    }

    @Override
    public Condition newCondition() {
        throw new UnsupportedOperationException("不支持等待通知操作!");
    }

    /*看门狗线程*/
    private Thread expireThread;
    //通过delayDog 避免无谓的轮询,减少看门狗线程的轮序次数   阻塞延迟队列   刷1  没有刷2
    private static DelayQueue<ItemVo<LockItem>> delayDog = new DelayQueue<>();
    //续锁逻辑:判断是持有锁的线程才能续锁
    private final static String DELAY_LOCK_LUA =
            "if redis.call('get',KEYS[1])==ARGV[1] then\n" +
                    "        return redis.call('pexpire', KEYS[1],ARGV[2])\n" +
                    "    else return 0 end";
    private class ExpireTask implements Runnable{

        @Override
        public void run() {
            System.out.println("看门狗线程已启动......");
            while(!Thread.currentThread().isInterrupted()) {
                try {
                    LockItem lockItem = delayDog.take().getData();//只有元素快到期了才能take到  0.9s
                    Jedis jedis = null;
                    try {
                        jedis = jedisPool.getResource();
                        Long result = (Long)jedis.eval(DELAY_LOCK_LUA,
                                Arrays.asList(RS_DISTLOCK_NS+lockItem.getKey ()),
                                Arrays.asList(lockItem.getValue(),LOCK_TIME_STR));
                        if(result.longValue()==0L){
                            System.out.println("Redis上的锁已释放,无需续期!");
                        }else{
                            delayDog.add(new ItemVo<>((int)LOCK_TIME,//1000ms
                                    new LockItem(lockItem.getKey(),lockItem.getValue())));
                            System.out.println("Redis上的锁已续期:"+LOCK_TIME);
                        }
                    } catch (Exception e) {
                        throw new RuntimeException("锁续期失败!",e);
                    } finally {
                        if(jedis!=null) jedis.close();
                    }
                } catch (InterruptedException e) {
                    System.out.println("看门狗线程被中断");
                    break;
                }
            }
            System.out.println("看门狗线程准备关闭......");
        }
    }

//    @PostConstruct
//    public void initExpireThread(){
//
//    }

    @PreDestroy
    public void closeExpireThread(){
        if(null!=expireThread){
            expireThread.interrupt();
        }
    }
}

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

redis分布式锁 的相关文章

随机推荐

  • 【FPGA】UART串口通信——奇偶校验实现

    文章目录 一 奇偶校验位二 设计思路三 仿真测试 一 奇偶校验位 奇偶校验位是基于uart的数据上进行一个判断 奇校验 xff1a 数据1个数为奇时 xff0c 校验为0 xff0c 反之为1 偶校验 xff1a 数据0个数为偶时 xff0
  • Jmeter性能测试(21)--jmeter常用插件介绍

    jmeter作为一个开源的接口性能测试工具 xff0c 其本身的小巧和灵活性给了测试人员很大的帮助 xff0c 但其本身作为一个开源工具 xff0c 相比于一些商业工具 xff08 比如LoadRunner xff09 xff0c 在功能的
  • reStructuredText 初学者语法汇总

    文档格式编辑 xff0c 目前主流最强大的是LaTeX xff0c 但是语法太复杂 xff0c 环境要求也多 xff0c 有的时候也是写文档往往选择 Markdown 常常怀疑文档编辑的Markdown不是亲生的 xff0c 很多功能不全
  • C++如何写自定义的头文件

    C 43 43 是一种非常热门的面向对象语言 xff0c 受到很多人的欢迎 在C语言的基础上 xff0c C 43 43 进行了很多的改进 xff0c 并引入了许多新的头文件 xff0c 方便我们使用 比如要进行字符串操作就引入string
  • 虚拟机Vmware安装Ubuntu系统

    vm虚拟机下载安装 安装位置可以任意选择 xff0c 但是路径中不要出现中文字符 两个选项都取消掉 点击许可证 激活码就是许可证选择一个复制即可 Ubuntu系统下载安装 Ubuntu系统可直接前往其官网进行下载 buntu 22 04 L
  • 深入理解JVM虚拟机

    文章目录 深入理解JVM虚拟机JDK1 8新特性 xff1a JVM架构图 xff1a 类装载器知识点 xff1a 类装载器 xff1a 双亲委派机制 xff1a 沙箱安全机制 xff1a Execution Engine执行引擎负责解释命
  • Linux基础指令详解

    目录 前言 Linux基本指令 1 ls指令 1 1 ls 1 2 ls l 1 3 ls a 1 4 ls d 1 5 绝对路径和相对路径 2 pwd指令 3 cd指令 4 touch指令 5 mkdir指令 6 rmdir指令和rm指令
  • aruco识别,python实现

    需要配置anaconda xff0c 用spyder进行python语言编辑 xff0c 实现对aruco码的编写 代码比较垃圾 xff0c 不喜勿喷 配置过程如下 xff1a 视觉系统的运行需要搭建视觉环境 xff0c 包括 xff0c
  • 关于使用HTTP上传文件到hdfs文件系统

    String tenantName 61 System getenv 34 OPENPALETTE NAMESPACE 34 String dataIntegrationServiceName 61 34 dataintegration m
  • 详解 FTP、FTPS 与 SFTP 的原理

    FTP FTPS 与 SFTP 简介 FTP FTP 即 文件传输协议 xff08 英语 xff1a File Transfer Protocol 的缩写 xff09 是一个用于计算机网络上在客户端和服务器之间进行文件传输的应用层协议 完整
  • 用函数调用,把在一个函数的内部改变另一个函数的变量(这个变量相对于第一个函数就是一个外部变量)

    include lt stdio h gt void add int p 指针是整数 xff0c 所以其类型是int整型 这里 p外部变量num的地址 xff0c 即 p 61 num p 43 43 指针变量在单独使用时记得要加括号表示一
  • keil5 C51版本安装及MDK5合并,搭建STM32开发环境(详细教程)

    keil5安装及MDK5合并 资源说明 已将文章中涉及到的所有软件安装包及注册机2032版都放置到百度网盘 xff0c 链接 xff1a 百度云盘链接 提取码 xff1a 0109 1 C51安装 首先在keil官网里下载软件安装包 xff
  • Jmeter性能测试(22)--内存溢出原因及解决方法

    jmeter是一个java开发的开源性能测试工具 xff0c 在性能测试中可支持模拟并发压测 xff0c 但有时候当模拟并发请求较大或者脚本运行时间较长时 xff0c 压力机会出现卡顿甚至报异常 内存溢出 xff0c 这里就介绍下如何解决内
  • Ubuntu16.04的安装教程

    Ubuntu16 04的安装 这里我们会介绍Ubuntu16 04的史诗级保姆教程 开始了 xff0c 车速有点快 xff0c 系好安全带 xff0c 发车了 xff01 1 打开浏览器 xff0c 找到Ubuntu的官网 2 单击 系统桌
  • 电脑触摸板无法使用,I2C HID设备异常处理。

    本人电脑 戴尔 Vostro3400 xff0c win10系统 xff0c 触摸板突然失灵 xff0c 客服让我更新BIOS驱动 xff0c 关闭电源选项的快速启动等等 好了一阵 xff0c 再一重启又失灵 经过无限次百度 xff0c 终
  • 自我简介,对软件工程课程的希望及个人目标

    我是一名桂林理工大学信息科学与工程学院软件工程本科大二年纪的学生 xff0c 热爱学习 xff0c 努力上进 xff0c 对未来充满希望 xff0c 很高兴能学习软件工程这门课程 我对这门课程的希望 xff1a 1 我希望通过学习软件工程这
  • 用HTML、CSS写一个酷炫的动态搜索框

    用HTML CSS写一个酷炫的动态搜索框 可伸展的动态搜索框 xff01 复制粘贴即可用 xff01 HTML部分 xff1a span class token doctype lt DOCTYPE html gt span span cl
  • 使用 curl/git 命令时出现 Failed to connect to XXX port 443: 拒绝连接

    文章目录 原因与过程解决办法 原因与过程 今天在linux下安装docker compose出现Failed connect to github com 443 拒绝连接 网上查了下说是DNS被污染 xff0c 改下host文件 解决办法
  • 【C++】30h速成C++从入门到精通(STL介绍、string类)

    STL简介 什么是STL STL standard template libaray 标准模板库 xff1a 是C 43 43 标准库的重要组成部分 xff0c 不仅是一个可复用的组件库 xff0c 而且是一个包罗数据结构与算法的软件框架
  • redis分布式锁

    1 Redis分布式锁最简单的实现 想要实现分布式锁 xff0c 必须要求 Redis 有 互斥 的能力 xff0c 我们可以使用 SETNX 命令 xff0c 这个命令表示SET if Not Exists xff0c 即如果 key 不