文章目录
四、客户端常用命令
命令行基本语法 | 功能描述 |
---|---|
help | 显示所有操作命令 |
ls [-w] [-s] path | 使用 ls 命令来查看znode的子节点, -w 监听子节点变化 -s 附加次级信息 |
create [-s] [-e] node value | 创建节点 -s 带序列的(后面追加系统自增的数字) -e 临时节点,重启或者超时时消失 |
get [-w] [-s] path | 获得节点的值 -w 监听节点内容变化 -s 附加次级信息 |
set | 设置/修改节点的具体值 |
stat | 查看节点状态 |
delete | 删除节点 |
deleteall | 递归删除节点和其子节点 |
五、Zookeeper节点
5.1 节点数据信息
[zk: localhost:2181(CONNECTED) 0] ls -s /
[zookeeper]cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Thu Jan 01 08:00:00 CST 1970
pZxid = 0x9
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1
- czxid: 创建节点的事务 zxid 每次修改 ZooKeeper 状态都会产生一个 ZooKeeper 事务 ID。事务 ID 是 ZooKeeper 中所有修改总的次序。每次修改都有唯一的 zxid,如果 zxid1 小于 zxid2,那么 zxid1 在 zxid2 之前发生。
- ctime: znode 被创建的毫秒数
- mzxid: znode 最后更新的事务 zxid
- mtime: znode 最后修改的毫秒数
- pZxid: znode 最后更新的子节点 zxid
- cversion: znode 子节点变化号,znode 子节点修改次数
- dataversion: znode 数据变化号
- aclVersion: znode 访问控制列表的变化号
- ephemeralOwner: 如果是临时节点,这个是 znode 拥有者的 session id。如果不是临时节点则是 0
- dataLength: znode 的数据长度
- numChildren: znode 子节点数量
5.2 节点类型
5.3 监听器原理
客户端注册监听它关心的目录节点,当目录节点发生变化(数据改变、节点删除、子目录节点增加删除)时,ZooKeeper 会通知客户端
监听机制保证 ZooKeeper 保存的任何的数据的任何改变都能快速的响应到监听了该节点的应用程序
六、服务器动态上下线监听案例
需求:某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。
新建Maven项目并导入依赖
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.14.1</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.7</version>
</dependency>
</dependencies>
服务端:
/**
* @author Bandit
* @create 2022/2/19 23:06
* 无论是客户端还是服务端,对于zk来说都是客户端,只是操作不同而已
*/
public class DistributeServer {
private final String connectString = "192.168.2.128:2181,192.168.2.128:2182,192.168.2.128:2183";
private static int sessionTimeout = 60000;
private ZooKeeper zkClient = null;
private final String ParentPath = "/servers";
//获取到zookeeper的客户端连接
public void init() throws IOException {
zkClient = new ZooKeeper(connectString,sessionTimeout,watchedEvent -> {
});
}
//注册服务器到zk上
public void registerServer(String hostname) throws KeeperException, InterruptedException {
String created = zkClient.create(ParentPath+"/server", hostname.getBytes(StandardCharsets.UTF_8),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);//模式为 -e -s,暂时的带序号的
System.out.println(hostname+"is online"+created);
}
//业务功能
public void business(String hostname) throws InterruptedException {
System.out.println(hostname+"正在服务。。");
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
DistributeServer server = new DistributeServer();
server.init();
server.registerServer(args[0]);
server.business(args[0]);
}
}
客户端:
/**
* @author Bandit
* @create 2022/2/19 23:16
* 客户端监听 /servers 有谁上线了
*/
public class DistributeClient {
private final String connectString = "192.168.2.128:2181,192.168.2.128:2182,192.168.2.128:2183";
private static int sessionTimeout = 60000;
private ZooKeeper zkClient = null;
private final String ParentPath = "/servers";
//获取连接
public void init() throws IOException {
zkClient = new ZooKeeper(connectString,sessionTimeout,watchedEvent -> {
System.out.println(watchedEvent.getPath()+"路径下:"+watchedEvent.getType());
try {
System.out.println("开始监听。。。");
listen();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
//监听服务器端有哪些可用
public void listen() throws KeeperException, InterruptedException {
List<String> children = zkClient.getChildren(ParentPath, true);
System.out.println("可用的服务端:"+children);
}
//业务
public void business() throws InterruptedException {
System.out.println("开始业务");
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
DistributeClient client = new DistributeClient();
client.init();
client.listen();
client.business();
}
}
/**
* @author Bandit
* @create 2022/2/19 23:16
* 客户端监听 /servers 有谁上线了
*/
public class DistributeClient {
private final String connectString = "192.168.2.128:2181,192.168.2.128:2182,192.168.2.128:2183";
private static int sessionTimeout = 60000;
private ZooKeeper zkClient = null;
private final String ParentPath = "/servers";
//获取连接
public void init() throws IOException {
zkClient = new ZooKeeper(connectString,sessionTimeout,watchedEvent -> {
System.out.println(watchedEvent.getPath()+"路径下:"+watchedEvent.getType());
try {
System.out.println("开始监听。。。");
listen();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
//监听服务器端有哪些可用
public void listen() throws KeeperException, InterruptedException {
List<String> children = zkClient.getChildren(ParentPath, true);
System.out.println("可用的服务端:"+children);
}
//业务
public void business() throws InterruptedException {
System.out.println("开始业务");
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
DistributeClient client = new DistributeClient();
client.init();
client.listen();
client.business();
}
}
运行截图:
先启动一个客户端

然后依次启动三个服务端,服务端在jvm启动参数那设置arg[0]分别为192.168.2.128:2181
,192.168.2.128:2182
,192.168.2.128:2183

然后依次下线
七、分布式锁案例
7.1 锁原理
利用watch监听机制
多个客户端竞争锁,即各自创建自己的节点,用-s -e
方式,创建顺序临时节点,谁排在第一个就成功获取到锁,其他的就等待直到排到自己

7.2 原生API代码实现
public class DistributeLock {
private final String connectString = "127.0.0.1:2181";
private static int sessionTimeout = 60000;
private ZooKeeper zkClient = null;
private final String rootNode = "/locks";
private final String subNode = "/seq-";
private String waitPath; //当前节点等待的节点,就是前一个节点
private String currentNode;//当前client创建的子节点
private CountDownLatch connectLatch = new CountDownLatch(1);
private CountDownLatch waitLatch = new CountDownLatch(1);
public DistributeLock() throws IOException, KeeperException, InterruptedException {
zkClient = new ZooKeeper(connectString, sessionTimeout, watchedEvent -> {
//建立连接的时候让一个线程唤醒,往下执行
if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected){
connectLatch.countDown();
}
//发生了waitPath的删除事件
if(watchedEvent.getType() == Watcher.Event.EventType.NodeDeleted &&
watchedEvent.getPath().equals(waitPath)) {
waitLatch.countDown();
}
});
connectLatch.await();
Stat stat = zkClient.exists(rootNode, false);//判断根节点状态
if(stat==null){
System.out.println("根节点不存在,开始创建");
zkClient.create(rootNode,new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
//加锁
public void lock(){
try {
currentNode = zkClient.create(rootNode+subNode,null, ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
// wait 一小会, 让结果更清晰一些
Thread.sleep(10);
//获取节点判断是否为第一位
List<String> children = zkClient.getChildren(rootNode, false);
System.out.println("检测到:"+children);
if (children.size()==1){
return;//如果只有一个节点,那肯定是当前节点
} else {
Collections.sort(children);
int index = children.indexOf(children);
if(index==0){//排序第一位就获得锁
return;
} else {
//获取前一位的节点
waitPath = rootNode + "/" + children.get(index-1);
//在 waitPath 上注册监听器, 当 waitPath 被删除时,
//zookeeper 会回调监听器的 process 方法
zkClient.getData(waitPath,true,new Stat());
waitLatch.await();
return;
}
}
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
//解锁
public void unlock(){
try {
zkClient.delete(currentNode,-1);
} catch (InterruptedException | KeeperException e) {
e.printStackTrace();
}
}
}
测试方法:
public class TestLock {
public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
for (int i = 0; i < 5; i++) {
final DistributeLock lock = new DistributeLock();
new Thread(()->{
try {
lock.lock();
System.out.println(Thread.currentThread().getName()+"获取了锁");
Thread.sleep(5 * 1000);
lock.unlock();
System.out.println(Thread.currentThread().getName()+"释放了锁");
} catch (InterruptedException e) {
e.printStackTrace();
}
},i+"").start();
}
}
}
运行截图:
如果没加锁,会所有线程一起拿到锁
加锁后:
排队获取锁
7.3 Curator框架实现
官网:Apache Curator
原生的 Java API 开发存在的问题
- 会话连接是异步的,需要自己去处理。比如使用 CountDownLatch
- Watch 需要重复注册,不然就不能生效
- 开发的复杂性还是比较高的
- 不支持多节点删除和创建。需要自己去递归
代码实现:
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>4.3.0</version>
</dependency>
public class CuratorLockTest {
private final static String rootNode = "/locks";
private final static String connectString = "39.106.87.37:2181";
private final static int sessionTimeout = 60000;
private final static int connectTimeout = 60000;
public static void main(String[] args) {
new CuratorLockTest().test();
}
public void test(){
for (int i = 0; i < 5; i++) {
final InterProcessLock lock = new InterProcessMutex(getCuratorFramework(), rootNode);
new Thread(()->{
try {
lock.acquire();
System.out.println(Thread.currentThread().getName()+"获得锁");
lock.acquire();//可重入
System.out.println(Thread.currentThread().getName()+"再次获得锁");
Thread.sleep(5*1000);
lock.release();
System.out.println(Thread.currentThread().getName()+"释放锁");
lock.release();
System.out.println(Thread.currentThread().getName()+"再次释放锁");
} catch (Exception e) {
e.printStackTrace();
}
},"线程"+i).start();
}
}
public CuratorFramework getCuratorFramework(){
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(connectString)
.connectionTimeoutMs(connectTimeout)
.sessionTimeoutMs(sessionTimeout)
.retryPolicy(new ExponentialBackoffRetry(3000, 3))//重试策略,初试3秒,重试3次
.build();
client.start();
System.out.println("zookeeper 初始化完成");
return client;
}
}
运行截图:
