0
点赞
收藏
分享

微信扫一扫

深入浅出Zookeeper之一Server启动


大名鼎鼎的Zookeeper是解决分布式问题的神器。小编最近简单阅读了代码,分享一下。有不对之处,还请大家指出。

整篇文章将分多个系列完成,因为涉及的点比较多,很难在一篇文章内讲完。关于zookeeper的使用场景,大家可以参考http://rdc.taobao.com/team/jm/archives/1232;API的使用请参考官方文档http://zookeeper.apache.org/doc/trunk/。这里以(撰写本文时)最新的zookeeper 3.4.5为例。

这个系列的第一篇来说说zookeeper server端的启动,以单机为例,分布式zookeeper将在后续专门分析。

单机版启动类ZooKeeperServerMain

 



1. protected void
2. throws
3.     {  
4. try
5.             ManagedUtil.registerLog4jMBeans();  
6. catch
7. "Unable to register log4j JMX control", e);  
8.         }  
9. //解析配置文件zoo.cfg
10. new
11. if (args.length == 1) {  
12. 0]);  
13. else
14.             config.parse(args);  
15.         }  
16. //启动
17.         runFromConfig(config);  
18.     }


 

 具体解析:

 



1. public void parse(String path) throws
2. new
3.       config.parse(path);  
4.   
5. // let qpconfig parse the file and then pull the stuff we are
6. // interested in
7.       readFrom(config);  
8.   }


 

 启动

 


1. public void runFromConfig(ServerConfig config) throws
2. "Starting server");  
3. try
4. // Note that this thread isn't going to be doing anything else,
5. // so rather than spawning another thread, we will just call
6. // run() in this thread.
7. // create a file logger url from the command line args
8. new
9.   
10. //2个文件,log和data文件
11. new FileTxnSnapLog(new
12. new
13.           zkServer.setTxnLogFactory(ftxn);  
14.           zkServer.setTickTime(config.tickTime);  
15.           zkServer.setMinSessionTimeout(config.minSessionTimeout);  
16.           zkServer.setMaxSessionTimeout(config.maxSessionTimeout);  
17. //连接工厂,默认NIOServerCnxnFactory
18.           cnxnFactory = ServerCnxnFactory.createFactory();  
19. //初始化主线程,打开selector,并bind端口,打开NIO的ACCEPT通知
20.           cnxnFactory.configure(config.getClientPortAddress(),  
21.                   config.getMaxClientCnxns());  
22. //并生成最新的snapshot文件,启动IO主线程,从snapshot文件和log文件中恢复内存database结构和session结构
23.           cnxnFactory.startup(zkServer);  
24. //启动线程等待之前启动的主线程结束
25.           cnxnFactory.join();  
26. if
27.               zkServer.shutdown();  
28.           }  
29. catch
30. // warn, but generally this is ok
31. "Server interrupted", e);  
32.       }  
33.   }



 

 具体startup流程:

 



1. public void startup(ZooKeeperServer zks) throws
2.            InterruptedException {  
3. //启动IO主线程
4.        start();  
5. //从log和snapshot回复database和session,并重新生成一个最新的snapshot文件
6.        zks.startdata();  
7. //启动sessionTracker线程,初始化IO请求的处理链,并启动每个processor线程
8.        zks.startup();  
9.        setZooKeeperServer(zks);  
10.    }


 

 具体恢复过程:

 



1. public void
2. throws
3. //check to see if zkDb is not null
4. if (zkDb == null) {  
5. //初始化database
6. new ZKDatabase(this.txnLogFactory);  
7.       }    
8. if
9.           loadData();  
10.       }  
11.   }


 

DataTree用Map实现,key是节点的完整路径,value是DataNode;DataNode中的parent指向父节点,children记录了它的所有子节点。构造DataTree时会初始化'/'、'/zookeeper'、'/zookeeper/quota'三个系统节点:

 



/**
 * Creates a tree that holds only the system nodes: "/" (also registered under
 * the alias ""), "/zookeeper" and "/zookeeper/quota".
 */
public DataTree() {
    /* Rather than fight it, let root have an alias */
    nodes.put("", root);
    nodes.put(rootZookeeper, root);

    /** add the proc node and quota node */
    root.addChild(procChildZookeeper);
    nodes.put(procZookeeper, procDataNode);

    procDataNode.addChild(quotaChildZookeeper);
    nodes.put(quotaZookeeper, quotaDataNode);
}


 具体恢复数据

 



1. public void loadData() throws
2. //执行恢复,并返回最新的事务ID
3.        setZxid(zkDb.loadDataBase());  
4. // Clean up dead sessions
5. //清理session
6. new
7. for
8. if (zkDb.getSessionWithTimeOuts().get(session) == null) {  
9.                deadSessions.add(session);  
10.            }  
11.        }  
12. true);  
13. for (long
14. // XXX: Is lastProcessedZxid really the best thing to use?
15.            killSession(session, zkDb.getDataTreeLastProcessedZxid());  
16.        }  
17. //生成最新的snapshot文件
18. // Make a clean snapshot
19.        takeSnapshot();  
20.    }



 load过程:

 


1. public long loadDataBase() throws
2. oad过程中,发起分布式提议,对于单机版,先不考虑  
3. new
4. public void
5. new Request(null, 0, hdr.getCxid(),hdr.getType(),  
6. null, null);  
7.             r.txn = txn;  
8.             r.hdr = hdr;  
9.             r.zxid = hdr.getZxid();  
10.             addCommittedProposal(r);  
11.         }  
12.     };  
13. //load数据
14. long
15. true;  
16.


 

restore过程:

 



1. public long
2. throws
3. //从FileSnap中恢复
4.        snapLog.deserialize(dt, sessions);  
5. new
6. 1);  
7. long
8.        TxnHeader hdr;  
9. //从snapshot中记录的最新的事务开始处理,将log中的事务merge到datatree中
10. while (true) {  
11. // iterator points to 
12. // the first valid txn when initialized
13.            hdr = itr.getHeader();  
14. if (hdr == null) {  
15. //empty logs 
16. return
17.            }  
18. if (hdr.getZxid() < highestZxid && highestZxid != 0) {  
19. "(higestZxid) > "
20. "(next log) for type "
21.                        + hdr.getType());  
22. else
23.                highestZxid = hdr.getZxid();  
24.            }  
25. try
26.                processTransaction(hdr,dt,sessions, itr.getTxn());  
27. catch(KeeperException.NoNodeException e) {  
28. throw new IOException("Failed to process transaction type: "
29. " error: "
30.            }  
31.            listener.onTxnLoaded(hdr, itr.getTxn());  
32. if
33. break;  
34.        }  
35. return
36.    }



 FileSnap恢复过程:

 



1. public long
2. throws
3. // we run through 100 snapshots (not all of them)
4. // if we cannot get it running within 100 snapshots
5. // we should  give up
6. //找前100个snapshot文件,降序,最新的文件在最前面
7. 100);  
8. if (snapList.size() == 0) {  
9. return
10.        }  
11. //从最新的文件开始恢复,如果反序列化ok而且checksum也ok,则恢复结束
12. null;  
13. boolean foundValid = false;  
14. for (int i = 0; i < snapList.size(); i++) {  
15.            snap = snapList.get(i);  
16. null;  
17. null;  
18. try
19. "Reading snapshot "
20. new BufferedInputStream(new
21. new CheckedInputStream(snapIS, new
22.                InputArchive ia = BinaryInputArchive.getArchive(crcIn);  
23.                deserialize(dt,sessions, ia);  
24. long
25. long val = ia.readLong("val");  
26. if
27. throw new IOException("CRC corruption in snapshot :  "
28.                }  
29. true;  
30. break;  
31. catch(IOException e) {  
32. "problem reading snap file "
33. finally
34. if (snapIS != null)   
35.                    snapIS.close();  
36. if (crcIn != null)   
37.                    crcIn.close();  
38.            }   
39.        }  
40. if
41. throw new IOException("Not able to find valid snapshots in "
42.        }  
43. //snapshot文件名就记录着最新的zxid
44. "snapshot");  
45. return
46.    }



 单个事务处理:

 



1. public void
2.          Map<Long, Integer> sessions, Record txn)  
3. throws
4.      ProcessTxnResult rc;  
5. switch
6. 创建session  
7. case
8.          sessions.put(hdr.getClientId(),  
9.                  ((CreateSessionTxn) txn).getTimeOut());  
10.          ......  
11. // give dataTree a chance to sync its lastProcessedZxid
12.          rc = dt.processTxn(hdr, txn);  
13. break;  
14. case
15.          sessions.remove(hdr.getClientId());  
16. if
17.              ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,  
18. "playLog --- close session in log: 0x"
19.                              + Long.toHexString(hdr.getClientId()));  
20.          }  
21.          rc = dt.processTxn(hdr, txn);  
22. break;  
23. default:  
24.          rc = dt.processTxn(hdr, txn);  
25.      }  
26.   
27.      ......  
28.  }


 DataTree处理单个事务

 

1. public
2.     {  
3. new
4.   
5. try
6.             rc.clientId = header.getClientId();  
7.             rc.cxid = header.getCxid();  
8.             rc.zxid = header.getZxid();  
9.             rc.type = header.getType();  
10. 0;  
11. null;  
12. switch
13. case
14.                     CreateTxn createTxn = (CreateTxn) txn;  
15.                     rc.path = createTxn.getPath();  
16.                     createNode(  
17.                             createTxn.getPath(),  
18.                             createTxn.getData(),  
19.                             createTxn.getAcl(),  
20. 0,  
21.                             createTxn.getParentCVersion(),  
22.                             header.getZxid(), header.getTime());  
23. break;  
24. case
25.                     DeleteTxn deleteTxn = (DeleteTxn) txn;  
26.                     rc.path = deleteTxn.getPath();  
27.                     deleteNode(deleteTxn.getPath(), header.getZxid());  
28. break;  
29. case
30.                     SetDataTxn setDataTxn = (SetDataTxn) txn;  
31.                     rc.path = setDataTxn.getPath();  
32.                     rc.stat = setData(setDataTxn.getPath(), setDataTxn  
33.                             .getData(), setDataTxn.getVersion(), header  
34.                             .getZxid(), header.getTime());  
35. break;  
36.    》    ......  
37. /*
38.          * A snapshot might be in progress while we are modifying the data
39.          * tree. If we set lastProcessedZxid prior to making corresponding
40.          * change to the tree, then the zxid associated with the snapshot
41.          * file will be ahead of its contents. Thus, while restoring from
42.          * the snapshot, the restore method will not apply the transaction
43.          * for zxid associated with the snapshot file, since the restore
44.          * method assumes that transaction to be present in the snapshot.
45.          *
46.          * To avoid this, we first apply the transaction and then modify
47.          * lastProcessedZxid.  During restore, we correctly handle the
48.          * case where the snapshot contains data ahead of the zxid associated
49.          * with the file.
50.          */
51. //处理完事务后,再修改最新Zxid,如果是先修改Zxid再处理事务,修改完Zxid后,正好异步线程flush datatree
52. //此时由于事务并没有被处理,导致snapshot中的zxid比content新,而restore的时候是从最新zxid+1开始恢复的,从而
53. //导致丢数据
54. if
55.             lastProcessedZxid = rc.zxid;  
56.         }  
57.   
58.         ......  
59. return
60.     }


以上就完成了server的数据恢复过程:先加载最新的有效snapshot,再从snapshot对应的zxid+1开始重放事务日志,把之后的事务merge到DataTree中。这种"快照+事务日志重放"的恢复方式与LSM中"日志+合并"的思路相似。

接下来server启动sessionTracker线程和请求处理链

 



1. protected void
2. new FinalRequestProcessor(this);  
3. new SyncRequestProcessor(this,  
4.                finalProcessor);  
5.        ((SyncRequestProcessor)syncProcessor).start();  
6. new PrepRequestProcessor(this, syncProcessor);  
7.        ((PrepRequestProcessor)firstProcessor).start();  
8.    }


 核心IO线程

 

1. public void
2. while
3. try
4. //select过程
5. 1000);  
6.                 Set<SelectionKey> selected;  
7. synchronized (this) {  
8.                     selected = selector.selectedKeys();  
9.                 }  
10. new
11.                         selected);  
12. //打乱顺序
13.                 Collections.shuffle(selectedList);  
14. for
15. //新连接进来,accept之
16. if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {  
17.                         SocketChannel sc = ((ServerSocketChannel) k  
18.                                 .channel()).accept();  
19.                         InetAddress ia = sc.socket().getInetAddress();  
20. int
21. //校验同个client连接数是否超过限制
22. if (maxClientCnxns > 0
23. "Too many connections from "
24. " - max is "
25.                             sc.close();  
26. else
27. "Accepted socket connection from "
28.                                      + sc.socket().getRemoteSocketAddress());  
29. //异步模式
30. false);  
31. //监听read事件
32.                             SelectionKey sk = sc.register(selector,  
33.                                     SelectionKey.OP_READ);  
34. //创建内部连接
35.                             NIOServerCnxn cnxn = createConnection(sc, sk);  
36.                             sk.attach(cnxn);  
37. //添加到连接表,方便后续统计
38.                             addCnxn(cnxn);  
39.                         }  
40.                     }   
41. //如果是read和write事件,则处理之
42. else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {  
43.                         NIOServerCnxn c = (NIOServerCnxn) k.attachment();  
44.                         c.doIO(k);  
45. else
46. if
47. "Unexpected ops in select "
48.                                       + k.readyOps());  
49.                         }  
50.                     }  
51.                 }  
52. //准备下次IO
53.                 selected.clear();  
54. catch
55. "Ignoring unexpected runtime exception", e);  
56. catch
57. "Ignoring exception", e);  
58.             }  
59.         }  
60.         closeAll();  
61. "NIOServerCnxn factory exited run method");  
62.     }


  具体io处理过程,将在后续结合实例来讲解。

至此server启动完成,就等待client去连接了。server启动的核心功能就是从snapshot和log文件中恢复DataTree,其关键是zxid:snapshot文件名记录了它对应的zxid,恢复时从该zxid+1开始重放事务日志。这是典型的"快照+预写日志(WAL)"恢复模式,与LSM的思路相近。

举报

相关推荐

0 条评论