0
点赞
收藏
分享

微信扫一扫

ZooKeeper学习总结【附代码】

phpworkerman 2022-04-05 阅读 73

目录

1、ZooKeeper概念

2、安装与配置

3、Zookerper命令操作

3.1 Zookeeper 数据模型

3.2 Zookeeper服务端常用命令

3.3 Zookeeper客户端常用命令

4、ZooKeeper JavaAPI操作

4.1 Curator 介绍

4.2 Curator API 常用操作

4.2.1 建立连接

4.2.2 添加节点

4.2.3 查询节点

4.2.4 修改节点

4.2.5 删除节点

4.2.6 Watch事件监听

4.3 分布式锁

4.3.1 Zookeeper分布式锁原理

4.4 模拟12306 售票案例

5、ZooKeeper集群搭建

6、代码仓库地址


1、ZooKeeper概念

Zookeeper 翻译过来就是 动物园管理员,他是用来管 Hadoop(大象)、Hive(蜜蜂)、Pig(小 猪)的管理员。简称zk。

Zookeeper 是一个分布式的、开源的分布式应用程序的协调服务。简单点说就是它是来管理分布式应用程序的,它自己不做事情,它是来管人的。

Zookeeper 提供的主要功能包括:

2、安装与配置

Linux版ZooKeeper安装_一切总会归于平淡的博客-CSDN博客

3、Zookerper命令操作

3.1 Zookeeper 数据模型

  1. ZooKeeper 是一个树形目录服务,其数据模型和Unix的文件系统目录树很类似,拥有一个层次化结构。

  2. 这里面的每一个节点都被称为: ZNode,每个节点上都会保存自己的数据和节点信息。

  3. 节点可以拥有子节点,同时也允许少量(1MB)数据存储在该节点之下。

  4. 节点可以分为四大类:

  • PERSISTENT 持久化节点

  • EPHEMERAL 临时节点 :-e

  • PERSISTENT_SEQUENTIAL 持久化顺序节点 :-s

  • EPHEMERAL_SEQUENTIAL 临时顺序节点 :-es

3.2 Zookeeper服务端常用命令

启动 ZooKeeper 服务

 ./zkServer.sh  start

查看 ZooKeeper 服务状态

./zkServer.sh status

停止 ZooKeeper 服务

./zkServer.sh stop

重启 ZooKeeper 服务

./zkServer.sh restart

3.3 Zookeeper客户端常用命令

连接ZooKeeper服务端

#./zkCli.sh –server ip:port
./zkCli.sh -server localhost:2181

 断开连接

quit

 

显示指定目录下节点

ls 目录

 创建节点

create /节点path value

 获取节点值

get /节点path

 设置节点值

set /节点path value

 删除单个节点

delete /节点path

 删除带有子节点的节点

deleteall /节点path

查看命令帮助

help

创建临时节点

create -e /节点path value

 创建顺序节点

create -s /节点path value

创建临时顺序节点

create -es /节点path value

 

ls2 /

 查询节点详细信息

ls –s /节点path

 

 

4、ZooKeeper JavaAPI操作

4.1 Curator 介绍

其实Curator 是一个 外来者,其实ZooKeeper 提供了很多Java客户端。

常见的ZooKeeper Java API :

  1. 原生Java API

  2. ZkClient

  3. Curator

Curator 项目的目标是简化 ZooKeeper 客户端的使用。 Curator 最初是 Netfix 研发的,后来捐献了 Apache 基金会,目前是 Apache 的顶级项目。 官网:Apache Curator –

4.2 Curator API 常用操作

4.2.1 建立连接

 

/**
     * @description:建立连接
     * @author: jie
     * @time: 2022/4/3 23:02
     */
    @Test
    void testConnect() {
        /**
         *  第一个参数 : 连接字符串
         *  第二个参数 : 会话超时时间
         *  第三个参数 : 连接超时时间
         *  第四个参数 : 重试策略
         *
         */
        //重试策略 参数: 每次休眠的时间,最大的重试次数
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
        //1、第一种连接方式
//        CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.58.150:2181",
//                60 * 1000, 15 * 1000, retryPolicy);
//        //2、开启连接
//        client.start();
        //第二种方式 CuratorFrameworkFactory.builder();
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("192.168.58.150:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retryPolicy).namespace("jie").build();
        //2、开启连接
        client.start();
    }

4.2.2 添加节点

1、创建节点

 /**
     * @description:创建节点 :节点类型 持久  临时 顺序 数据
     * @author: jie
     * @time: 2022/4/4 11:35
     */
    @Test
    void testCreate() {
        try {
            //1.基本创建 注:如果创建节点,没有指定数据。则默认将当前客户端的IP作为数据存储
            client.create().forPath("/app1");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

2、创建节点 带数据

 @Test
    void testCreate2() {
        try {
            //1.创建节点 带有数据 注:如果创建节点,没有指定数据。则默认将当前客户端的IP作为数据存储
            client.create().forPath("/app2","求个关注".getBytes(StandardCharsets.UTF_8));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

 3、设置节点类型

@Test
    void testCreate3() {
        try {
            //设置节点的类型 默认类型:持久化 CreateMode.EPHEMERAL 临时节点,方法结束消失,因为只存在于一次会话
            client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3 ","求个关注".getBytes(StandardCharsets.UTF_8));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

4、创建多级节点

 @Test
    void testCreate4() {
        try {
            //创建多级节点 creatingParentsIfNeeded():如果父节点不存在,就先创建父节点
            client.create().creatingParentsIfNeeded().forPath("/app4/p1 ","求个关注".getBytes(StandardCharsets.UTF_8));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

4.2.3 查询节点

1、查询数据

/**
     * @description:查询节点
     * @author: jie
     * @time: 2022/4/4 12:04
     */
    @Test
    void testGet() {
        try {
            //查询数据
            byte[] bytes = client.getData().forPath("/app2");
            System.out.println(new String(bytes));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

2、查询子节点

 @Test
    void testGet2() {
        try {
            //查询子节点
            List<String> list = client.getChildren().forPath("/app4");
            System.out.println(list);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

3、查询节点状态信息

 @Test
    void testGet3() {
        try {
            Stat status = new Stat();
            System.out.println(status);
            //查询节点的状态信息
            client.getData().storingStatIn(status).forPath("/app1");
            System.out.println(status);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

4.2.4 修改节点

1、修改数据

 /**
     * @description:修改数据
     * @author: jie
     * @time: 2022/4/4 12:20
     */
    @Test
    void testset() {
        try {
            client.setData().forPath("/app1","求个点赞".getBytes(StandardCharsets.UTF_8));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

2、根据版本修改

 @Test
    void testsetForVersion() {
        try {
            Stat status = new Stat();
            //查询节点的状态信息
            client.getData().storingStatIn(status).forPath("/app1");
            //version 是通过查询出来的,目的就是为了让其他客户端或者线程不干扰。
            int version = status.getVersion();
            System.out.println(version);
            //根据版本修改
            client.setData().withVersion(version).forPath("/app1","求个关注".getBytes(StandardCharsets.UTF_8));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

4.2.5 删除节点

1、删除单个节点

 /**
     * @description:删除节点
     * @author: jie
     * @time: 2022/4/4 12:30
     */
    @Test
    void testDelete() {
        //删除单个节点
        try {
            client.delete().forPath("/app1");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

2、删除带有子节点的节点

  @Test
    void testDelete2() {
        try {
            //删除带有子节点的节点
            client.delete().deletingChildrenIfNeeded().forPath("/app4");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

3、保证其删除成功

 @Test
    void testDelete3() {
        //保证其删除成功
        try {
            client.delete().guaranteed().forPath("/app2");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

4、回调

 @Test
    void testDelete4() {
        //回调
        try {
            client.delete().guaranteed().inBackground(new BackgroundCallback() {
                @Override
                public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                    System.out.println("被删除了");
                    System.out.println(curatorEvent);
                }
            }).forPath("/app1");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

4.2.6 Watch事件监听

代码演示:

1、NodeCathe:给指定一个节点注册监听器

  /**
     * @description:演示 NodeCathe:给指定一个节点注册监听器
     * @author: jie
     * @time: 2022/4/4 12:53
     */
    @Test
    void testNodeCathe() throws Exception {
        //1、创建NodeCath对象
        NodeCache nodeCache = new NodeCache(client,"/app1");
        //2、注册监听
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                System.out.println("节点变化了~");
                //获取修改节点后的数据
                byte[] data = nodeCache.getCurrentData().getData();
                System.out.println("最新的数据:"+new String(data));
            }
        });
        //3、开启监听 如果设置为True,则开启监听,加载缓冲数据
        nodeCache.start(true);
        while (true){

        }
    }

2、 PathChildrenCache:监听某个节点的所有子节点

 /**
     * @description:演示 PathChildrenCache:监听某个节点的所有子节点
     * @author: jie
     * @time: 2022/4/4 12:53
     */
    @Test
    void testPathChildrenCache() throws Exception {
        //创建监听对象 连接 地址 是否缓存
        PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/app2",true);
        //绑定监听器
         pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
             @Override
             public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                 System.out.println("子节点变化了");
                 System.out.println(pathChildrenCacheEvent);
                 //监听子节点的 数据变更,并且拿到变更后的数据
                 //1、获取类型
                 PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();
                 //2、判断类型是否是update
                 if(type.equals((PathChildrenCacheEvent.Type.CHILD_UPDATED))){
                     System.out.println("数据变了!!!");
                     byte[] data = pathChildrenCacheEvent.getData().getData();
                     System.out.println(new String(data));
                 }
             }
         });
        //开启
        pathChildrenCache.start();
        while (true){
        }
    }

3、ThreeCathe():监听某个节点和所有子节点们

/**
     * @description:ThreeCathe():监听某个节点和所有子节点们
     * @author: jie
     * @time: 2022/4/4 13:33
     */
    @Test
    void testThreeCathe() throws Exception {
        //创建监听器
        TreeCache treeCache = new TreeCache(client,"/app2");

        //注册监听
        treeCache.getListenable().addListener(new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
                System.out.println("节点变化了");
                System.out.println(treeCacheEvent);
            }
        });
        //开启
        treeCache.start();
        while (true){
        }
    }

4.3 分布式锁

 

 

4.3.1 Zookeeper分布式锁原理

 

这里有三个客户端,它们都和ZooKeeper Server连接起来的,ZooKeeper Server里面现在又三个节点,现在client1如果想获取锁,那他就可以在/lock节点下创建一个节点,就代表获取锁了,用完了之后,释放资源,再把这个节点删除掉。

4.4 模拟12306 售票案例

在Curator中有五种实现分布式锁方案:

接下来是代码演示,我这里为了方便,就不搞好几台机器了,就简单的模拟一下。

12306 实体类

package com.jie.curatorzk;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.concurrent.TimeUnit;

public class Ticket12306 implements Runnable {

    private int tickets = 10;//数据库的票数

    private InterProcessMutex lock;

    public Ticket12306() {
        //重试策略 参数: 每次休眠的时间,最大的重试次数
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("192.168.58.150:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retryPolicy).build();
        //2、开启连接
        client.start();
        lock = new InterProcessMutex(client,"/lock");
    }

    @Override
    public void run() {
        while (true) {
            try {
                //获取锁 参数 时间 时间单位
                lock.acquire(3, TimeUnit.SECONDS);
                if (tickets > 0) {
                    System.out.println(Thread.currentThread() + ":" + tickets);
                    Thread.sleep(1000);
                    tickets--;
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //释放锁
                try {
                    lock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

        }
    }
}

测试类

package com.jie.curatorzk;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class LoockTest {
    public static void main(String[] args) {
        Ticket12306 ticket12306 = new Ticket12306();
        //创建客户端
        Thread t1 = new Thread(ticket12306,"携程");
        Thread t2 = new Thread(ticket12306,"飞猪");

        t1.start();
        t2.start();
    }
}

代码实现还是比较简单的,主要是要知道原理。

5、ZooKeeper集群搭建

 

 

这里有这么多台机器,那么多台机器,那么到底谁说得算呢?那么ZooKeeper搭建集群第一步,就是要从这么多台机器中选举出一台领导者。

Leader选举:

  1. Serverid:服务器ID, 比如有三台服务器,编号分别是1,2,3;编号越大在选择算法中的权重越大。

  2. Zxid:数据ID;服务器中存放的最大数据ID.值越大说明数据 越新,在选举算法中数据越新权重越大。

在Leader选举的过程中,如果某台ZooKeeper获得了超过半数的选票, 则此ZooKeeper就可以成为Leader了。

搭建教程:

ZooKeeper集群搭建_一切总会归于平淡的博客-CSDN博客

ZooKeeper集群角色介绍

在ZooKeeper集群服中务中有三个角色: Leader 领导者 :

Follower 跟随者 :

Observer 观察者:

6、代码仓库地址

ZooKeeper学习: ZooKeeper学习代码

 

举报

相关推荐

0 条评论