Redisson源码解读-分布式锁

前言Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid) 。Redisson有一样功能是可重入的分布式锁 。本文来讨论一下这个功能的特点以及源码分析 。
前置知识在讲Redisson,咱们先来聊聊分布式锁的特点以及Redis的发布/订阅机制,磨刀不误砍柴工 。
分布式锁的思考首先思考下,如果我们自己去实现一个分布式锁,这个锁需要具备哪些功能?

  1. 互斥(这是一个锁最基本的功能)
  2. 锁失效机制(也就是可以设置锁定时长,防止死锁)
  3. 高性能、高可用
  4. 阻塞、非阻塞
  5. 可重入、公平锁
  6. 。。。
可见,实现一个分布式锁,需要考虑的东西有很多 。那么 , 如果用Redis来实现分布式锁呢?如果只需要具备上面说的1、2点功能,要怎么写?(ps:我就不写了,自己想去)
Redis订阅/发布机制Redisson中用到了Redis的订阅/发布机制,下面简单介绍下 。
简单来说就是如果client2 、 client5 和 client1 订阅了 channel1,当有消息发布到 channel1 的时候 , client2 、 client5 和 client1 都会收到这个消息 。
Redisson源码解读-分布式锁

文章插图
图片来自 菜鸟教程-Redis发布订阅
Redisson源码版本:3.17.7
下面以Redisson官方的可重入同步锁例子为入口,解读下源码 。
RLock lock = redisson.getLock("anyLock");// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);if (res) {try {...} finally {lock.unlock();}}加锁我用时序图来表示加锁和订阅的过程 。时序图中括号后面的c1、c2代表client1,client2
Redisson源码解读-分布式锁

文章插图
当线程2获取了锁但还没释放锁时,如果线程1去获取锁,会阻塞等待 , 直到线程2解锁,通过Redis的发布订阅机制唤醒线程1,再次去获取锁 。
加锁方法是 lock.tryLock(100, 10, TimeUnit.SECONDS),对应着就是RedissonLock#tryLock
/** * 获取锁 * @param waitTime尝试获取锁的最大等待时间,超过这个值,则认为获取锁失败 * @param leaseTime 锁的持有时间,超过这个时间锁会自动失效(值应设置为大于业务处理的时间 , 确保在锁有效期内业务能处理完) * @param unit 时间单位 * @return 获取锁成功返回true , 失败返回false */@Overridepublic boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {long time = unit.toMillis(waitTime);long current = System.currentTimeMillis();// 当前时间long threadId = Thread.currentThread().getId();// 当前线程id// 尝试加锁,加锁成功返回null,失败返回锁的剩余超时时间Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);// 获取锁成功if (ttl == null) {return true;}// time小于0代表此时已经超过获取锁的等待时间,直接返回falsetime -= System.currentTimeMillis() - current;if (time <= 0) {// 没看懂这个方法,里面里面空运行,有知道的大神还请不吝赐教acquireFailed(waitTime, unit, threadId);return false;}current = System.currentTimeMillis();CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);try {subscribeFuture.get(time, TimeUnit.MILLISECONDS);} catch (TimeoutException e) {if (!subscribeFuture.cancel(false)) {subscribeFuture.whenComplete((res, ex) -> {// 出现异常 , 取消订阅if (ex == null) {unsubscribe(res, threadId);}});}acquireFailed(waitTime, unit, threadId);return false;} catch (ExecutionException e) {acquireFailed(waitTime, unit, threadId);return false;}try {// 判断是否超时(超过了waitTime)time -= System.currentTimeMillis() - current;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}while (true) {// 再次获取锁,成功则返回long currentTime = System.currentTimeMillis();ttl = tryAcquire(waitTime, leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return true;}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}// 阻塞等待信号量唤醒或者超时,接收到订阅时唤醒// 使用的是Semaphore#tryAcquire()currentTime = System.currentTimeMillis();if (ttl >= 0 && ttl < time) {commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}}} finally {// 因为是同步操作,所以无论加锁成功或失败 , 都取消订阅unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);}//return get(tryLockAsync(waitTime, leaseTime, unit));}

推荐阅读