0
点赞
收藏
分享

微信扫一扫

Zookeeper(7):ZooKeeper的分布式锁案例

yeamy 2022-04-13 阅读 63

目录

0. 相关文章链接

1. 什么是分布式锁

2. 原生 Zookeeper 实现分布式锁案例

2.1. 分布式锁实现

2.2. 分布式锁测试

3. Curator 框架实现分布式锁案例

3.1. 原生的 Java API 开发存在的问题

3.2. Curator 是一个专门解决分布式锁的框架,解决了原生 Java API 开发分布式遇到的问题

3.3. Curator 案例实操


0. 相关文章链接

大数据基础知识点 文章汇总

1. 什么是分布式锁

        什么叫做分布式锁呢?比如说"进程 1"在使用该资源的时候,会先去获得锁,"进程 1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,"进程 1"用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。

2. 原生 Zookeeper 实现分布式锁案例

2.1. 分布式锁实现

package com.ouyang.zookeeper.case3;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * @ date: 2022/4/7
 * @ author: yangshibiao
 * @ desc: 项目描述
 */
public class DistributedLock {

    // zookeeper server 列表
    private String connectString = "yangshibiao:2181";

    // 超时时间
    private int sessionTimeout = 2000;

    private ZooKeeper zk;
    private String rootNode = "locks";
    private String subNode = "seq-";

    // 当前 client 等待的子节点
    private String waitPath;

    //ZooKeeper 连接
    private CountDownLatch connectLatch = new CountDownLatch(1);

    //ZooKeeper 节点等待
    private CountDownLatch waitLatch = new CountDownLatch(1);

    // 当前 client 创建的子节点
    private String currentNode;

    // 和 zk 服务建立连接,并创建根节点
    public DistributedLock() throws IOException, InterruptedException, KeeperException {
        zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {

            @Override
            public void process(WatchedEvent event) {

                // 连接建立时, 打开 latch, 唤醒 wait 在该 latch 上的线程
                if (event.getState() ==
                        Event.KeeperState.SyncConnected) {
                    connectLatch.countDown();
                }

                // 发生了 waitPath 的删除事件
                if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {
                    waitLatch.countDown();
                }
            }
        });

        // 等待连接建立
        connectLatch.await();

        //获取根节点状态
        Stat stat = zk.exists("/" + rootNode, false);

        //如果根节点不存在,则创建根节点,根节点类型为永久节点
        if (stat == null) {
            System.out.println("根节点不存在");
            zk.create("/" + rootNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }

    }

    // 加锁方法
    public void zkLock() {
        try {

            //在根节点下创建临时顺序节点,返回值为创建的节点路径
            currentNode = zk.create(
                    "/" + rootNode + "/" + subNode,
                    null,
                    ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL
            );

            // wait 一小会, 让结果更清晰一些
            Thread.sleep(10);

            // 注意, 没有必要监听"/locks"的子节点的变化情况
            List<String> childrenNodes = zk.getChildren("/" + rootNode, false);

            // 列表中只有一个子节点, 那肯定就是 currentNode , 说明client 获得锁
            if (childrenNodes.size() == 1) {
                return;
            } else {

                //对根节点下的所有临时顺序节点进行从小到大排序
                Collections.sort(childrenNodes);

                //当前节点名称
                String thisNode = currentNode.substring(("/" + rootNode + "/").length());

                //获取当前节点的位置
                int index = childrenNodes.indexOf(thisNode);
                if (index == -1) {
                    System.out.println("数据异常");
                } else if (index == 0) {
                    // index == 0, 说明 thisNode 在列表中最小, 当前client 获得锁
                    return;
                } else {

                    // 获得排名比 currentNode 前 1 位的节点
                    this.waitPath = "/" + rootNode + "/" + childrenNodes.get(index - 1);

                    // 在 waitPath 上注册监听器, 当 waitPath 被删除时,zookeeper 会回调监听器的 process 方法
                    zk.getData(waitPath, true, new Stat());

                    //进入等待锁状态
                    waitLatch.await();

                    return;
                }
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    // 解锁方法
    public void zkUnlock() {
        try {
            zk.delete(this.currentNode, -1);
        } catch (InterruptedException | KeeperException e) {
            e.printStackTrace();
        }
    }
}

2.2. 分布式锁测试

1)创建两个线程

package com.ouyang.zookeeper.case3;

import org.apache.zookeeper.KeeperException;

import java.io.IOException;

/**
 * @ date: 2022/4/7
 * @ author: yangshibiao
 * @ desc: 项目描述
 */
public class DistributedLockTest {
    public static void main(String[] args) throws
            InterruptedException, IOException, KeeperException {

        // 创建分布式锁 1
        final DistributedLock lock1 = new DistributedLock();

        // 创建分布式锁 2
        final DistributedLock lock2 = new DistributedLock();

        new Thread(new Runnable() {
            @Override
            public void run() {

                // 获取锁对象
                try {
                    lock1.zkLock();
                    System.out.println("线程 1 获取锁");
                    Thread.sleep(5 * 1000);
                    lock1.zkUnlock();
                    System.out.println("线程 1 释放锁");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {

                // 获取锁对象
                try {
                    lock2.zkLock();
                    System.out.println("线程 2 获取锁");
                    Thread.sleep(5 * 1000);
                    lock2.zkUnlock();
                    System.out.println("线程 2 释放锁");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

2)观察控制台变化:

线程 1 获取锁
线程 1 释放锁
线程 2 获取锁
线程 2 释放锁

3. Curator 框架实现分布式锁案例

3.1. 原生的 Java API 开发存在的问题

3.2. Curator 是一个专门解决分布式锁的框架,解决了原生 Java API 开发分布式遇到的问题

详情请查看官方文档: https://curator.apache.org/index.html

3.3. Curator 案例实操

1) 添加依赖

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-client</artifactId>
            <version>4.3.0</version>
        </dependency>

2) 代码实现

package com.ouyang.zookeeper.case4;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * @ date: 2022/4/7
 * @ author: yangshibiao
 * @ desc: 项目描述
 */
public class CuratorLockTest {

    private String rootNode = "/locks";

    // zookeeper server 列表
    private String connectString = "yangshibiao:2181";

    // connection 超时时间
    private int connectionTimeout = 2000;

    // session 超时时间
    private int sessionTimeout = 2000;

    public static void main(String[] args) {
        new CuratorLockTest().test();
    }

    // 测试
    private void test() {

        // 创建分布式锁 1
        final InterProcessLock lock1 = new InterProcessMutex(getCuratorFramework(), rootNode);

        // 创建分布式锁 2
        final InterProcessLock lock2 = new InterProcessMutex(getCuratorFramework(), rootNode);

        new Thread(new Runnable() {
            @Override
            public void run() {

                // 获取锁对象
                try {
                    lock1.acquire();
                    System.out.println("线程 1 获取锁");

                    // 测试锁重入
                    lock1.acquire();
                    System.out.println("线程 1 再次获取锁");
                    Thread.sleep(5 * 1000);
                    lock1.release();
                    System.out.println("线程 1 释放锁");
                    lock1.release();
                    System.out.println("线程 1 再次释放锁");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {

                // 获取锁对象
                try {
                    lock2.acquire();
                    System.out.println("线程 2 获取锁");

                    // 测试锁重入
                    lock2.acquire();

                    System.out.println("线程 2 再次获取锁");
                    Thread.sleep(5 * 1000);
                    lock2.release();
                    System.out.println("线程 2 释放锁");
                    lock2.release();
                    System.out.println("线程 2 再次释放锁");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

    // 分布式锁初始化
    public CuratorFramework getCuratorFramework() {

        //重试策略,初试时间 3 秒,重试 3 次
        RetryPolicy policy = new ExponentialBackoffRetry(3000, 3);

        //通过工厂创建 Curator
        CuratorFramework client =
                CuratorFrameworkFactory.builder()
                        .connectString(connectString)
                        .connectionTimeoutMs(connectionTimeout)
                        .sessionTimeoutMs(sessionTimeout)
                        .retryPolicy(policy).build();

        //开启连接
        client.start();
        System.out.println("zookeeper 初始化完成...");
        return client;
    }
}

3)观察控制台变化

线程 1 获取锁
线程 1 再次获取锁
线程 1 释放锁
线程 1 再次释放锁
线程 2 获取锁
线程 2 再次获取锁
线程 2 释放锁
线程 2 再次释放锁

注:其他相关文章链接由此进(包括Zookeeper在内的各大数据组件相关博文) -> 大数据基础知识点 文章汇总


举报

相关推荐

0 条评论