4.1 需求
某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。
4.2 需求分析
4.3 具体实现
(1)先在zk集群根目录上创建/servers 节点
[zk: localhost:2181(CONNECTED) 2] create /servers "servers"
(2)在 Idea 中创建包名:com.pcz.zkcase1
(3)服务器端向 Zookeeper 注册代码
package com.pcz.zkcase1;
import org.apache.zookeeper.*;
import java.io.IOException;
public class DistributeServer {
private String connectString="hadoop1:2181,hadoop2:2181,hadoop3:2181";
private int sessionTimeout=2000; //2000ms=2s
private ZooKeeper zk;
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
DistributeServer server = new DistributeServer();
//1.连接zk
server.getConnect();
//2.注册服务器到集群
server.regist(args[0]);
//3.启动业务逻辑(睡觉)
server.bussiness();
}
private void bussiness() throws InterruptedException {
Thread.sleep(Long.MAX_VALUE);
}
private void regist(String hostname) throws KeeperException, InterruptedException {
String s = zk.create("/servers", hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(hostname+"已经上线~");
}
private void getConnect() throws IOException {
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
public void process(WatchedEvent watchedEvent) {
}
});
}
}
(4)客户端代码
package com.pcz.zkcase1;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class DistributeClient {
private ZooKeeper zk;
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
DistributeClient client = new DistributeClient();
//1.连接zk集群
client.getConnect();
//2.监听/servers下的子节点变化(增加和删除)
client.getServerList();
//3.业务逻辑(睡觉)
client.bussiness();
}
private void bussiness() throws InterruptedException {
Thread.sleep(Long.MAX_VALUE);
}
private void getServerList() throws KeeperException, InterruptedException {
List<String> children = zk.getChildren("/servers", true);
//把主机名称封装到一个集合里(存储服务器信息列表)
ArrayList<String> servers = new ArrayList<>();
//循环遍历取出子节点里面的值
for (String child : children) {
//获取/server下面的对应的数据
byte[] data = zk.getData("/servers/" + child, false, null);
servers.add(new String(data));
}
System.out.println(servers);
}
private void getConnect() throws IOException {
String connectString="hadoop1:2181,hadoop2:2181,hadoop3:2181";
int sessionTimeout=2000;
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
public void process(WatchedEvent watchedEvent) {
//循环调用监听器,可不用多次注册监听器
try {
getServerList();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
4.4 测试
1)在 Linux 命令行上操作增加减少服务器
(1)启动 DistributeClient 客户端
(2)在 hadoop2 上 zk 的客户端/servers 目录上创建临时带序号节点
[zk: localhost:2181(CONNECTED) 0] create -s -e /servers/server1 "FuWuQi1"
Created /servers/server10000000000
[zk: localhost:2181(CONNECTED) 1] create -s -e /servers/hadoop2 "hadoop2"
Created /servers/hadoop20000000001
[zk: localhost:2181(CONNECTED) 2] create -s -e /servers/hadoop3 "hadoop3"
Created /servers/hadoop30000000002
(3)观察 Idea 控制台变化
(4)执行删除操作
[zk: localhost:2181(CONNECTED) 5] delete /servers/server10000000000
(5)观察 Idea 控制台变化
2)在 Idea 上操作增加减少服务器
(1)启动 DistributeClient 客户端(如果已经启动过,不需要重启)
(2)启动 DistributeServer 服务
①点击 Edit Configurations…
②在弹出的窗口中(Program arguments)输入想启动的主机,例如,hadoop5
③运行 DistributeServer
④观察 DistributeServer 控制台,提示hadoop5已经上线
⑤观察 DistributeClient 控制台,提示hadoop3 hadoop2 hadoop5 已经上线
hadoop2和hadoop3是本来就在线的节点