Zookeeper实现分布式锁
分别使用 Zookeeper 的原生 API 和 Curator 框架实现分布式锁
Zookeeper实现分布式锁
什么是分布式锁?
分布式环境下多个进程实例同时对同一个资源进行操作,为了解决这个问题,提出分布式锁;
进程访问资源,先获取锁,拿到锁代表有了对资源操作的权限了,其他没有拿到锁的进程需要等待;
这个锁可以是 Redis、Zookeeper,甚至也可以是数据库;
分布式锁的特点:
- 互斥性:任何时刻,对于同一条数据,只有一台应用可以获取到分布式锁
- 高可用性:提供分布式锁的服务需要做到高可用,小部分机器宕机不能影响正常使用
- 防止锁超时:如果客户端没有释放锁,服务器会在一段时间之后自动释放锁,防止客户端宕机或网络异常时产生死锁
- 独占性:加解锁必须由同一台机器进行,也就是谁加的锁,谁来释放,不能出现自己加的锁,但是别人释放了的情况
实现思路
整体思路:利用创建 Zookeeper 的临时有序节点,多个客户端同时创建临时有序的节点,谁的序号小(谁先创建)谁就拿到锁,其他序号依次监控自己的前一名进行等待,等待前一名释放锁,才会轮到自己;
![image-20220514210406839](/articles/9edea0a3/E92dDD0f1Bimage-20220514210406839.png)
整体过程:
- 客户端准备获取分布式锁,连接 Zookeeper 服务端,向
/locks
路径下创建一个临时的序列节点 - 获取
/locks
下所有的节点,看看自己是不是序号最小的节点:- 是:成功获取到锁,返回
- 否:获取锁失败,对自己的前一个节点进行监听,阻塞直到监控到前一个节点发生
delete
事件时,代表轮到自己了,自己获取到锁,返回
delete
释放锁
环境准备
搭建一个普通的 maven 工程,引入相关依赖:
<dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>RELEASE</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.8.2</version> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.5.7</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.0.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.0</version> </dependency> <!-- 还需要引入 org.apache.zookeeper 不过上面已经引入了 --> </dependencies>
在项目的 resources 文件夹下创建 log4j.properties
log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d %p [%c]- %m%n log4j.appender.logfile=org.apache.log4j.FileAppender log4j.appender.logfile.File=target/spring.log log4j.appender.logfile.layout=org.apache.log4j.PatternLayout log4j.appender.logfile.layout.ConversionPattern=%d %p [%c]- %m%n
使用 Zookeeper 原生 API 实现分布式锁
主要是三个方法:
- 构造方法:在构造方法里进行连接 Zookeeper 服务端,创建根节点,监听前一个节点事件
- 获取锁(加锁):就是上述的前两步骤,创建自己的专属锁路径,然后排序看看自己是不是第一个,是第一个代表成功加锁,然后返回,不是第一个就对前一个节点进行监控,然后阻塞等待直到前一个节点被释放掉
- 释放锁(解锁):就是
delete
掉自己创建的临时锁路径
具体代码:
package icu.sunnyc.zk.demo2;
import org.apache.zookeeper.*;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* zk 原生 API 实现分布式锁 demo
* @author :hc
* @date :Created in 2022/5/14 13:13
* @modified :
*/
public class DistributedLock {
private static final Logger logger = LoggerFactory.getLogger(DistributedLock.class);
/**
* 分布式锁根节点路径
*/
private final String rootNodePath = "/locks";
/**
* 分布式锁子节点路径
*/
private final String subNodePath = rootNodePath + "/seq-";
/**
* 用于监控是否与 zk 服务端完成建立连接
*/
private final CountDownLatch connectLatch = new CountDownLatch(1);
/**
* 用于监控前一个节点是否释放锁
*/
private final CountDownLatch waitLatch = new CountDownLatch(1);
/**
* 监控的前一个节点的 path
*/
private String waitPath = null;
/**
* 自己的专属锁路径
*/
private String myLockPath;
/**
* zk 客户端连接对象
*/
private final ZooKeeper zkClient;
public DistributedLock(String connectString, int sessionTimeout) throws IOException, InterruptedException,
KeeperException {
zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
// 连接建立时 connectLatch 减一,唤醒后面 await 的内容,可以获取根节点状态做其他操作了
if (event.getState() == KeeperState.SyncConnected) {
connectLatch.countDown();
logger.info("{} 成功与服务器建立连接!", Thread.currentThread().getName());
}
// 发生了 waitPath 删除事件,也就是 watch 的前一个节点人家用完资源释放锁了
if (event.getType() == EventType.NodeDeleted && event.getPath().equals(waitPath)) {
logger.info("线程 {} 监控到锁 {} 已经被释放啦", Thread.currentThread().getName(), waitPath);
waitLatch.countDown();
}
}
});
// 阻塞直到与服务端完成建立连接
connectLatch.await();
// 获取根节点状态信息
Stat rootNodeStat = zkClient.exists(rootNodePath, false);
// 根节点不存在,就创建一个根节点,供后续使用
if (rootNodeStat == null) {
logger.warn("根节点为空,正在创建根节点...");
String rootPath = zkClient.create(rootNodePath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
logger.info("根节点创建完毕,根节点路径:{}", rootPath);
}
}
/**
* 获取分布式锁 (加锁)
*/
public void getLock() {
try {
myLockPath = zkClient.create(subNodePath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 获取父节点下的所有子节点,看看自己是否是第一个
List<String> subNodeList = zkClient.getChildren(rootNodePath, false);
if (subNodeList.isEmpty()) {
logger.error("未知异常,获取到子节点为空");
return;
}
String myNodeName = myLockPath.split("/")[2];
// 排序
Collections.sort(subNodeList);
// 第一个是自己,代表已经成功获取到锁
if (myNodeName.equals(subNodeList.get(0))) {
return;
}
// 第一个不是自己,没能成功获取锁,找到自己的前一名,然后 watch 它,直到它被释放,自己就顺利接手锁
for (int i = 0; i < subNodeList.size(); i++) {
if (myNodeName.equals(subNodeList.get(i))) {
// watch 前一个节点路径
waitPath = rootNodePath + "/" + subNodeList.get(i - 1);
// 在 waitPath 上注册监听器,当 waitPath 被删除时,zookeeper 会回调监听器的 process 方法
zkClient.getData(waitPath, true, new Stat());
logger.info("线程:{},当前节点:{},已监控前一个节点:{}", Thread.currentThread().getName(),
myNodeName, waitPath);
}
}
// 上述已经 watch 了前一个锁了,此时只需要慢慢等待 waitPath 被释放就行
waitLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} catch (KeeperException e) {
e.printStackTrace();
}
}
/**
* 释放分布式锁 (解锁)
*/
public void releaseLock() {
// version -1 代表不管 version 多少,直接删除这个节点
try {
logger.info("线程:{} 已经释放锁 {}", Thread.currentThread().getName(), myLockPath);
zkClient.delete(myLockPath, -1);
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}
分布式锁,并发获取锁测试:
package icu.sunnyc.zk.demo2;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 分布式锁 测试
* @author :hc
* @date :Created in 2022/5/14 14:13
* @modified :
*/
public class DistributedLockTest {
private static final Logger logger = LoggerFactory.getLogger(DistributedLockTest.class);
public static void main(String[] args) throws InterruptedException {
Random random = new Random();
// 并发拿锁测试
// 客户端数量
int threadNumber = 5;
CountDownLatch start = new CountDownLatch(1);
CountDownLatch end = new CountDownLatch(threadNumber);
ExecutorService threadPool = Executors.newFixedThreadPool(threadNumber);
for (int i = 0; i < threadNumber; i++) {
threadPool.submit(() -> {
try {
// 阻塞住 让这个线程别跑
start.await();
// 获取锁
DistributedLock distributedLock = new DistributedLock("hadoop102:2181,hadoop103:2181,hadoop104:2181", 2000);
distributedLock.getLock();
// sleep 一个随机数 做业务处理
logger.info("{} 已经成功拿到锁,正在处理自己的事情", Thread.currentThread().getName());
Thread.sleep(random.nextInt(5000));
distributedLock.releaseLock();
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} catch (IOException | KeeperException e) {
e.printStackTrace();
} finally {
end.countDown();
}
});
}
start.countDown();
logger.info("程序已启动");
end.await();
threadPool.shutdown();
}
}
测试结果:
2022-05-14 21:19:22,767 INFO [icu.sunnyc.zk.demo2.DistributedLock]- pool-1-thread-3-EventThread 成功与服务器建立连接!
2022-05-14 21:19:22,767 INFO [icu.sunnyc.zk.demo2.DistributedLock]- pool-1-thread-1-EventThread 成功与服务器建立连接!
2022-05-14 21:19:22,767 INFO [icu.sunnyc.zk.demo2.DistributedLock]- pool-1-thread-2-EventThread 成功与服务器建立连接!
2022-05-14 21:19:22,767 INFO [icu.sunnyc.zk.demo2.DistributedLock]- pool-1-thread-5-EventThread 成功与服务器建立连接!
2022-05-14 21:19:22,767 INFO [icu.sunnyc.zk.demo2.DistributedLock]- pool-1-thread-4-EventThread 成功与服务器建立连接!
2022-05-14 21:19:22,793 INFO [icu.sunnyc.zk.demo2.DistributedLockTest]- pool-1-thread-4 已经成功拿到锁,正在处理自己的事情
2022-05-14 21:19:22,803 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程:pool-1-thread-5,当前节点:seq-0000000024,已监控前一个节点:/locks/seq-0000000023
2022-05-14 21:19:22,803 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程:pool-1-thread-1,当前节点:seq-0000000025,已监控前一个节点:/locks/seq-0000000024
2022-05-14 21:19:22,803 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程:pool-1-thread-2,当前节点:seq-0000000023,已监控前一个节点:/locks/seq-0000000022
2022-05-14 21:19:22,803 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程:pool-1-thread-3,当前节点:seq-0000000022,已监控前一个节点:/locks/seq-0000000021
2022-05-14 21:19:23,671 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程:pool-1-thread-4 已经释放锁 /locks/seq-0000000021
2022-05-14 21:19:23,676 INFO [icu.sunnyc.zk.demo2.DistributedLock]- pool-1-thread-3-EventThread 成功与服务器建立连接!
2022-05-14 21:19:23,676 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程 pool-1-thread-3-EventThread 监控到锁 /locks/seq-0000000021 已经被释放啦
2022-05-14 21:19:23,676 INFO [icu.sunnyc.zk.demo2.DistributedLockTest]- pool-1-thread-3 已经成功拿到锁,正在处理自己的事情
2022-05-14 21:19:25,321 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程:pool-1-thread-3 已经释放锁 /locks/seq-0000000022
2022-05-14 21:19:25,324 INFO [icu.sunnyc.zk.demo2.DistributedLock]- pool-1-thread-2-EventThread 成功与服务器建立连接!
2022-05-14 21:19:25,325 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程 pool-1-thread-2-EventThread 监控到锁 /locks/seq-0000000022 已经被释放啦
2022-05-14 21:19:25,325 INFO [icu.sunnyc.zk.demo2.DistributedLockTest]- pool-1-thread-2 已经成功拿到锁,正在处理自己的事情
2022-05-14 21:19:29,987 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程:pool-1-thread-2 已经释放锁 /locks/seq-0000000023
2022-05-14 21:19:29,991 INFO [icu.sunnyc.zk.demo2.DistributedLock]- pool-1-thread-5-EventThread 成功与服务器建立连接!
2022-05-14 21:19:29,991 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程 pool-1-thread-5-EventThread 监控到锁 /locks/seq-0000000023 已经被释放啦
2022-05-14 21:19:29,991 INFO [icu.sunnyc.zk.demo2.DistributedLockTest]- pool-1-thread-5 已经成功拿到锁,正在处理自己的事情
2022-05-14 21:19:31,508 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程:pool-1-thread-5 已经释放锁 /locks/seq-0000000024
2022-05-14 21:19:31,512 INFO [icu.sunnyc.zk.demo2.DistributedLock]- pool-1-thread-1-EventThread 成功与服务器建立连接!
2022-05-14 21:19:31,512 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程 pool-1-thread-1-EventThread 监控到锁 /locks/seq-0000000024 已经被释放啦
2022-05-14 21:19:31,512 INFO [icu.sunnyc.zk.demo2.DistributedLockTest]- pool-1-thread-1 已经成功拿到锁,正在处理自己的事情
2022-05-14 21:19:32,253 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程:pool-1-thread-1 已经释放锁 /locks/seq-0000000025
Process finished with exit code 0
日志分为三部分来看:
![image-20220514212509966](/articles/9edea0a3/F7b7cf3fF3image-20220514212509966.png)
可以看到:
五台服务器同时与 Zookeeper 服务端建立连接
4 号线程优先拿到锁了,因为他是
/locks/seq-0000000021
序号最小的然后每个节点都监控了自己的前一个节点,等待前一个节点释放锁
2022-05-14 21:19:22,803 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程:pool-1-thread-5,当前节点:seq-0000000024,已监控前一个节点:/locks/seq-0000000023 2022-05-14 21:19:22,803 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程:pool-1-thread-1,当前节点:seq-0000000025,已监控前一个节点:/locks/seq-0000000024 2022-05-14 21:19:22,803 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程:pool-1-thread-2,当前节点:seq-0000000023,已监控前一个节点:/locks/seq-0000000022 2022-05-14 21:19:22,803 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程:pool-1-thread-3,当前节点:seq-0000000022,已监控前一个节点:/locks/seq-0000000021
可以看到线程 3 节点
seq-0000000022
监控了/locks/seq-0000000021
,所以 4 号线程释放锁之后,3 会拿到锁可以看日志进行验证
2022-05-14 21:19:23,676 INFO [icu.sunnyc.zk.demo2.DistributedLock]- pool-1-thread-3-EventThread 成功与服务器建立连接! 2022-05-14 21:19:23,676 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程 pool-1-thread-3-EventThread 监控到锁 /locks/seq-0000000021 已经被释放啦 2022-05-14 21:19:23,676 INFO [icu.sunnyc.zk.demo2.DistributedLockTest]- pool-1-thread-3 已经成功拿到锁,正在处理自己的事情 2022-05-14 21:19:25,321 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程:pool-1-thread-3 已经释放锁 /locks/seq-0000000022
依次类推,3 号线程释放锁,监控着 3 号的 2 号线程拿到锁;直到最后都处理完成
使用 Curator 框架的分布式锁
原生 API 开发存在的问题:
- 会话连接是异步的,需要自己去处理。比如使用 CountDownLatch
- Watch 需要重复注册,不然就不能生效
- 开发的复杂性还是比较高的
所以使用 Curator 实现好的分布式锁,方便且靠谱
- InterProcessMutex:分布式可重入排它锁
- InterProcessSemaphoreMutex:分布式排它锁
- InterProcessReadWriteLock:分布式读写锁
- InterProcessMultiLock:将多个锁作为单个实体管理的容器
参考:https://www.cnblogs.com/qlqwjy/p/10518900.html
package icu.sunnyc.zk.demo3;
import icu.sunnyc.zk.demo2.DistributedLock;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 使用 Curator 框架的分布式锁
* @author :hc
* @date :Created in 2022/5/14 21:33
* @modified :
*/
public class CuratorLockTest {
private static final Logger logger = LoggerFactory.getLogger(CuratorLockTest.class);
public static void main(String[] args) throws InterruptedException {
String rootNodePath = "/locks";
Random random = new Random();
// 并发拿锁测试
// 客户端数量
int threadNumber = 5;
CountDownLatch start = new CountDownLatch(1);
CountDownLatch end = new CountDownLatch(threadNumber);
ExecutorService threadPool = Executors.newFixedThreadPool(threadNumber);
for (int i = 0; i < threadNumber; i++) {
threadPool.submit(() -> {
try {
// 阻塞住 让这个线程别跑
start.await();
InterProcessMutex lock = new InterProcessMutex(getCuratorFramework(), rootNodePath);
// 获取锁
lock.acquire();
logger.info("{} 已经成功拿到锁,正在处理自己的事情", Thread.currentThread().getName());
Thread.sleep(random.nextInt(5000));
// 释放锁
lock.release();
logger.info("{} 已经释放锁", Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} catch (Exception e) {
e.printStackTrace();
} finally {
end.countDown();
}
});
}
start.countDown();
logger.info("程序已启动");
end.await();
threadPool.shutdown();
}
private static CuratorFramework getCuratorFramework() {
String zkServerPath = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
// 重试策略 重试 3 次,每次间隔 5 秒
RetryNTimes retryPolicy = new RetryNTimes(3, 5000);
CuratorFramework zkClient = CuratorFrameworkFactory.builder()
.connectString(zkServerPath)
// 连接创建超时时间
.connectionTimeoutMs(2000)
// 会话超时时间
.sessionTimeoutMs(2000)
.retryPolicy(retryPolicy).build();
zkClient.start();
logger.info("线程{} 连接已建立", Thread.currentThread().getName());
return zkClient;
}
}
Zookeeper实现分布式锁
https://www.powercheng.fun/articles/9edea0a3/