公平锁总共用了Redis的三种数据类型,对应着 lua 脚本里面的keys1、2、3的参数:
- KEYS[1]
锁的名字,使用 Hash 数据类型,是可重入锁的基础,结构为 {”threadId1”: 1, “thread2”: 1} , key为线程id,value是锁的次数
- KEYS[2]
线程队列的名字,使用 List 数据类型,结构为 [ “threadId1”, “threadId2” ],按顺序存放需要获取锁的线程的id
- KEYS[3]
【Redisson源码解读-公平锁】时间队列的名字,使用 sorted set 数据类型,结构为 {”threadId2”:123, “threadId1”:190},key为线程id,value为获取锁的超时时间戳
同样的,介绍下参数:
- ARGV[1]:leaseTime 锁的持有时间
- ARGV[2]:线程id(描述不太准确,暂时按这样理解)
- ARGV[3]:waitTime 尝试获取锁的最大等待时间
- ARGV[4]:currentTime 当前时间戳
"while true do " +// list为空,证明没有人排队,退出循环"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +"if firstThreadId2 == false then " +"break;" +"end;" +// 能到这里 , 证明有人排队,拿出在排队的第一个人的超时时间,如果超时了,则移除相应数据"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +"if timeout <= tonumber(ARGV[4]) then " +// 从时间队列和线程队列中移除"redis.call('zrem', KEYS[3], firstThreadId2);" +"redis.call('lpop', KEYS[2]);" +"else " +"break;" +"end;" +"end;" +
具体的逻辑我在注释中写的很清楚了,看的时候记住 KEYS[2]、KEYS[3] 对应着线程队列和时间队列接口 。主要注意的是,线程队列只有当一个线程持有锁 , 另一个线程获取不到锁时,才会有值(前面有人才排队,没人排什么队) 。接着看第二段// 检查是否可以获取锁 。当锁不存在,并且线程队列不存在或者线程队列第一位是当前线程,则可以获取锁"if (redis.call('exists', KEYS[1]) == 0) " +"and ((redis.call('exists', KEYS[2]) == 0) or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +// remove this thread from the queue and timeout set// 都获取锁了,当然要从线程队列和时间队列中移除"redis.call('lpop', KEYS[2]);" +"redis.call('zrem', KEYS[3], ARGV[2]);" +// decrease timeouts for all waiting in the queue// 刷新时间队列中的时间"local keys = redis.call('zrange', KEYS[3], 0, -1);" +"for i = 1, #keys, 1 do " +"redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" +"end;" +// acquire the lock and set the TTL for the lease// 和公平锁的设置一样,值加1并且设置过期时间"redis.call('hset', KEYS[1], ARGV[2], 1);" +"redis.call('pexpire', KEYS[1], ARGV[1]);" +"return nil;" +"end;" +
翻译翻译就是,锁不存在(别人没有持有锁)并且线程队列不存在或者线程队列第一位是当前线程(不用排队或者自己排第一)才能获得锁 。因为时间队列中存放的是各个线程等待锁的超时时间戳,所以每次都需要刷新下 。继续下一段逻辑// 能到这里,证明前面拿不到锁,但是也要做可重入锁的处理"if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " +"redis.call('hincrby', KEYS[1], ARGV[2],1);" +"redis.call('pexpire', KEYS[1], ARGV[1]);" +"return nil;" +"end;" +
这是可重入锁的处理,继续下一段// 时间队列中有值,证明线程已经在队列中 , 不需要往后执行逻辑了"local timeout = redis.call('zscore', KEYS[3], ARGV[2]);" +"if timeout ~= false then " +// the real timeout is the timeout of the prior thread// in the queue, but this is approximately correct, and// avoids having to traverse the queue// 因为下面的timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4])// 所以这里的ttl = timeout - tonumber(ARGV[3]) - tonumber(ARGV[4])"return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);" +"end;" +
举例子:线程1持有锁,线程2尝试第一次获取锁(不进入这段if),线程2第二次获取锁(进入了这段if) 。继续下一段"local lastThreadId = redis.call('lindex', KEYS[2], -1);" +"local ttl;" +// 如果最后一个线程不是当前线程,则从时间集合取出(举例:线程1/2/3按顺序获取锁,此时pttl得到的是线程1的锁过期时间,zscore拿到的是线程2的锁的过期时间,此时线程3应该以线程2的为准)"if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +"ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +"else " +// 否则直接获取锁的存活时间"ttl = redis.call('pttl', KEYS[1]);" +"end;" +// 过期时间 = 锁存活时间 + 等待时间 + 当前时间戳"local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +// 如果添加到时间集合成功 , 则同时添加线程集合"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +"redis.call('rpush', KEYS[2], ARGV[2]);" +"end;" +"return ttl;",
推荐阅读
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- OpenHarmony移植案例: build lite源码分析之hb命令__entry__.py
- 【深入浅出 Yarn 架构与实现】1-2 搭建 Hadoop 源码阅读环境
- Redisson源码解读-分布式锁
- 源码级深度理解 Java SPI
- Dubbo-聊聊通信模块设计
- 【lwip】10-ICMP协议&源码分析
- 【lwip】09-IPv4协议&超全源码实现分析
- 京东云开发者|经典同态加密算法Paillier解读 - 原理、实现和应用
- 从BeanFactory源码看Bean的生命周期
- 详解AQS中的condition源码原理