0
点赞
收藏
分享

微信扫一扫

zookeeper节点类型,整合代码实现服务器动态监听

大雁f 2022-03-12 阅读 75

节点类型

Linux中创建节点

创建永久节点名称为jiedian1,值为dadada,无序号

创建子节点

获取节点值

创建带序号的节点 只需要加-s

创建临时

临时带序号

此时 quit 退出后,临时节点会消失

修改节点

监听器原理

监听那个节点数据变化,若变化通知客户端

使用命令行操作,注册一次监听一次

此时用其他机器对节点进行修改,此时就会通知当前客户端

整合代码

创建maven工程,添加依赖

<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>RELEASE</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-core</artifactId>
        <version>1.2.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.5.7</version>
    </dependency>
</dependencies>

配置日志文件,在resource下创建log4j.properties

log4j.rootLogger=INFO, stdout 
log4j.appender.stdout=org.apache.log4j.ConsoleAppender 
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout 
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n 
log4j.appender.logfile=org.apache.log4j.FileAppender 
log4j.appender.logfile.File=target/spring.log 
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout 
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

编写测试类

public class zkClient {
    //不能有空格
    private static String connectString = "192.168.6.100:2181,192.168.6.101:2181,192.168.6.102:2181";
    //超时时间
    private static int sessionTimeout = 2000;
    private ZooKeeper zkClient = null;
    //接连集群
    @Before
    public void init() throws Exception {
        zkClient = new ZooKeeper(connectString, sessionTimeout, new
                Watcher() {
                    @Override
                    public void process(WatchedEvent watchedEvent) {
                 // 收到事件通知后的回调函数(用户的业务逻辑)
                        System.out.println(watchedEvent.getType() + "--"
                                + watchedEvent.getPath());
                    // 再次启动监听
                        try {
                            List<String> children = zkClient.getChildren("/",
                                    true);
                            for (String child : children) {
                                System.out.println(child);
                            }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
        });
    }
    // 创建子节点
    @Test
    public void create() throws Exception {
    // 参数 1:要创建的节点的路径; 参数 2:节点数据 ; 参数 3:节点权限 ;参数 4:节点的类型
        String nodeCreated = zkClient.create("/lzq",
                "jiedain".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.PERSISTENT);
    }
    // 获取子节点并监听  注册一次生效一次
    @Test
    public void getChildren() throws Exception {
        List<String> children = zkClient.getChildren("/", true);
        for (String child : children) {
            System.out.println(child);
        }
        // 延时阻塞
        Thread.sleep(Long.MAX_VALUE);
    }

}

写数据流程

写流程之写入请求直接发送给Leader节点,直接写,然后同步到从节点,当超过半数写完,返回给客户端

写流程之写入请求发送给follower节点,通知主节点写。。。。

服务器动态上下线监听

某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知 到主节点服务器的上下线

 先在集群上创建/servers 节点

IDEA中,服务器端向 Zookeeper 注册代码

import java.io.IOException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
public class DistributeServer {
    private static String connectString =
            "192.168.6.100:2181,192.168.6.101:2181,192.168.6.102:2181";
    private static int sessionTimeout = 2000;
    private ZooKeeper zk = null;
    private String parentNode = "/servers";
    // 创建到 zk 的客户端连接
    public void getConnect() throws IOException{
        zk = new ZooKeeper(connectString, sessionTimeout, new
                Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                    }
                });
    }
    // 注册服务器
    public void registServer(String hostname) throws Exception{
        String create = zk.create(parentNode + "/server",
                hostname.getBytes(), Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println(hostname +" is online "+ create);
    }
    // 业务功能
    public void business(String hostname) throws Exception{
        System.out.println(hostname + " is working ...");
        Thread.sleep(Long.MAX_VALUE);
    }
    public static void main(String[] args) throws Exception {
    // 1 获取 zk 连接
        DistributeServer server = new DistributeServer();
        server.getConnect();
    // 2 利用 zk 连接注册服务器信息
        server.registServer(args[0]);
    // 3 启动业务功能
        server.business(args[0]);
    }
}

客户端

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
public class DistributeClient {
    private static String connectString =
            "192.168.6.100:2181,192.168.6.101:2181,192.168.6.102:2181";
    private static int sessionTimeout = 2000;
    private ZooKeeper zk = null;
    private String parentNode = "/servers";
    // 创建到 zk 的客户端连接
    public void getConnect() throws IOException {
        zk = new ZooKeeper(connectString, sessionTimeout, new
                Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
    // 再次启动监听
                        try {
                            getServerList();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                });
    }
    // 获取服务器列表信息
    public void getServerList() throws Exception {
        // 1 获取服务器子节点信息,并且对父节点进行监听
        List<String> children = zk.getChildren(parentNode, true);
        // 2 存储服务器信息列表
        ArrayList<String> servers = new ArrayList<>();
        // 3 遍历所有节点,获取节点中的主机名称信息
        for (String child : children) {
            byte[] data = zk.getData(parentNode + "/" + child,
                    false, null);
            servers.add(new String(data));
        }
        // 4 打印服务器列表信息
        System.out.println(servers);
    }
    // 业务功能
    public void business() throws Exception{
        System.out.println("client is working ...");
        Thread.sleep(Long.MAX_VALUE);
    }
    public static void main(String[] args) throws Exception {
    // 1 获取 zk 连接
        DistributeClient client = new DistributeClient();
        client.getConnect();
    // 2 获取 servers 的子节点信息,从中获取服务器信息列表
        client.getServerList();
    // 3 业务进程启动
        client.business();
    }
}

此时修改或创建节点就会在控制台输出

举报

相关推荐

0 条评论