Zookeeper实现分布式锁

分别使用 Zookeeper 的原生 API 和 Curator 框架实现分布式锁

Zookeeper实现分布式锁

什么是分布式锁?

分布式环境下多个进程实例同时对同一个资源进行操作,为了解决这个问题,提出分布式锁;

进程访问资源,先获取锁,拿到锁代表有了对资源操作的权限了,其他没有拿到锁的进程需要等待;

这个锁可以是 Redis、Zookeeper,甚至也可以是数据库;

分布式锁的特点:

  • 互斥性:任何时刻,对于同一条数据,只有一台应用可以获取到分布式锁
  • 高可用性:提供分布式锁的服务需要做到高可用,小部分机器宕机不能影响正常使用
  • 防止锁超时:如果客户端没有释放锁,服务器会在一段时间之后自动释放锁,防止客户端宕机或网络异常时产生死锁
  • 独占性:加解锁必须由同一台机器进行,也就是谁加的锁,谁来释放,不能出现自己加的锁,但是别人释放了的情况

实现思路

整体思路:利用创建 Zookeeper 的临时有序节点,多个客户端同时创建临时有序的节点,谁的序号小(谁先创建)谁就拿到锁,其他序号依次监控自己的前一名进行等待,等待前一名释放锁,才会轮到自己;

整体过程:

  1. 客户端准备获取分布式锁,连接 Zookeeper 服务端,向/locks路径下创建一个临时的序列节点
  2. 获取/locks下所有的节点,看看自己是不是序号最小的节点:
    • 是:成功获取到锁,返回
    • 否:获取锁失败,对自己的前一个节点进行监听,阻塞直到监控到前一个节点发生delete事件时,代表轮到自己了,自己获取到锁,返回
  3. delete释放锁

环境准备

  1. 搭建一个普通的 maven 工程,引入相关依赖:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    <dependencies>
    <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>RELEASE</version>
    </dependency>
    <dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.8.2</version>
    </dependency>
    <dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.5.7</version>
    </dependency>

    <dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.0.0</version>
    </dependency>
    <dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.0</version>
    </dependency>

    <!-- 还需要引入 org.apache.zookeeper 不过上面已经引入了 -->

    </dependencies>
  2. 在项目的 resources 文件夹下创建 log4j.properties

    1
    2
    3
    4
    5
    6
    7
    8
    log4j.rootLogger=INFO, stdout
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%d %p [%c]- %m%n
    log4j.appender.logfile=org.apache.log4j.FileAppender
    log4j.appender.logfile.File=target/spring.log
    log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
    log4j.appender.logfile.layout.ConversionPattern=%d %p [%c]- %m%n

使用 Zookeeper 原生 API 实现分布式锁

主要是三个方法:

  1. 构造方法:在构造方法里进行连接 Zookeeper 服务端,创建根节点,监听前一个节点事件
  2. 获取锁(加锁):就是上述的前两步骤,创建自己的专属锁路径,然后排序看看自己是不是第一个,是第一个代表成功加锁,然后返回,不是第一个就对前一个节点进行监控,然后阻塞等待直到前一个节点被释放掉
  3. 释放锁(解锁):就是delete掉自己创建的临时锁路径

具体代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package icu.sunnyc.zk.demo2;

import org.apache.zookeeper.*;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
* zk 原生 API 实现分布式锁 demo
* @author :hc
* @date :Created in 2022/5/14 13:13
* @modified
*/
public class DistributedLock {

private static final Logger logger = LoggerFactory.getLogger(DistributedLock.class);

/**
* 分布式锁根节点路径
*/
private final String rootNodePath = "/locks";

/**
* 分布式锁子节点路径
*/
private final String subNodePath = rootNodePath + "/seq-";

/**
* 用于监控是否与 zk 服务端完成建立连接
*/
private final CountDownLatch connectLatch = new CountDownLatch(1);

/**
* 用于监控前一个节点是否释放锁
*/
private final CountDownLatch waitLatch = new CountDownLatch(1);

/**
* 监控的前一个节点的 path
*/
private String waitPath = null;

/**
* 自己的专属锁路径
*/
private String myLockPath;

/**
* zk 客户端连接对象
*/
private final ZooKeeper zkClient;

public DistributedLock(String connectString, int sessionTimeout) throws IOException, InterruptedException,
KeeperException {
zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
// 连接建立时 connectLatch 减一,唤醒后面 await 的内容,可以获取根节点状态做其他操作了
if (event.getState() == KeeperState.SyncConnected) {
connectLatch.countDown();
logger.info("{} 成功与服务器建立连接!", Thread.currentThread().getName());
}
// 发生了 waitPath 删除事件,也就是 watch 的前一个节点人家用完资源释放锁了
if (event.getType() == EventType.NodeDeleted && event.getPath().equals(waitPath)) {
logger.info("线程 {} 监控到锁 {} 已经被释放啦", Thread.currentThread().getName(), waitPath);
waitLatch.countDown();
}
}
});
// 阻塞直到与服务端完成建立连接
connectLatch.await();
// 获取根节点状态信息
Stat rootNodeStat = zkClient.exists(rootNodePath, false);
// 根节点不存在,就创建一个根节点,供后续使用
if (rootNodeStat == null) {
logger.warn("根节点为空,正在创建根节点...");
String rootPath = zkClient.create(rootNodePath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
logger.info("根节点创建完毕,根节点路径:{}", rootPath);
}
}

/**
* 获取分布式锁 (加锁)
*/
public void getLock() {

try {
myLockPath = zkClient.create(subNodePath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 获取父节点下的所有子节点,看看自己是否是第一个
List<String> subNodeList = zkClient.getChildren(rootNodePath, false);
if (subNodeList.isEmpty()) {
logger.error("未知异常,获取到子节点为空");
return;
}
String myNodeName = myLockPath.split("/")[2];
// 排序
Collections.sort(subNodeList);
// 第一个是自己,代表已经成功获取到锁
if (myNodeName.equals(subNodeList.get(0))) {
return;
}
// 第一个不是自己,没能成功获取锁,找到自己的前一名,然后 watch 它,直到它被释放,自己就顺利接手锁
for (int i = 0; i < subNodeList.size(); i++) {
if (myNodeName.equals(subNodeList.get(i))) {
// watch 前一个节点路径
waitPath = rootNodePath + "/" + subNodeList.get(i - 1);
// 在 waitPath 上注册监听器,当 waitPath 被删除时,zookeeper 会回调监听器的 process 方法
zkClient.getData(waitPath, true, new Stat());
logger.info("线程:{},当前节点:{},已监控前一个节点:{}", Thread.currentThread().getName(),
myNodeName, waitPath);
}
}
// 上述已经 watch 了前一个锁了,此时只需要慢慢等待 waitPath 被释放就行
waitLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} catch (KeeperException e) {
e.printStackTrace();
}

}

/**
* 释放分布式锁 (解锁)
*/
public void releaseLock() {
// version -1 代表不管 version 多少,直接删除这个节点
try {
logger.info("线程:{} 已经释放锁 {}", Thread.currentThread().getName(), myLockPath);
zkClient.delete(myLockPath, -1);
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}

分布式锁,并发获取锁测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package icu.sunnyc.zk.demo2;

import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* 分布式锁 测试
* @author :hc
* @date :Created in 2022/5/14 14:13
* @modified
*/
public class DistributedLockTest {

private static final Logger logger = LoggerFactory.getLogger(DistributedLockTest.class);

public static void main(String[] args) throws InterruptedException {
Random random = new Random();
// 并发拿锁测试
// 客户端数量
int threadNumber = 5;
CountDownLatch start = new CountDownLatch(1);
CountDownLatch end = new CountDownLatch(threadNumber);
ExecutorService threadPool = Executors.newFixedThreadPool(threadNumber);
for (int i = 0; i < threadNumber; i++) {
threadPool.submit(() -> {
try {
// 阻塞住 让这个线程别跑
start.await();
// 获取锁
DistributedLock distributedLock = new DistributedLock("hadoop102:2181,hadoop103:2181,hadoop104:2181", 2000);
distributedLock.getLock();
// sleep 一个随机数 做业务处理
logger.info("{} 已经成功拿到锁,正在处理自己的事情", Thread.currentThread().getName());
Thread.sleep(random.nextInt(5000));
distributedLock.releaseLock();
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} catch (IOException | KeeperException e) {
e.printStackTrace();
} finally {
end.countDown();
}
});
}
start.countDown();
logger.info("程序已启动");
end.await();
threadPool.shutdown();
}
}

测试结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
2022-05-14 21:19:22,767 INFO [icu.sunnyc.zk.demo2.DistributedLock]- pool-1-thread-3-EventThread 成功与服务器建立连接!
2022-05-14 21:19:22,767 INFO [icu.sunnyc.zk.demo2.DistributedLock]- pool-1-thread-1-EventThread 成功与服务器建立连接!
2022-05-14 21:19:22,767 INFO [icu.sunnyc.zk.demo2.DistributedLock]- pool-1-thread-2-EventThread 成功与服务器建立连接!
2022-05-14 21:19:22,767 INFO [icu.sunnyc.zk.demo2.DistributedLock]- pool-1-thread-5-EventThread 成功与服务器建立连接!
2022-05-14 21:19:22,767 INFO [icu.sunnyc.zk.demo2.DistributedLock]- pool-1-thread-4-EventThread 成功与服务器建立连接!
2022-05-14 21:19:22,793 INFO [icu.sunnyc.zk.demo2.DistributedLockTest]- pool-1-thread-4 已经成功拿到锁,正在处理自己的事情
2022-05-14 21:19:22,803 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程:pool-1-thread-5,当前节点:seq-0000000024,已监控前一个节点:/locks/seq-0000000023
2022-05-14 21:19:22,803 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程:pool-1-thread-1,当前节点:seq-0000000025,已监控前一个节点:/locks/seq-0000000024
2022-05-14 21:19:22,803 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程:pool-1-thread-2,当前节点:seq-0000000023,已监控前一个节点:/locks/seq-0000000022
2022-05-14 21:19:22,803 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程:pool-1-thread-3,当前节点:seq-0000000022,已监控前一个节点:/locks/seq-0000000021
2022-05-14 21:19:23,671 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程:pool-1-thread-4 已经释放锁 /locks/seq-0000000021
2022-05-14 21:19:23,676 INFO [icu.sunnyc.zk.demo2.DistributedLock]- pool-1-thread-3-EventThread 成功与服务器建立连接!
2022-05-14 21:19:23,676 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程 pool-1-thread-3-EventThread 监控到锁 /locks/seq-0000000021 已经被释放啦
2022-05-14 21:19:23,676 INFO [icu.sunnyc.zk.demo2.DistributedLockTest]- pool-1-thread-3 已经成功拿到锁,正在处理自己的事情
2022-05-14 21:19:25,321 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程:pool-1-thread-3 已经释放锁 /locks/seq-0000000022
2022-05-14 21:19:25,324 INFO [icu.sunnyc.zk.demo2.DistributedLock]- pool-1-thread-2-EventThread 成功与服务器建立连接!
2022-05-14 21:19:25,325 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程 pool-1-thread-2-EventThread 监控到锁 /locks/seq-0000000022 已经被释放啦
2022-05-14 21:19:25,325 INFO [icu.sunnyc.zk.demo2.DistributedLockTest]- pool-1-thread-2 已经成功拿到锁,正在处理自己的事情
2022-05-14 21:19:29,987 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程:pool-1-thread-2 已经释放锁 /locks/seq-0000000023
2022-05-14 21:19:29,991 INFO [icu.sunnyc.zk.demo2.DistributedLock]- pool-1-thread-5-EventThread 成功与服务器建立连接!
2022-05-14 21:19:29,991 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程 pool-1-thread-5-EventThread 监控到锁 /locks/seq-0000000023 已经被释放啦
2022-05-14 21:19:29,991 INFO [icu.sunnyc.zk.demo2.DistributedLockTest]- pool-1-thread-5 已经成功拿到锁,正在处理自己的事情
2022-05-14 21:19:31,508 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程:pool-1-thread-5 已经释放锁 /locks/seq-0000000024
2022-05-14 21:19:31,512 INFO [icu.sunnyc.zk.demo2.DistributedLock]- pool-1-thread-1-EventThread 成功与服务器建立连接!
2022-05-14 21:19:31,512 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程 pool-1-thread-1-EventThread 监控到锁 /locks/seq-0000000024 已经被释放啦
2022-05-14 21:19:31,512 INFO [icu.sunnyc.zk.demo2.DistributedLockTest]- pool-1-thread-1 已经成功拿到锁,正在处理自己的事情
2022-05-14 21:19:32,253 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程:pool-1-thread-1 已经释放锁 /locks/seq-0000000025

Process finished with exit code 0

日志分为三部分来看:

可以看到:

  1. 五台服务器同时与 Zookeeper 服务端建立连接

  2. 4 号线程优先拿到锁了,因为他是 /locks/seq-0000000021 序号最小的

  3. 然后每个节点都监控了自己的前一个节点,等待前一个节点释放锁

    1
    2
    3
    4
    2022-05-14 21:19:22,803 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程:pool-1-thread-5,当前节点:seq-0000000024,已监控前一个节点:/locks/seq-0000000023
    2022-05-14 21:19:22,803 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程:pool-1-thread-1,当前节点:seq-0000000025,已监控前一个节点:/locks/seq-0000000024
    2022-05-14 21:19:22,803 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程:pool-1-thread-2,当前节点:seq-0000000023,已监控前一个节点:/locks/seq-0000000022
    2022-05-14 21:19:22,803 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程:pool-1-thread-3,当前节点:seq-0000000022,已监控前一个节点:/locks/seq-0000000021
  4. 可以看到线程 3 节点 seq-0000000022监控了 /locks/seq-0000000021,所以 4 号线程释放锁之后,3 会拿到锁

  5. 可以看日志进行验证

    1
    2
    3
    4
    2022-05-14 21:19:23,676 INFO [icu.sunnyc.zk.demo2.DistributedLock]- pool-1-thread-3-EventThread 成功与服务器建立连接!
    2022-05-14 21:19:23,676 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程 pool-1-thread-3-EventThread 监控到锁 /locks/seq-0000000021 已经被释放啦
    2022-05-14 21:19:23,676 INFO [icu.sunnyc.zk.demo2.DistributedLockTest]- pool-1-thread-3 已经成功拿到锁,正在处理自己的事情
    2022-05-14 21:19:25,321 INFO [icu.sunnyc.zk.demo2.DistributedLock]- 线程:pool-1-thread-3 已经释放锁 /locks/seq-0000000022
  6. 依次类推,3 号线程释放锁,监控着 3 号的 2 号线程拿到锁;直到最后都处理完成

使用 Curator 框架的分布式锁

原生 API 开发存在的问题:

  1. 会话连接是异步的,需要自己去处理。比如使用 CountDownLatch
  2. Watch 需要重复注册,不然就不能生效
  3. 开发的复杂性还是比较高的

所以使用 Curator 实现好的分布式锁,方便且靠谱

  • InterProcessMutex:分布式可重入排它锁
  • InterProcessSemaphoreMutex:分布式排它锁
  • InterProcessReadWriteLock:分布式读写锁
  • InterProcessMultiLock:将多个锁作为单个实体管理的容器

参考:https://www.cnblogs.com/qlqwjy/p/10518900.html

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package icu.sunnyc.zk.demo3;

import icu.sunnyc.zk.demo2.DistributedLock;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* 使用 Curator 框架的分布式锁
* @author :hc
* @date :Created in 2022/5/14 21:33
* @modified
*/
public class CuratorLockTest {

private static final Logger logger = LoggerFactory.getLogger(CuratorLockTest.class);

public static void main(String[] args) throws InterruptedException {
String rootNodePath = "/locks";
Random random = new Random();
// 并发拿锁测试
// 客户端数量
int threadNumber = 5;
CountDownLatch start = new CountDownLatch(1);
CountDownLatch end = new CountDownLatch(threadNumber);
ExecutorService threadPool = Executors.newFixedThreadPool(threadNumber);
for (int i = 0; i < threadNumber; i++) {
threadPool.submit(() -> {
try {
// 阻塞住 让这个线程别跑
start.await();
InterProcessMutex lock = new InterProcessMutex(getCuratorFramework(), rootNodePath);
// 获取锁
lock.acquire();
logger.info("{} 已经成功拿到锁,正在处理自己的事情", Thread.currentThread().getName());
Thread.sleep(random.nextInt(5000));
// 释放锁
lock.release();
logger.info("{} 已经释放锁", Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} catch (Exception e) {
e.printStackTrace();
} finally {
end.countDown();
}
});
}
start.countDown();
logger.info("程序已启动");
end.await();
threadPool.shutdown();

}

private static CuratorFramework getCuratorFramework() {
String zkServerPath = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
// 重试策略 重试 3 次,每次间隔 5 秒
RetryNTimes retryPolicy = new RetryNTimes(3, 5000);
CuratorFramework zkClient = CuratorFrameworkFactory.builder()
.connectString(zkServerPath)
// 连接创建超时时间
.connectionTimeoutMs(2000)
// 会话超时时间
.sessionTimeoutMs(2000)
.retryPolicy(retryPolicy).build();
zkClient.start();
logger.info("线程{} 连接已建立", Thread.currentThread().getName());
return zkClient;
}
}