一、集成
<dependencies>
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>2.4.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>2.4.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-test</artifactId>
        <version>2.4.2</version>
    </dependency>
</dependencies>
二、简单应用
2.1、同步
public class zkCliTest {
    private static CuratorFramework client;
    private String path = "/zk-book/c1";
    static {
        // Build and start the shared client once when the class is loaded.
        // The retry policy is pluggable: implement RetryPolicy to supply a custom strategy.
        client = CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181")
                .sessionTimeoutMs(5000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .namespace("base") // with a namespace set, all operations below are relative to "base"
                .build();
        client.start();
    }
    /**
     * Creates the node at {@code path} with the payload "init", creating missing parents.
     * The mode handed to withMode distinguishes ephemeral from persistent nodes;
     * this sample asks for a persistent one.
     */
    public void nodeCreate() throws Exception {
        byte[] payload = "init".getBytes();
        client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath(path, payload);
    }
    
    /** Reads the data stored at {@code path} and prints it as a string. */
    public void nodeRead() throws Exception {
        Stat nodeStat = new Stat();
        byte[] content = client.getData().storingStatIn(nodeStat).forPath(path);
        System.out.println(new String(content));
    }
    
    /**
     * Deletes the node at {@code path}, including its children, passing the version
     * number obtained by a read issued immediately before the delete.
     */
    public void nodeDel() throws Exception {
        Stat current = new Stat();
        client.getData().storingStatIn(current).forPath(path);
        int expectedVersion = current.getVersion();
        client.delete()
                .deletingChildrenIfNeeded()
                .withVersion(expectedVersion)
                .forPath(path);
    }
/**
 * Writes to the node at {@code path} twice, both times passing the version number
 * obtained from a single read. The first write prints the version it produced; any
 * exception from the second write is caught and its message printed.
 */
public void nodeSet() throws Exception {
    Stat before = new Stat();
    client.getData().storingStatIn(before).forPath(path);
    int readVersion = before.getVersion();

    int newVersion = client.setData().withVersion(readVersion).forPath(path).getVersion();
    System.out.println("Success set node for : " + path + ", new version: "
            + newVersion);

    try {
        // Same version as the first write; presumably stale by now, so this call is
        // expected to fail and demonstrate the version check.
        client.setData().withVersion(readVersion).forPath(path);
    } catch (Exception e) {
        System.out.println("Fail set node due to " + e.getMessage());
    }
}
}
2.2、异步
在ZooKeeper中,所有的异步通知事件处理都是由EventThread这个线程来处理的。EventThread线程用于串行处理所有的事件通知,可以保证事件处理的顺序性;但是一旦碰上复杂的处理单元,就会消耗过长的处理时间,从而影响其他事件的处理。因此Curator允许用户传入Executor实例,这样可以将比较复杂的事件处理放到一个专门的线程池中去。
public class ZkBackTest {
    private static String path = "/zk-book/c1";
    private static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("127.0.0.1:2181")
            .sessionTimeoutMs(5000)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();
    /**
     * Issues two asynchronous create requests for the same ephemeral node and prints the
     * thread on which each result callback runs: the first request is given a custom
     * executor, the second is not.
     *
     * Sample output recorded with the original code:
     *   Main thread: main
     *   event[code: -110, type: CREATE]
     *   Thread of processResult: main-EventThread
     *   event1[code: -110, type: CREATE]
     *   Thread of processResult: pool-3-thread-1
     *
     * The executor and the client are released in a finally block so they are not leaked
     * when a call throws.
     */
    public static void main(String[] args) throws Exception {
        client.start();
        ExecutorService tp = Executors.newFixedThreadPool(2);
        try {
            System.out.println("Main thread: " + Thread.currentThread().getName());
            // One count per callback; main waits until both results have been processed.
            final CountDownLatch latch = new CountDownLatch(2);

            // BackgroundCallback handles the result returned by the server for an asynchronous call.
            // Here a custom Executor is passed in.
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
                public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                    System.out.println("event1[code: " + event.getResultCode() + ", type: " + event.getType() + "]");
                    System.out.println("Thread of processResult: " + Thread.currentThread().getName());
                    latch.countDown();
                }
            }, tp).forPath(path, "init".getBytes());

            // Here no custom Executor is passed in.
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
                public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                    System.out.println("event[code: " + event.getResultCode() + ", type: " + event.getType() + "]");
                    System.out.println("Thread of processResult: " + Thread.currentThread().getName());
                    latch.countDown();
                }
            }).forPath(path, "init".getBytes());

            latch.await();
        } finally {
            tp.shutdown();
            client.close();
        }
    }
}
2.3、ZKPaths
//其提供了简单的API来构建znode路径、递归创建、删除节点等。
public class ZKPaths_Sample {
   static String path = "/curator_zkpath_sample";
   static CuratorFramework client = CuratorFrameworkFactory.builder()
         .connectString( "domain1.book.zookeeper:2181" )
         .sessionTimeoutMs( 5000 )
         .retryPolicy( new ExponentialBackoffRetry( 1000, 3 ) )
         .build();
   
   /**
    * Walks through the ZKPaths helpers in order: building paths, splitting a path into
    * parent and node name, creating directories, listing sorted children and deleting
    * children.
    */
   public static void main(String[] args) throws Exception {
      client.start();
      ZooKeeper zk = client.getZookeeperClient().getZooKeeper();

      // Path construction helpers
      String namespaced = ZKPaths.fixForNamespace(path,"sub");
      System.out.println(namespaced);
      String joined = ZKPaths.makePath(path, "sub");
      System.out.println(joined);
      String leaf = ZKPaths.getNodeFromPath( "/curator_zkpath_sample/sub1" );
      System.out.println(leaf);

      // Split a full path into its parent path and node name
      PathAndNode parts = ZKPaths.getPathAndNode( "/curator_zkpath_sample/sub1" );
      String parentPath = parts.getPath();
      String nodeName = parts.getNode();
      System.out.println(parentPath);
      System.out.println(nodeName);

      // Create two child directories, then list them
      for (String child : new String[] { "/child1", "/child2" }) {
         ZKPaths.mkdirs(zk, path + child);
      }
      System.out.println(ZKPaths.getSortedChildren( zk, path ));

      // Clean up; the ZooKeeper handle is fetched again from the client, as in the original sample
      ZKPaths.deleteChildren(client.getZookeeperClient().getZooKeeper(), path, true);
   }
}
2.4、EnsurePath
//其提供了一种能够确保数据节点存在的机制,当上层业务希望对一个数据节点进行操作时,操作前需要确保该节点存在
public class EnsurePathDemo {
    static String path = "/zk-book/c1";
    static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("domain1.book.zookeeper:2181")
            .sessionTimeoutMs(5000)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();
   /**
    * Demonstrates EnsurePath in two forms: a plain EnsurePath built from an absolute
    * path, and a namespace-aware one obtained from a client facade bound to the
    * "zk-book" namespace.
    */
   public static void main(String[] args) throws Exception {

      client.start();
      // usingNamespace() does not change the client it is called on; it returns a
      // namespaced facade. The original discarded that return value, so the
      // namespace-aware EnsurePath below was built without any namespace.
      CuratorFramework zkBookClient = client.usingNamespace( "zk-book" );

      EnsurePath ensurePath = new EnsurePath(path);
      ensurePath.ensure(client.getZookeeperClient());
      // Called a second time on purpose, presumably to show that repeating ensure() is harmless.
      ensurePath.ensure(client.getZookeeperClient());

      // Relative to the "zk-book" namespace
      EnsurePath ensurePath2 = zkBookClient.newNamespaceAwareEnsurePath("/c1");
      ensurePath2.ensure(zkBookClient.getZookeeperClient());
   }
}
三、高级应用
3.1、分布式锁
//使用Curator实现分布式锁功能
public class Recipes_Lock {
   static String lock_path = "/curator_recipes_lock_path";
    static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("127.0.0.1:2181")
            .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
   /**
    * Starts 30 threads that all wait on a start gate, then compete for one
    * InterProcessMutex; each thread that obtains the lock prints a timestamp-based
    * order number and releases the lock.
    *
    * A thread that fails to acquire the lock no longer enters the critical section,
    * and the lock is released in a finally block.
    */
   public static void main(String[] args) throws Exception {
      client.start();
      final InterProcessMutex lock = new InterProcessMutex(client,lock_path);
      // Start gate: workers block here until main has started all of them
      final CountDownLatch down = new CountDownLatch(1);
      for(int i = 0; i < 30; i++){
         new Thread(new Runnable() {
            public void run() {
               try {
                  down.await(); // block until the gate opens
                  lock.acquire();
               } catch ( InterruptedException e ) {
                  Thread.currentThread().interrupt();
                  return;
               } catch ( Exception e ) {
                  // Without the lock the order number must not be generated
                  System.err.println("Failed to acquire lock: " + e);
                  return;
               }
               try {
                  SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");
                  String orderNo = sdf.format(new Date());
                  System.out.println("生成的订单号是 : "+orderNo);
               } finally {
                  try {
                     lock.release();
                  } catch ( Exception e ) {
                     System.err.println("Failed to release lock: " + e);
                  }
               }
            }
         }).start();
      }
      down.countDown(); // count reaches zero: opens the gate for all workers
   }
}
3.2、计数器
public class Recipes_DistAtomicInt {
   static String distatomicint_path = "/curator_recipes_distatomicint_path";
    static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("domain1.book.zookeeper:2181")
            .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
   /**
    * Adds 8 to the distributed atomic integer stored at {@code distatomicint_path}
    * and prints whether the operation reported success.
    */
   public static void main( String[] args ) throws Exception {
      client.start();
      RetryNTimes retry = new RetryNTimes( 3, 1000 );
      DistributedAtomicInteger counter =
            new DistributedAtomicInteger( client, distatomicint_path, retry );
      AtomicValue<Integer> outcome = counter.add( 8 );
      boolean ok = outcome.succeeded();
      System.out.println( "Result: " + ok );
   }
}
3.3、分布式Barrier
//使用Curator实现分布式Barrier
public class Recipes_Barrier {
   static String barrier_path = "/curator_recipes_barrier_path";
   static DistributedBarrier barrier;
   /**
    * Starts five threads that each open their own client, set the barrier and wait on
    * it; the main thread removes the barrier after a two-second pause.
    *
    * Worker failures are now reported on stderr instead of being silently dropped.
    *
    * NOTE(review): every worker overwrites the shared static {@code barrier} field without
    * synchronization, and main dereferences it after a fixed sleep. If no worker has
    * assigned it by then, removeBarrier() throws NullPointerException; confirm whether
    * that is acceptable for this demo.
    */
   public static void main(String[] args) throws Exception {
      for (int i = 0; i < 5; i++) {
         new Thread(new Runnable() {
            public void run() {
               try {
                  CuratorFramework client = CuratorFrameworkFactory.builder()
                           .connectString("domain1.book.zookeeper:2181")
                           .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
                  client.start();
                  barrier = new DistributedBarrier(client, barrier_path);
                  System.out.println(Thread.currentThread().getName() + "号barrier设置" );
                  barrier.setBarrier();
                  barrier.waitOnBarrier();
                  System.err.println("启动...");
               } catch (Exception e) {
                  System.err.println(Thread.currentThread().getName() + " failed: " + e);
               }
            }
         }).start();
      }
      // Presumably gives the workers time to set the barrier and start waiting on it
      Thread.sleep( 2000 );
      barrier.removeBarrier();
   }
}public class Recipes_Barrier2 {
   static String barrier_path = "/curator_recipes_barrier_path";
   /**
    * Starts five threads that each open their own client and take part in a
    * DistributedDoubleBarrier created for 5 members: each thread sleeps a random time
    * (up to about three seconds), calls enter(), sleeps again, then calls leave().
    *
    * Worker failures are now reported on stderr instead of being silently dropped.
    */
   public static void main(String[] args) throws Exception {

      for (int i = 0; i < 5; i++) {
         new Thread(new Runnable() {
            public void run() {
               try {
                  CuratorFramework client = CuratorFrameworkFactory.builder()
                           .connectString("domain1.book.zookeeper:2181")
                           .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
                  client.start();
                  DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client, barrier_path,5);
                  Thread.sleep( Math.round(Math.random() * 3000) );
                  System.out.println(Thread.currentThread().getName() + "号进入barrier" );
                  barrier.enter();
                  System.out.println("启动...");
                  Thread.sleep( Math.round(Math.random() * 3000) );
                  barrier.leave();
                  System.out.println( "退出..." );
               } catch (Exception e) {
                  System.err.println(Thread.currentThread().getName() + " failed: " + e);
               }
            }
         }).start();
      }
   }
}public class Recipes_CyclicBarrier {
   public static CyclicBarrier barrier = new CyclicBarrier( 3 );
   /**
    * Runs three Runner tasks on a three-thread pool, then shuts the pool down
    * (already-submitted tasks still run).
    *
    * The runners are submitted directly. The original wrapped each one in a Thread
    * object that was never started and only served as a Runnable for the pool.
    */
   public static void main( String[] args ) throws IOException, InterruptedException {
      ExecutorService executor = Executors.newFixedThreadPool( 3 );
      executor.submit( new Runner( "1号选手" ) );
      executor.submit( new Runner( "2号选手" ) );
      executor.submit( new Runner( "3号选手" ) );
      executor.shutdown();
   }
}
class Runner implements Runnable {
   private String name;
   /**
    * Creates a runner.
    *
    * @param name label stored in the {@code name} field of this runner
    */
   public Runner( String name ) {
      this.name = name;
   }
   /**
    * Announces that this runner is ready, waits on the shared
    * {@code Recipes_CyclicBarrier.barrier}, and announces the start once the wait
    * returns normally.
    *
    * If the wait is interrupted or the barrier is broken, the runner reports the
    * problem on stderr and returns without announcing a start.
    */
   public void run() {
      System.out.println( name + " 准备好了." );
      try {
         Recipes_CyclicBarrier.barrier.await();
      } catch ( InterruptedException e ) {
         // Restore the interrupt flag for whoever owns this thread
         Thread.currentThread().interrupt();
         System.err.println( name + " interrupted while waiting at the barrier" );
         return;
      } catch ( java.util.concurrent.BrokenBarrierException e ) {
         System.err.println( name + " cannot start: barrier is broken" );
         return;
      }
      System.out.println( name + " 起跑!" );
   }
}
四、节点管理
4.1、监听
final NodeCache cache = new NodeCache(client,path,false);
cache.start(true);
cache.getListenable().addListener(new NodeCacheListener() {
    /**
     * Prints the node's current data when the cache reports a change.
     * The cache has no current data when the node does not exist (for example after
     * it was deleted), so that case is reported instead of dereferencing null.
     */
    public void nodeChanged() throws Exception {
        if (cache.getCurrentData() == null) {
            System.out.println("Node data update, node no longer exists");
            return;
        }
        System.out.println("Node data update, new data: " +
                new String(cache.getCurrentData().getData()));
    }
});PathChildrenCache cache = new PathChildrenCache(client, path, true);
cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
cache.getListenable().addListener(new PathChildrenCacheListener() {
    /**
     * Prints the event kind followed by the affected child's path for add, update and
     * remove events; every other event type is ignored.
     */
    public void childEvent(CuratorFramework client,
                           PathChildrenCacheEvent event) throws Exception {
        String label;
        switch (event.getType()) {
            case CHILD_ADDED:
                label = "CHILD_ADDED,";
                break;
            case CHILD_UPDATED:
                label = "CHILD_UPDATED,";
                break;
            case CHILD_REMOVED:
                label = "CHILD_REMOVED,";
                break;
            default:
                // Not a child add/update/remove event: nothing to print
                return;
        }
        System.out.println(label + event.getData().getPath());
    }
});
4.2、选举
String master_path = "/curator_recipes_master_path";
LeaderSelector selector = new LeaderSelector(client,
        master_path,
        new LeaderSelectorListenerAdapter() {
            /**
             * Prints a message on becoming Master, sleeps for three seconds to stand in
             * for the Master's work, then prints a completion message and returns.
             */
            public void takeLeadership(CuratorFramework client) throws Exception {
                final long holdMillis = 3000;
                System.out.println("成为Master角色");
                Thread.sleep( holdMillis );
                System.out.println( "完成Master操作,释放Master权利" );
            }
        });
selector.autoRequeue();
selector.start();
Thread.sleep( Integer.MAX_VALUE );
五、测试管理
5.1、单机模拟
public class TestingServer_Sample {
   static String path = "/zookeeper";
   /**
    * Starts a TestingServer on port 2181 with a fixed data directory, connects a client
    * to it and prints the children of {@code path}.
    *
    * The client and the server are closed in finally blocks so neither is leaked when
    * a call throws. The original never closed the client.
    */
   public static void main(String[] args) throws Exception {
      TestingServer server = new TestingServer(2181,new File("/home/admin/zk-book-data"));
      try {
         CuratorFramework client = CuratorFrameworkFactory.builder()
                  .connectString(server.getConnectString())
                  .sessionTimeoutMs(5000)
                  .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                  .build();
         try {
            client.start();
            System.out.println( client.getChildren().forPath( path ));
         } finally {
            // Close the client before the server it is connected to
            client.close();
         }
      } finally {
         server.close();
      }
   }
}
5.2、集群模拟
public class TestingCluster_Sample {
   /**
    * Starts a three-server TestingCluster, prints each server's id, state and data
    * directory, kills the server whose state is "leading", and prints the states again.
    *
    * The cluster is stopped in a finally block. If no server reports "leading" after
    * the wait, the method fails with a descriptive IllegalStateException instead of a
    * NullPointerException.
    *
    * @throws IllegalStateException if no leader is found after the initial wait
    */
   public static void main(String[] args) throws Exception {
      TestingCluster cluster = new TestingCluster(3);
      try {
         cluster.start();
         // Presumably gives the servers time to elect a leader
         Thread.sleep(2000);

         TestingZooKeeperServer leader = null;
         for(TestingZooKeeperServer zs : cluster.getServers()){
            String state = zs.getQuorumPeer().getServerState();
            System.out.print(zs.getInstanceSpec().getServerId()+"-");
            System.out.print(state+"-");
            System.out.println(zs.getInstanceSpec().getDataDirectory().getAbsolutePath());
            if( "leading".equals( state ) ){
               leader = zs;
            }
         }
         if( leader == null ){
            throw new IllegalStateException("No server reported state \"leading\" after waiting 2000 ms");
         }
         leader.kill();
         System.out.println( "--After leader kill:" );
         for(TestingZooKeeperServer zs : cluster.getServers()){
            System.out.print(zs.getInstanceSpec().getServerId()+"-");
            System.out.print(zs.getQuorumPeer().getServerState()+"-");
            System.out.println(zs.getInstanceSpec().getDataDirectory().getAbsolutePath());
         }
      } finally {
         cluster.stop();
      }
   }
}








