Redis高并发分布式锁详解( 二 )

会发现补一个超时时间的话依旧无法避免之前的问题,故加锁和设置超时时间需要保持原子性 。
3)采用原子操作:Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS);基于设置了超时时间,那么我们如何考量超时时间呢,业务执行多久我们根本不可得知 。故容易出现时间到期了,业务还没执行完 。这就容易出现A持有锁执行任务 , 还没完成就超时了 , B持有锁执行任务,A执行完 , 释放锁【此时会释放B的锁】的情况 。所以释放锁必须要持有锁本人才能执行 。
if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) {stringRedisTemplate.delete(lockKey);}所以clientId需要是分布式ID , 然后释放锁改为判断clientId符合才能去释放 。
3.改进之后的情况:
代码示例
public String deductStock() {String lockKey = "lock:product_101";String clientId = UUID.randomUUID().toString();Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS);if (!result) {return "error_code";}try {int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); if (stock > 0) { Long realStock = (Long) stringRedisTemplate.opsForValue().decrement("stock");System.out.println("扣减成功 , 剩余库存:" + realStock); } else { System.out.println("扣减失败,库存不足"); } } finally { if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) { stringRedisTemplate.delete(lockKey); } } return "end"; }发现说明
1)即时加了判断,我们会发现依旧会存在问题【因为判断与释放锁操作不是原子性的】,如果在判断里面加上休眠进行试验
if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) {Thread.sleep(20000);stringRedisTemplate.delete(lockKey);}我们会发现根本问题依旧没有解决,只是减少了发生的情况 。究其原因 , 本质上还是锁超时导致的 。解决这个问题就要引入一个完美的解决方案叫做锁续命 。
2)锁续命(watchDog):假设主线程抢到锁开始执行业务逻辑,开启一个分线程,在分线程里边做一个定时任务,比如说设置的锁超时时间是30s,那么我们的定时任务时间就设置为10s,定时任务设置的时间一定要比锁超时时间?。?每10s定时任务先去判断主线程有没有结束,没有结束的话说明主线程就还在,还在进行业务逻辑操作,这个时候我们执行一条expire命令,将主线程锁的超时时间重新设置为30s,这样的话只要主线程还没结束 , 主线程就会被分线程定时任务去做续命逻辑,维持在30s,判断主线程结束 , 就不再执行续命逻辑 。
Redisson分布式锁框架剖析1.引入依赖
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.6.5</version></dependency>2.进行配置
@Beanpublic Redisson redisson() {// 此为单机模式Config config = new Config();config.useSingleServer().setAddress("redis://localhost:6379").setDatabase(0);return (Redisson) Redisson.create(config);}3.业务代码展示
public String deductStock2() {String lockKey = "lock:product_101";//获取锁对象RLock redissonLock = redisson.getLock(lockKey);//加分布式锁redissonLock.lock();try {Long stock = (Long) stringRedisTemplate.opsForValue().decrement("stock");if (stock > 0) {Long realStock = stock - 1;stringRedisTemplate.opsForValue().set("stock", realStock + "");System.out.println("扣减成功,剩余库存:" + realStock);} else {System.out.println("扣减失败,库存不足");}} finally {//解锁redissonLock.unlock();}return "end";}发现说明
1.如果在集群架构下面,分布式锁如果在Master节点上写成功了就会返回给客户端 , 但是此时还需要同步给从节点 。
2.如果在此时间内Master节点结点宕机,那么数据将会消失,而从节点上没有锁的信息(变为Master节点) 。【主从架构锁失效问题】
4.为解决主从架构锁失效问题引入的RedLock(不建议用 , 因为本质上还是没有解决主从架构锁失效问题)
0.原理展示
 

Redis高并发分布式锁详解

文章插图
1.redssion 集群配置(在resource下创建 redssion.yml文件)
clusterServersConfig:# 连接空闲超时,单位:毫秒 默认10000idleConnectionTimeout: 10000pingTimeout: 1000# 同任何节点建立连接时的等待超时 。时间单位是毫秒 默认10000connectTimeout: 10000# 等待节点回复命令的时间 。该时间从命令发送成功时开始计时 。默认3000timeout: 3000# 命令失败重试次数retryAttempts: 3# 命令重试发送时间间隔,单位:毫秒retryInterval: 1500# 重新连接时间间隔,单位:毫秒reconnectionTimeout: 3000# 执行失败最大次数failedAttempts: 3# 密码password: test1234# 单个连接最大订阅数量subscriptionsPerConnection: 5clientName: null# loadBalancer 负载均衡算法类的选择loadBalancer: !<org.redisson.connection.balancer.RoundRobinLoadBalancer> {}#从节点发布和订阅连接的最小空闲连接数slaveSubscriptionConnectionMinimumIdleSize: 1#从节点发布和订阅连接池大小 默认值50slaveSubscriptionConnectionPoolSize: 50# 从节点最小空闲连接数 默认值32slaveConnectionMinimumIdleSize: 32# 从节点连接池大小 默认64slaveConnectionPoolSize: 64# 主节点最小空闲连接数 默认32masterConnectionMinimumIdleSize: 32# 主节点连接池大小 默认64masterConnectionPoolSize: 64# 订阅操作的负载均衡模式subscriptionMode: SLAVE# 只在从服务器读取readMode: SLAVE# 集群地址nodeAddresses:- "redis://IP地址:30001"//- "redis://IP地址:30002"- "redis://IP地址:30003"- "redis://IP地址:30004"- "redis://IP地址:30005"- "redis://IP地址:30006"# 对Redis集群节点状态扫描的时间间隔 。单位是毫秒 。默认1000scanInterval: 1000#这个线程池数量被所有RTopic对象监听器 , RRemoteService调用者和RExecutorService任务共同共享 。默认2threads: 0#这个线程池数量是在一个Redisson实例内,被其创建的所有分布式数据类型和服务 , 以及底层客户端所一同共享的线程池里保存的线程数量 。默认2nettyThreads: 0# 编码方式 默认org.redisson.codec.JsonJacksonCodeccodec: !<org.redisson.codec.JsonJacksonCodec> {}#传输模式transportMode: NIO# 分布式锁自动过期时间,防止死锁 , 默认30000lockWatchdogTimeout: 30000# 通过该参数来修改是否按订阅发布消息的接收顺序出来消息,如果选否将对消息实行并行处理,该参数只适用于订阅发布消息的情况, 默认truekeepPubSubOrder: true# 用来指定高性能引擎的行为 。由于该变量值的选用与使用场景息息相关(NORMAL除外)我们建议对每个参数值都进行尝试 。

推荐阅读