先看一下整体逻辑:
- 尝试加锁 , 成功直接返回true
- 判断超时
- 订阅
- 判断超时
- 循环 (尝试获取锁 → 判断超时 → 阻塞等待 )
tryLock
方法看着很长,但是有很多代码都是重复的,本小节重点说一下尝试加锁的方法tryAcquire
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));}private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {RFuture<Long> ttlRemainingFuture;if (leaseTime > 0) {// 调用lua脚本 , 尝试加锁ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {// 这里的if、else的区别就在于,如果没有设置leaseTime,就使用默认的internalLockLeaseTime(默认30秒)ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);}CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {// lock acquired// 如果ttlRemaining为空 , 也就是tryLockInnerAsync方法中的lua执行结果返回空 , 证明获取锁成功if (ttlRemaining == null) {if (leaseTime > 0) {internalLockLeaseTime = unit.toMillis(leaseTime);} else {// 如果没有设置锁的持有时间(leaseTime),则启动看门狗,定时给锁续期 , 防止业务逻辑未执行完成锁就过期了scheduleExpirationRenewal(threadId);}}return ttlRemaining;});return new CompletableFutureWrapper<>(f);}
在tryAcquireAsync
方法中 , 主要分为两段逻辑:- 调用lua脚本加锁:tryLockInnerAsync
- 看门狗:scheduleExpirationRenewal
// RedissonLock#tryLockInnerAsync<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));}
Redisson使用了 Hash 结构来表示一个锁,这样 Hash 里面的 key 为线程id,value 为锁的次数 。这样巧妙地解决了可重入锁的问题 。下面我们来分析下这段 lua 脚本的逻辑(下面说的threadId都是指变量,不是说key就叫’threadId’):
- 如果锁(hash结构)不存在,则创建 , 并添加一个键值对 (threadId : 1),并设置锁的过期时间
- 如果锁存在,则将键值对 threadId 对应的值 + 1 , 并设置锁的过期时间
- 如果不如何1,2点,则返回锁的剩余过期时间
RedissonLock#tryLock
中 , 当经过一些尝试获取锁,超时判断之后,代码来到while循环中 。这个while循环是个死循环,只有成功获取锁或者超时,才会退出 。一般死循环的设计中,都会有阻塞等待的代码,否则如果循环中的逻辑短时间拿不到结果 , 会造成资源抢占和浪费 。阻塞代码就是下面这段if (ttl >= 0 && ttl < time) {commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);}
commandExecutor.getNow(subscribeFuture).getLatch() 得到的是一个Semaphore
信号量对象,这是jdk的内置对象,Semaphore#tryAcquire
表示阻塞并等待唤醒 。那么信号量什么时候被唤醒呢?在订阅方法中RedissonLock#subscribe
。订阅方法的逻辑也不少 , 咱们直接讲其最终调用的处理方法// LockPubSub#onMessageprotected void onMessage(RedissonLockEntry value, Long message) {// 普通的解锁走的是这个if (message.equals(UNLOCK_MESSAGE)) {Runnable runnableToExecute = value.getListeners().poll();if (runnableToExecute != null) {runnableToExecute.run();}// 这里就是唤醒信号量的地方value.getLatch().release();// 这个是读写锁用的} else if (message.equals(READ_UNLOCK_MESSAGE)) {while (true) {Runnable runnableToExecute = value.getListeners().poll();if (runnableToExecute == null) {break;}runnableToExecute.run();}value.getLatch().release(value.getLatch().getQueueLength());}}
推荐阅读
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- 源码级深度理解 Java SPI
- Dubbo-聊聊通信模块设计
- 【lwip】10-ICMP协议&源码分析
- 【lwip】09-IPv4协议&超全源码实现分析
- 京东云开发者|经典同态加密算法Paillier解读 - 原理、实现和应用
- 从BeanFactory源码看Bean的生命周期
- 详解AQS中的condition源码原理
- 实例解读丨关于GaussDB ETCD服务异常
- 【lwip】08-ARP协议一图笔记及源码实现
- 【lwip】07-链路层收发以太网数据帧源码分析