zk系列三:zookeeper实战之分布式锁实现

一、分布式锁的通用实现思路分布式锁的概念以及常规解决方案可以参考之前的博客:聊聊分布式锁的解决方案;今天我们先分析下分布式锁的实现思路;

  • 首先,需要保证唯一性 , 即某一时点只能有一个线程访问某一资源;比方说待办短信通知功能 , 每天早上九点短信提醒所有工单的处理人处理工单,假设服务部署了20个容器,那么早上九点的时候会有20个线程启动准备发送短信,此时我们只能让一个线程执行短信发送 , 否则用户会收到20条相同的短信;
  • 其次,需要考虑下何时应该释放锁?这又分三种情况,一是拿到锁的线程正常结束 , 另一种是获取锁的线程异常退出,还有种是获取锁的线程一直阻塞;第一种情况直接释放即可,第二种情况可以通过定义下锁的过期时间然后通过定时任务去释放锁;zk的话直接通过临时节点即可;最后一种阻塞的情况也可以通过定时任务来释放,但是需要根据业务来综合判断 , 如果业务本身就是长时间耗时的操作那么锁的过期时间就得设置的久一点
  • 最后 , 当拿到锁的线程释放锁的时候,如何通知其他线程可以抢锁了呢这里简单介绍两种解决方案,一种是所有需要锁的线程主动轮询,固定时间去访问下看锁是否释放,但是这种方案无端增加服务器压力并且时效性无法保证;另一种就是zk的watch , 监听锁所在的目录 , 一有变化立马得到通知
二、ZK实现分布式锁的思路
  • zk通过每个线程在同一父目录下创建临时有序节点,然后通过比较节点的id大小来实现分布式锁功能;再通过zk的watch机制实时获取节点的状态,如果被删除立即重新争抢锁;具体流程见线图:
    zk系列三:zookeeper实战之分布式锁实现

    文章插图
提示:需要关注下图里判断自身不是最小节点时的监听情况,为什么不监听父节点?原因图里已有描述,这里就不再赘述
三、ZK实现分布式锁的编码实现1、核心工具类实现通过不断的调试 , 我封装了一个ZkLockHelper类,里面封装了上锁和释放锁的方法 , 为了方便我将zk的一些监听和回调机智也融合到一起了,并没有抽出来,下面贴上该类的全部代码
package com.darling.service.zookeeper.lock;import lombok.Data;import lombok.extern.slf4j.Slf4j;import org.apache.zookeeper.*;import org.apache.zookeeper.data.Stat;import org.junit.platform.commons.util.StringUtils;import java.util.Collections;import java.util.List;import java.util.Objects;import java.util.concurrent.CountDownLatch;/** * @description: * @author: dll * @date: Created in 2022/11/4 8:41 * @version: * @modified By: */@Data@Slf4jpublic class ZkLockHelper implements AsyncCallback.StringCallback, AsyncCallback.StatCallback,Watcher, AsyncCallback.ChildrenCallback {private final String lockPath = "/lockItem";ZooKeeper zkClient;String threadName;CountDownLatch cd = new CountDownLatch(1);private String pathName;/*** 上锁*/public void tryLock() {try {log.info("线程:{}正在创建节点",threadName);zkClient.create(lockPath,(threadName).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,this,"AAA");log.info("线程:{}正在阻塞......",threadName);// 由于上面是异步创建所以这里需要阻塞住当前线程cd.await();} catch (InterruptedException e) {e.printStackTrace();}}/*** 释放锁*/public void unLock() {try {zkClient.delete(pathName,-1);System.out.println(threadName + " 工作结束....");} catch (Exception e) {e.printStackTrace();}}/*** create方法的回调,创建成功后在此处获取/DCSLock的子目录,比较节点ID是否最小 , 是则拿到锁 。。。* @param rc状态码* @param pathcreate方法的path入参* @param ctxcreate方法的上下文入参* @param name创建成功的临时有序节点的名称 , 即在path的后面加上了zk维护的自增ID;*注意如果创建的不是有序节点,那么此处的name和path的内容一致*/@Overridepublic void processResult(int rc, String path, Object ctx, String name) {log.info(">>>>>>>>>>>>>>>>>processResult,rx:{},path:{},ctx:{},name:{}",rc,path,ctx.toString(),name);if (StringUtils.isNotBlank(name)) {try {pathName =name ;// 此处path需注意要写/zkClient.getChildren("/", false,this,"123");//List<String> children = zkClient.getChildren("/", false);//log.info(">>>>>threadName:{},children:{}",threadName,children);//// 给children排序//Collections.sort(children);//int i = children.indexOf(pathName.substring(1));//// 判断自身是否第一个//if (Objects.equals(i,0)) {//// 是第一个则表示抢到了锁//log.info("线程{}抢到了锁",threadName);//cd.countDown();//}else {//// 表示没抢到锁//log.info("线程{}抢锁失败 , 重新注册监听器",threadName);//zkClient.exists("/"+children.get(i-1),this,this,"AAA");//}} catch (Exception e) {e.printStackTrace();}}}/*** exists方法的回调,此处暂不做处理* @param rc* @param path* @param ctx* @param stat*/@Overridepublic void processResult(int rc, String path, Object ctx, Stat stat) {}/*** exists的watch监听* @param event*/@Overridepublic void process(WatchedEvent event) {//如果第一个线程锁释放了,等价于第一个线程删除了节点,此时只有第二个线程会监控的到switch (event.getType()) {case None:break;case NodeCreated:break;case NodeDeleted:zkClient.getChildren("/", false,this,"123");//// 此处path需注意要写"/"//List<String> children = null;//try {//children = zkClient.getChildren("/", false);//} catch (KeeperException e) {//e.printStackTrace();//} catch (InterruptedException e) {//e.printStackTrace();//}//log.info(">>>>>threadName:{},children:{}",threadName,children);//// 给children排序//Collections.sort(children);//int i = children.indexOf(pathName.substring(1));//// 判断自身是否第一个//if (Objects.equals(i,0)) {//// 是第一个则表示抢到了锁//log.info("线程{}抢到了锁",threadName);//cd.countDown();//}else {///**//*表示没抢到锁;需要判断前置节点存不存在,其实这里并不是特别关心前置节点存不存在,所以其回调可以不处理;//*但是这里关注的前置节点的监听,当前置节点监听到被删除时就是其他线程抢锁之时//*///zkClient.exists("/"+children.get(i-1),this,this,"AAA");//}break;case NodeDataChanged:break;case NodeChildrenChanged:break;}}/*** getChildren方法的回调* @param rc* @param path* @param ctx* @param children*/@Overridepublic void processResult(int rc, String path, Object ctx, List<String> children) {try {log.info(">>>>>threadName:{},children:{}", threadName, children);if (Objects.isNull(children)) {return;}// 给children排序Collections.sort(children);int i = children.indexOf(pathName.substring(1));// 判断自身是否第一个if (Objects.equals(i, 0)) {// 是第一个则表示抢到了锁log.info("线程{}抢到了锁", threadName);cd.countDown();} else {// 表示没抢到锁log.info("线程{}抢锁失败,重新注册监听器", threadName);/***表示没抢到锁;需要判断前置节点存不存在,其实这里并不是特别关心前置节点存不存在,所以其回调可以不处理;*但是这里关注的前置节点的监听,当前置节点监听到被删除时就是其他线程抢锁之时*/zkClient.exists("/" + children.get(i - 1), this, this, "AAA");}} catch (Exception e) {e.printStackTrace();}}}

推荐阅读