Zookeeper实现分布式锁

分别使用 Zookeeper 的原生 API 和 Curator 框架实现分布式锁

Zookeeper实现分布式锁

什么是分布式锁?

分布式环境下多个进程实例同时对同一个资源进行操作,为了解决这个问题,提出分布式锁;

进程访问资源,先获取锁,拿到锁代表有了对资源操作的权限了,其他没有拿到锁的进程需要等待;

这个锁可以是 Redis、Zookeeper,甚至也可以是数据库;

分布式锁的特点:

  • 互斥性:任何时刻,对于同一条数据,只有一台应用可以获取到分布式锁
  • 高可用性:提供分布式锁的服务需要做到高可用,小部分机器宕机不能影响正常使用
  • 防止锁超时:如果客户端没有释放锁,服务器会在一段时间之后自动释放锁,防止客户端宕机或网络异常时产生死锁
  • 独占性:加解锁必须由同一台机器进行,也就是谁加的锁,谁来释放,不能出现自己加的锁,但是别人释放了的情况

实现思路

整体思路:利用创建 Zookeeper 的临时有序节点,多个客户端同时创建临时有序的节点,谁的序号小(谁先创建)谁就拿到锁,其他序号依次监控自己的前一名进行等待,等待前一名释放锁,才会轮到自己;

整体过程:

  1. 客户端准备获取分布式锁,连接 Zookeeper 服务端,向/locks路径下创建一个临时的序列节点
  2. 获取/locks下所有的节点,看看自己是不是序号最小的节点:
    • 是:成功获取到锁,返回
    • 否:获取锁失败,对自己的前一个节点进行监听,阻塞直到监控到前一个节点发生delete事件时,代表轮到自己了,自己获取到锁,返回
  3. delete释放锁

环境准备

  1. 搭建一个普通的 maven 工程,引入相关依赖:

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.5.7</version>
        </dependency>
        
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.0.0</version>
        </dependency>
        
        <!-- 还需要引入 org.apache.zookeeper 不过上面已经引入了 -->
        
    </dependencies>
  2. 在项目的 resources 文件夹下创建 log4j.properties

    log4j.rootLogger=INFO, stdout
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%d %p [%c]- %m%n
    log4j.appender.logfile=org.apache.log4j.FileAppender
    log4j.appender.logfile.File=target/spring.log
    log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
    log4j.appender.logfile.layout.ConversionPattern=%d %p [%c]- %m%n

使用 Zookeeper 原生 API 实现分布式锁

主要是三个方法:

  1. 构造方法:在构造方法里进行连接 Zookeeper 服务端,创建根节点,监听前一个节点事件
  2. 获取锁(加锁):就是上述的前两步骤,创建自己的专属锁路径,然后排序看看自己是不是第一个,是第一个代表成功加锁,然后返回,不是第一个就对前一个节点进行监控,然后阻塞等待直到前一个节点被释放掉
  3. 释放锁(解锁):就是delete掉自己创建的临时锁路径

具体代码:

package icu.sunnyc.zk.demo2;

import org.apache.zookeeper.*;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * zk 原生 API 实现分布式锁 demo
 * @author :hc
 * @date :Created in 2022/5/14 13:13
 * @modified :
 */
public class DistributedLock {

    private static final Logger logger = LoggerFactory.getLogger(DistributedLock.class);

    /**
     * 分布式锁根节点路径
     */
    private final String rootNodePath = "/locks";

    /**
     * 分布式锁子节点路径
     */
    private final String subNodePath = rootNodePath + "/seq-";

    /**
     * 用于监控是否与 zk 服务端完成建立连接
     */
    private final CountDownLatch connectLatch = new CountDownLatch(1);

    /**
     * 用于监控前一个节点是否释放锁
     */
    private final CountDownLatch waitLatch = new CountDownLatch(1);

    /**
     * 监控的前一个节点的 path
     */
    private String waitPath = null;

    /**
     * 自己的专属锁路径
     */
    private String myLockPath;

    /**
     * zk 客户端连接对象
     */
    private final ZooKeeper zkClient;

    public DistributedLock(String connectString, int sessionTimeout) throws IOException, InterruptedException,
            KeeperException {
        zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                // 连接建立时 connectLatch 减一,唤醒后面 await 的内容,可以获取根节点状态做其他操作了
                if (event.getState() == KeeperState.SyncConnected) {
                    connectLatch.countDown();
                    logger.info("{} 成功与服务器建立连接!", Thread.currentThread().getName());
                }
                // 发生了 waitPath 删除事件,也就是 watch 的前一个节点人家用完资源释放锁了
                if (event.getType() == EventType.NodeDeleted && event.getPath().equals(waitPath)) {
                    logger.info("线程 {} 监控到锁 {} 已经被释放啦", Thread.currentThread().getName(), waitPath);
                    waitLatch.countDown();
                }
            }
        });
        // 阻塞直到与服务端完成建立连接
        connectLatch.await();
        // 获取根节点状态信息
        Stat rootNodeStat = zkClient.exists(rootNodePath, false);
        // 根节点不存在,就创建一个根节点,供后续使用
        if (rootNodeStat == null) {
            logger.warn("根节点为空,正在创建根节点...");
            String rootPath = zkClient.create(rootNodePath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            logger.info("根节点创建完毕,根节点路径:{}", rootPath);
        }
    }

    /**
     * 获取分布式锁 (加锁)
     */
    public void getLock() {

        try {
            myLockPath = zkClient.create(subNodePath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            // 获取父节点下的所有子节点,看看自己是否是第一个
            List<String> subNodeList = zkClient.getChildren(rootNodePath, false);
            if (subNodeList.isEmpty()) {
                logger.error("未知异常,获取到子节点为空");
                return;
            }
            String myNodeName = myLockPath.split("/")[2];
            // 排序
            Collections.sort(subNodeList);
            // 第一个是自己,代表已经成功获取到锁
            if (myNodeName.equals(subNodeList.get(0))) {
                return;
            }
            // 第一个不是自己,没能成功获取锁,找到自己的前一名,然后 watch 它,直到它被释放,自己就顺利接手锁
            for (int i = 0; i < subNodeList.size(); i++) {
                if (myNodeName.equals(subNodeList.get(i))) {
                    // watch 前一个节点路径
                    waitPath = rootNodePath + "/" + subNodeList.get(i - 1);
                    // 在 waitPath 上注册监听器,当 waitPath 被删除时,zookeeper 会回调监听器的 process 方法
                    zkClient.getData(waitPath, true, new Stat());
                    logger.info("线程:{},当前节点:{},已监控前一个节点:{}", Thread.currentThread().getName(),
                            myNodeName, waitPath);
                }
            }
            // 上述已经 watch 了前一个锁了,此时只需要慢慢等待 waitPath 被释放就行
            waitLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } catch (KeeperException e) {
            e.printStackTrace();
        }

    }

    /**
     * 释放分布式锁 (解锁)
     */
    public void releaseLock() {
        // version -1 代表不管 version 多少,直接删除这个节点
        try {
            logger.info("线程:{} 已经释放锁 {}", Thread.currentThread().getName(), myLockPath);
            zkClient.delete(myLockPath, -1);
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }
}

分布式锁,并发获取锁测试:

package icu.sunnyc.zk.demo2;

import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 分布式锁 测试
 * @author :hc
 * @date :Created in 2022/5/14 14:13
 * @modified :
 */
public class DistributedLockTest {

    private static final Logger logger = LoggerFactory.getLogger(DistributedLockTest.class);

    public static void main(String[] args) throws InterruptedException {
        Random random = new Random();
        // 并发拿锁测试
        // 客户端数量
        int threadNumber = 5;
        CountDownLatch start = new CountDownLatch(1);
        CountDownLatch end = new CountDownLatch(threadNumber);
        ExecutorService threadPool = Executors.newFixedThreadPool(threadNumber);
        for (int i = 0; i < threadNumber; i++) {
            threadPool.submit(() -> {
               try {
                   // 阻塞住 让这个线程别跑
                   start.await();
                   // 获取锁
                   DistributedLock distributedLock = new DistributedLock("hadoop102:2181,hadoop103:2181,hadoop104:2181", 2000);
                   distributedLock.getLock();
                   // sleep 一个随机数 做业务处理
                   logger.info("{} 已经成功拿到锁,正在处理自己的事情", Thread.currentThread().getName());
                   Thread.sleep(random.nextInt(5000));
                   distributedLock.releaseLock();
               } catch (InterruptedException e) {
                   e.printStackTrace();
                   Thread.currentThread().interrupt();
               } catch (IOException | KeeperException e) {
                   e.printStackTrace();
               } finally {
                   end.countDown();
               }
            });
        }
        start.countDown();
        logger.info("程序已启动");
        end.await();
        threadPool.shutdown();
    }
}

测试结果:

2022-05-14 21:19:22,767 INFO [icu.sunnyc.zk.demo2.DistributedLock]- pool-1-thread-3-EventThread 成功与服务器建立连接!
2022-05-14 21:19:22,767 INFO [icu.sunnyc.zk.demo2.DistributedLock]- pool-1-thread-1-EventThread 成功与服务器建立连接!
2022-05-14 21:19:22,767 INFO [icu.sunnyc.zk.demo2.DistributedLock]- pool-1-thread-2-EventThread 成功与服务器建立连接!
2022-05-14 21:19:22,767 INFO [icu.sunnyc.zk.demo2.DistributedLock]- pool-1-thread-5-EventThread 成功与服务器建立连接!
2022-05-14 21:19:22,767 INFO [icu.sunnyc.zk.demo2.DistributedLock]- pool-1-thread-4-EventThread 成功与服务器建立连接!
2022-05-14 21:19:22,793 INFO [icu.sunnyc.zk.demo2.DistributedLockTest]- pool-1-thread-4 已经成功拿到锁,正在处理自己的事情
2022-05-14 21:19:22,803 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程:pool-1-thread-5,当前节点:seq-0000000024,已监控前一个节点:/locks/seq-0000000023
2022-05-14 21:19:22,803 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程:pool-1-thread-1,当前节点:seq-0000000025,已监控前一个节点:/locks/seq-0000000024
2022-05-14 21:19:22,803 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程:pool-1-thread-2,当前节点:seq-0000000023,已监控前一个节点:/locks/seq-0000000022
2022-05-14 21:19:22,803 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程:pool-1-thread-3,当前节点:seq-0000000022,已监控前一个节点:/locks/seq-0000000021
2022-05-14 21:19:23,671 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程:pool-1-thread-4 已经释放锁 /locks/seq-0000000021
2022-05-14 21:19:23,676 INFO [icu.sunnyc.zk.demo2.DistributedLock]- pool-1-thread-3-EventThread 成功与服务器建立连接!
2022-05-14 21:19:23,676 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程 pool-1-thread-3-EventThread 监控到锁 /locks/seq-0000000021 已经被释放啦
2022-05-14 21:19:23,676 INFO [icu.sunnyc.zk.demo2.DistributedLockTest]- pool-1-thread-3 已经成功拿到锁,正在处理自己的事情
2022-05-14 21:19:25,321 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程:pool-1-thread-3 已经释放锁 /locks/seq-0000000022
2022-05-14 21:19:25,324 INFO [icu.sunnyc.zk.demo2.DistributedLock]- pool-1-thread-2-EventThread 成功与服务器建立连接!
2022-05-14 21:19:25,325 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程 pool-1-thread-2-EventThread 监控到锁 /locks/seq-0000000022 已经被释放啦
2022-05-14 21:19:25,325 INFO [icu.sunnyc.zk.demo2.DistributedLockTest]- pool-1-thread-2 已经成功拿到锁,正在处理自己的事情
2022-05-14 21:19:29,987 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程:pool-1-thread-2 已经释放锁 /locks/seq-0000000023
2022-05-14 21:19:29,991 INFO [icu.sunnyc.zk.demo2.DistributedLock]- pool-1-thread-5-EventThread 成功与服务器建立连接!
2022-05-14 21:19:29,991 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程 pool-1-thread-5-EventThread 监控到锁 /locks/seq-0000000023 已经被释放啦
2022-05-14 21:19:29,991 INFO [icu.sunnyc.zk.demo2.DistributedLockTest]- pool-1-thread-5 已经成功拿到锁,正在处理自己的事情
2022-05-14 21:19:31,508 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程:pool-1-thread-5 已经释放锁 /locks/seq-0000000024
2022-05-14 21:19:31,512 INFO [icu.sunnyc.zk.demo2.DistributedLock]- pool-1-thread-1-EventThread 成功与服务器建立连接!
2022-05-14 21:19:31,512 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程 pool-1-thread-1-EventThread 监控到锁 /locks/seq-0000000024 已经被释放啦
2022-05-14 21:19:31,512 INFO [icu.sunnyc.zk.demo2.DistributedLockTest]- pool-1-thread-1 已经成功拿到锁,正在处理自己的事情
2022-05-14 21:19:32,253 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程:pool-1-thread-1 已经释放锁 /locks/seq-0000000025

Process finished with exit code 0

日志分为三部分来看:

可以看到:

  1. 五台服务器同时与 Zookeeper 服务端建立连接

  2. 4 号线程优先拿到锁了,因为他是 /locks/seq-0000000021 序号最小的

  3. 然后每个节点都监控了自己的前一个节点,等待前一个节点释放锁

    2022-05-14 21:19:22,803 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程:pool-1-thread-5,当前节点:seq-0000000024,已监控前一个节点:/locks/seq-0000000023
    2022-05-14 21:19:22,803 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程:pool-1-thread-1,当前节点:seq-0000000025,已监控前一个节点:/locks/seq-0000000024
    2022-05-14 21:19:22,803 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程:pool-1-thread-2,当前节点:seq-0000000023,已监控前一个节点:/locks/seq-0000000022
    2022-05-14 21:19:22,803 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程:pool-1-thread-3,当前节点:seq-0000000022,已监控前一个节点:/locks/seq-0000000021
  4. 可以看到线程 3 节点 seq-0000000022监控了 /locks/seq-0000000021,所以 4 号线程释放锁之后,3 会拿到锁

  5. 可以看日志进行验证

    2022-05-14 21:19:23,676 INFO [icu.sunnyc.zk.demo2.DistributedLock]- pool-1-thread-3-EventThread 成功与服务器建立连接!
    2022-05-14 21:19:23,676 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程 pool-1-thread-3-EventThread 监控到锁 /locks/seq-0000000021 已经被释放啦
    2022-05-14 21:19:23,676 INFO [icu.sunnyc.zk.demo2.DistributedLockTest]- pool-1-thread-3 已经成功拿到锁,正在处理自己的事情
    2022-05-14 21:19:25,321 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程:pool-1-thread-3 已经释放锁 /locks/seq-0000000022
  6. 依次类推,3 号线程释放锁,监控着 3 号的 2 号线程拿到锁;直到最后都处理完成

使用 Curator 框架的分布式锁

原生 API 开发存在的问题:

  1. 会话连接是异步的,需要自己去处理。比如使用 CountDownLatch
  2. Watch 需要重复注册,不然就不能生效
  3. 开发的复杂性还是比较高的

所以使用 Curator 实现好的分布式锁,方便且靠谱

  • InterProcessMutex:分布式可重入排它锁
  • InterProcessSemaphoreMutex:分布式排它锁
  • InterProcessReadWriteLock:分布式读写锁
  • InterProcessMultiLock:将多个锁作为单个实体管理的容器

参考:https://www.cnblogs.com/qlqwjy/p/10518900.html

package icu.sunnyc.zk.demo3;

import icu.sunnyc.zk.demo2.DistributedLock;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 使用 Curator 框架的分布式锁
 * @author :hc
 * @date :Created in 2022/5/14 21:33
 * @modified :
 */
public class CuratorLockTest {

    private static final Logger logger = LoggerFactory.getLogger(CuratorLockTest.class);

    public static void main(String[] args) throws InterruptedException {
        String rootNodePath = "/locks";
        Random random = new Random();
        // 并发拿锁测试
        // 客户端数量
        int threadNumber = 5;
        CountDownLatch start = new CountDownLatch(1);
        CountDownLatch end = new CountDownLatch(threadNumber);
        ExecutorService threadPool = Executors.newFixedThreadPool(threadNumber);
        for (int i = 0; i < threadNumber; i++) {
            threadPool.submit(() -> {
                try {
                    // 阻塞住 让这个线程别跑
                    start.await();
                    InterProcessMutex lock = new InterProcessMutex(getCuratorFramework(), rootNodePath);
                    // 获取锁
                    lock.acquire();
                    logger.info("{} 已经成功拿到锁,正在处理自己的事情", Thread.currentThread().getName());
                    Thread.sleep(random.nextInt(5000));
                    // 释放锁
                    lock.release();
                    logger.info("{} 已经释放锁", Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    Thread.currentThread().interrupt();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    end.countDown();
                }
            });
        }
        start.countDown();
        logger.info("程序已启动");
        end.await();
        threadPool.shutdown();

    }

    private static CuratorFramework getCuratorFramework() {
        String zkServerPath = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
        // 重试策略 重试 3 次,每次间隔 5 秒
        RetryNTimes retryPolicy = new RetryNTimes(3, 5000);
        CuratorFramework zkClient = CuratorFrameworkFactory.builder()
                .connectString(zkServerPath)
                // 连接创建超时时间
                .connectionTimeoutMs(2000)
                // 会话超时时间
                .sessionTimeoutMs(2000)
                .retryPolicy(retryPolicy).build();
        zkClient.start();
        logger.info("线程{} 连接已建立", Thread.currentThread().getName());
        return zkClient;
    }
}

Zookeeper实现分布式锁
https://www.powercheng.fun/articles/9edea0a3/
作者
powercheng
发布于
2022年5月14日
许可协议