0
点赞
收藏
分享

微信扫一扫

Zookeeper开源客户端框架Curator简介与示例


简介

Curator最初由Netflix的Jordan Zimmerman开发。它提供了一套Java类库,让开发者可以更容易地使用ZooKeeper。

        所谓ZooKeeper技巧(ZooKeeper Recipes),也可以称之为解决方案, 或者叫实现方案, 是指ZooKeeper的使用方法, 比如分布式的配置管理, Leader选举等

Curator是Apache ZooKeeper天生配套的组件,ZooKeeper的Java开发者自然而然地会选择在项目中使用它。

        官网链接:http://curator.apache.org/

Zookeeper开源客户端框架Curator简介与示例_apache

提供的功能组件

  1. Framework 提供了一套高级的API, 简化了ZooKeeper的操作。 它增加了很多使用ZooKeeper开发的特性,可以处理ZooKeeper集群复杂的连接管理和重试机制
  2. Client是ZooKeeper客户端的一个替代品, 提供了一些底层处理和相关的工具方法
  3. Recipes实现了通用ZooKeeper的recipe, 该组件建立在Framework的基础之上
  4. Utilities: 各种工具类
  5. Errors: 异常处理、连接、恢复等。
  6. Extensions: curator-recipes包实现了通用的技巧,这些技巧在ZooKeeper文档中有介绍。为了避免这个包(package)变得巨大,recipes/applications将会放入独立的extension包中,并使用命名规则curator-x-name。

Curator编译好的类库被发布到Maven Central(Maven中央仓库)中。Curator包含几个artifact,你可以根据需要在项目中加入相应的依赖。对于大多数开发者来说,引入curator-recipes这一个就足够了。



<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-client</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.6.0</version>
</dependency>




 

代码示例

 

import  java.util.List;
import  java.util.Map;
import  java.util.concurrent.TimeUnit;
 
import  org.apache.curator.framework.CuratorFramework;
import  org.apache.curator.framework.CuratorFrameworkFactory;
import  org.apache.curator.framework.api.CuratorEvent;
import  org.apache.curator.framework.api.CuratorListener;
import  org.apache.curator.framework.api.CuratorWatcher;
import  org.apache.curator.framework.api.GetChildrenBuilder;
import  org.apache.curator.framework.api.GetDataBuilder;
import  org.apache.curator.retry.ExponentialBackoffRetry;
import  org.apache.curator.utils.ZKPaths;
import  org.apache.zookeeper.WatchedEvent;
import  org.apache.zookeeper.Watcher;
import  org.apache.zookeeper.Watcher.Event.KeeperState;
import  org.apache.zookeeper.data.Stat;
 
import  com.google.common.base.Charsets;
import  com.google.common.base.Objects;
import  com.google.common.base.Strings;
import  com.google.common.collect.Lists;
import  com.google.common.collect.Maps;
 
 
/**
 * Small command-line demo of {@code CuratorUtil}: creates two nodes under {@code /zkroot},
 * updates one of them, prints the children (names, then name/value pairs), registers a watch on
 * the children of {@code /zkroot} and then waits so that watch events can be observed.
 *
 * <p>Created: 2015-01-09 09:14:08
 */
public class CuratorTest {
    /** Comma-separated {@code host:port} list of the ZooKeeper ensemble used by the demo. */
    private static final String ZK_ADDRESS = "hadoop02:2181,hadoop03:2181,hadoop04:2181";

    /**
     * Runs the demo against the ensemble at {@link #ZK_ADDRESS}.
     *
     * @param args unused
     * @throws Exception if registering the watch fails or the wait is interrupted
     */
    public static void main(String[] args) throws Exception {
        CuratorUtil curator = new CuratorUtil(ZK_ADDRESS);
        try {
            curator.createNode("/zkroot/test1", "你好abc11");
            curator.createNode("/zkroot/test2", "你好abc22");
            curator.updateNode("/zkroot/test2", "你好abc22");
            List<String> childNames = curator.listChildren("/zkroot");
            Map<String, String> childValues = curator.listChildrenDetail("/zkroot");

            System.out.println("=========================================");
            for (String name : childNames) {
                System.out.println(name);
            }

            System.out.println("=========================================");
            for (Map.Entry<String, String> entry : childValues.entrySet()) {
                System.out.println(entry.getKey() + "=>" + entry.getValue());
            }

            // Watch the children of /zkroot (isSelf == false), then keep the process alive
            // long enough for events to be delivered to the registered listener.
            curator.addWatch("/zkroot", false);

            TimeUnit.SECONDS.sleep(600);
        } finally {
            // Always release the ZooKeeper connection, even if the demo fails or is interrupted.
            curator.destory();
        }
    }

}
 
 
/**
 * Convenience wrapper around a single {@link CuratorFramework} client offering simple
 * create / update / delete / list / watch operations on ZooKeeper nodes.
 *
 * <p>The create, update, delete and list methods never throw: failures are reported on
 * {@code System.err} and signalled through the return value ({@code false} or an empty
 * collection). The {@code addWatch} overloads propagate exceptions to the caller.
 */
class CuratorUtil {
    private final CuratorFramework client;


    /**
     * Creates and starts a client for the given connection string, using an
     * {@link ExponentialBackoffRetry} policy constructed with {@code (1000, 3)} and a
     * {@link NodeEventListener} registered as Curator listener.
     *
     * @param zkAddress ZooKeeper connection string, e.g. {@code "host1:2181,host2:2181"}
     */
    public CuratorUtil(String zkAddress) {
        client = CuratorFrameworkFactory.newClient(zkAddress, new ExponentialBackoffRetry(1000, 3));
        client.getCuratorListenable().addListener(new NodeEventListener());
        client.start();
    }


    /**
     * Creates a node (and any missing parents) if it does not exist yet.
     *
     * @param nodeName absolute path of the node to create
     * @param value    UTF-8 text to store; when {@code null} or empty the node is created
     *                 through Curator's data-less {@code forPath(path)} overload
     * @return {@code true} if the node was created and Curator reported the requested path;
     *         {@code false} if the node already existed or the operation failed
     */
    public boolean createNode(String nodeName, String value) {
        try {
            if (getClient().checkExists().forPath(nodeName) != null) {
                return false;
            }
            String createdPath;
            if (Strings.isNullOrEmpty(value)) {
                // NOTE(review): Curator's data-less create stores the client's configured
                // default data rather than an empty payload - confirm this is intended.
                createdPath = getClient().create().creatingParentsIfNeeded().forPath(nodeName);
            } else {
                createdPath = getClient().create().creatingParentsIfNeeded()
                        .forPath(nodeName, value.getBytes(Charsets.UTF_8));
            }
            return Objects.equal(nodeName, createdPath);
        } catch (Exception e) {
            reportFailure("createNode", nodeName, e);
            return false;
        }
    }


    /**
     * Overwrites the data of an existing node.
     *
     * @param nodeName absolute path of the node to update
     * @param value    UTF-8 text to store; {@code null} is rejected
     * @return {@code true} if the node existed and its data was set; {@code false} if
     *         {@code value} is {@code null}, the node does not exist, or the operation failed
     */
    public boolean updateNode(String nodeName, String value) {
        if (value == null) {
            // Explicit guard instead of relying on a caught NullPointerException.
            return false;
        }
        try {
            if (getClient().checkExists().forPath(nodeName) == null) {
                return false;
            }
            Stat updated = getClient().setData().forPath(nodeName, value.getBytes(Charsets.UTF_8));
            return updated != null;
        } catch (Exception e) {
            reportFailure("updateNode", nodeName, e);
            return false;
        }
    }


    /**
     * Deletes a node together with its children. Failures are reported, not thrown.
     *
     * @param nodeName absolute path of the node to delete
     */
    public void deleteNode(String nodeName) {
        try {
            getClient().delete().deletingChildrenIfNeeded().forPath(nodeName);
        } catch (Exception e) {
            reportFailure("deleteNode", nodeName, e);
        }
    }


    /**
     * Reads the name and UTF-8 decoded data of every direct child of a node.
     *
     * @param node absolute path of the parent node
     * @return map from child name to its data as text (empty string for a child without
     *         data); if an operation fails part-way, the entries collected so far are returned
     */
    public Map<String, String> listChildrenDetail(String node) {
        Map<String, String> details = Maps.newHashMap();
        try {
            List<String> children = getClient().getChildren().forPath(node);
            if (children == null) {
                return details;
            }
            for (String child : children) {
                String childPath = ZKPaths.makePath(node, child);
                // Use a fresh builder per read instead of reusing one across calls.
                byte[] data = getClient().getData().forPath(childPath);
                // A node may carry no data; decoding null would abort the whole listing.
                details.put(child, data == null ? "" : new String(data, Charsets.UTF_8));
            }
        } catch (Exception e) {
            reportFailure("listChildrenDetail", node, e);
        }
        return details;
    }


    /**
     * Lists the names of the direct children of a node.
     *
     * @param node absolute path of the parent node
     * @return the child names, or an empty list if the lookup failed; never {@code null}
     */
    public List<String> listChildren(String node) {
        try {
            List<String> children = getClient().getChildren().forPath(node);
            if (children != null) {
                return children;
            }
        } catch (Exception e) {
            reportFailure("listChildren", node, e);
        }
        return Lists.newArrayList();
    }


    /**
     * Registers a watch whose events are delivered to the client's Curator listeners.
     *
     * @param node   absolute path of the node
     * @param isSelf {@code true} to watch the node's own data, {@code false} to watch its
     *               children
     * @throws Exception if the underlying Curator call fails
     */
    public void addWatch(String node, boolean isSelf) throws Exception {
        if (isSelf) {
            getClient().getData().watched().forPath(node);
        } else {
            getClient().getChildren().watched().forPath(node);
        }
    }


    /**
     * Registers a watch that notifies the given ZooKeeper {@link Watcher}.
     *
     * @param node    absolute path of the node
     * @param isSelf  {@code true} to watch the node's own data, {@code false} to watch its
     *                children
     * @param watcher receiver of the watch event
     * @throws Exception if the underlying Curator call fails
     */
    public void addWatch(String node, boolean isSelf, Watcher watcher) throws Exception {
        if (isSelf) {
            getClient().getData().usingWatcher(watcher).forPath(node);
        } else {
            getClient().getChildren().usingWatcher(watcher).forPath(node);
        }
    }


    /**
     * Registers a watch that notifies the given {@link CuratorWatcher}.
     *
     * @param node    absolute path of the node
     * @param isSelf  {@code true} to watch the node's own data, {@code false} to watch its
     *                children
     * @param watcher receiver of the watch event
     * @throws Exception if the underlying Curator call fails
     */
    public void addWatch(String node, boolean isSelf, CuratorWatcher watcher) throws Exception {
        if (isSelf) {
            getClient().getData().usingWatcher(watcher).forPath(node);
        } else {
            getClient().getChildren().usingWatcher(watcher).forPath(node);
        }
    }


    /**
     * Closes the underlying client and releases its resources.
     */
    public void destroy() {
        client.close();
    }


    /**
     * Closes the underlying client. Misspelled legacy name kept so existing callers keep
     * working; equivalent to {@link #destroy()}.
     */
    public void destory() {
        destroy();
    }


    /**
     * Returns the wrapped client.
     *
     * @return the {@link CuratorFramework} created by the constructor
     */
    public CuratorFramework getClient() {
        return client;
    }


    /**
     * Reports a failed operation on {@code System.err} with its path and stack trace, and
     * restores the thread's interrupt flag when the failure was an interruption.
     */
    private static void reportFailure(String operation, String path, Exception cause) {
        if (cause instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
        System.err.println("ZooKeeper " + operation + " failed for path " + path);
        cause.printStackTrace();
    }

}
 
 
// 监听器
final  class  NodeEventListener  implements  CuratorListener {
     @Override
     public  void  eventReceived(CuratorFramework client, CuratorEvent event)  throws  Exception {
         System.out.println(event.toString() +  "......................." );
         final  WatchedEvent watchedEvent = event.getWatchedEvent();
         if  (watchedEvent !=  null ) {
             System.out.println(watchedEvent.getState() +  "======================="  + watchedEvent.getType());
             if  (watchedEvent.getState() == KeeperState.SyncConnected) {
                 switch  (watchedEvent.getType()) {
                 case  NodeChildrenChanged:
                     // TODO
                     break ;
                 case  NodeDataChanged:
                     // TODO
                     break ;
                 default :
                     break ;
                 }
             }






举报

相关推荐

0 条评论