大名鼎鼎的Zookeeper是解决分布式问题的神器。小编最近简单阅读了代码,分享一下。有不对之处,还请大家指出。
整篇文章将分多个系列完成,因为涉及点比较多,很难在一篇文章内搞定。关于zookeeper的使用场景,大家参考http://rdc.taobao.com/team/jm/archives/1232。api使用参考官网手册http://zookeeper.apache.org/doc/trunk/。这里以最新的zookeeper3.4.5为例。
这个系列的第一篇来说说zookeeper server端的启动,以单机为例,分布式zookeeper将在后续专门分析。
单机版启动类ZooKeeperServerMain
1. protected void
2. throws
3. {
4. try
5. ManagedUtil.registerLog4jMBeans();
6. catch
7. "Unable to register log4j JMX control", e);
8. }
9. //解析配置文件zoo.cfg
10. new
11. if (args.length == 1) {
12. 0]);
13. else
14. config.parse(args);
15. }
16. //启动
17. runFromConfig(config);
18. }
具体解析:
1. public void parse(String path) throws
2. new
3. config.parse(path);
4.
5. // let qpconfig parse the file and then pull the stuff we are
6. // interested in
7. readFrom(config);
8. }
启动
1. public void runFromConfig(ServerConfig config) throws
2. "Starting server");
3. try
4. // Note that this thread isn't going to be doing anything else,
5. // so rather than spawning another thread, we will just call
6. // run() in this thread.
7. // create a file logger url from the command line args
8. new
9.
10. //2个文件,log和data文件
11. new FileTxnSnapLog(new
12. new
13. zkServer.setTxnLogFactory(ftxn);
14. zkServer.setTickTime(config.tickTime);
15. zkServer.setMinSessionTimeout(config.minSessionTimeout);
16. zkServer.setMaxSessionTimeout(config.maxSessionTimeout);
17. //连接工厂,默认NIOServerCnxnFactory
18. cnxnFactory = ServerCnxnFactory.createFactory();
19. //初始化主线程,打开selector,并bind端口,打开NIO的ACCEPT通知
20. cnxnFactory.configure(config.getClientPortAddress(),
21. config.getMaxClientCnxns());
22. //启动IO主线程,从snapshot文件和log文件中恢复内存database结构和session结构,并生成最新的snapshot文件
23. cnxnFactory.startup(zkServer);
24. //启动线程等待之前启动的主线程结束
25. cnxnFactory.join();
26. if
27. zkServer.shutdown();
28. }
29. catch
30. // warn, but generally this is ok
31. "Server interrupted", e);
32. }
33. }
具体startup流程:
1. public void startup(ZooKeeperServer zks) throws
2. InterruptedException {
3. //启动IO主线程
4. start();
5. //从log和snapshot恢复database和session,并重新生成一个最新的snapshot文件
6. zks.startdata();
7. //启动sessionTracker线程,初始化IO请求的处理链,并启动每个processor线程
8. zks.startup();
9. setZooKeeperServer(zks);
10. }
具体恢复过程:
1. public void
2. throws
3. //check to see if zkDb is not null
4. if (zkDb == null) {
5. //初始化database
6. new ZKDatabase(this.txnLogFactory);
7. }
8. if
9. loadData();
10. }
11. }
DataTree用Map实现,key是节点名称,value是DataNode,DataNode中有parent指向父节点,有children指向所有孩子节点
1. public
2. /* Rather than fight it, let root have an alias */
3. //'/','/zookeeper','/zookeeper/quota'3个系统节点初始化
4. "", root);
5. nodes.put(rootZookeeper, root);
6.
7. /** add the proc node and quota node */
8. root.addChild(procChildZookeeper);
9. nodes.put(procZookeeper, procDataNode);
10.
11. procDataNode.addChild(quotaChildZookeeper);
12. nodes.put(quotaZookeeper, quotaDataNode);
13. }
具体恢复数据
1. public void loadData() throws
2. //执行恢复,并返回最新的事务ID
3. setZxid(zkDb.loadDataBase());
4. // Clean up dead sessions
5. //清理session
6. new
7. for
8. if (zkDb.getSessionWithTimeOuts().get(session) == null) {
9. deadSessions.add(session);
10. }
11. }
12. true);
13. for (long
14. // XXX: Is lastProcessedZxid really the best thing to use?
15. killSession(session, zkDb.getDataTreeLastProcessedZxid());
16. }
17. //生成最新的snapshot文件
18. // Make a clean snapshot
19. takeSnapshot();
20. }
load过程:
1. public long loadDataBase() throws
2. //load过程中,发起分布式提议,对于单机版,先不考虑
3. new
4. public void
5. new Request(null, 0, hdr.getCxid(),hdr.getType(),
6. null, null);
7. r.txn = txn;
8. r.hdr = hdr;
9. r.zxid = hdr.getZxid();
10. addCommittedProposal(r);
11. }
12. };
13. //load数据
14. long
15. true;
16.
restore过程:
1. public long
2. throws
3. //从FileSnap中恢复
4. snapLog.deserialize(dt, sessions);
5. new
6. 1);
7. long
8. TxnHeader hdr;
9. //从snapshot中记录的最新的事务开始处理,将log中的事务merge到datatree中
10. while (true) {
11. // iterator points to
12. // the first valid txn when initialized
13. hdr = itr.getHeader();
14. if (hdr == null) {
15. //empty logs
16. return
17. }
18. if (hdr.getZxid() < highestZxid && highestZxid != 0) {
19. "(higestZxid) > "
20. "(next log) for type "
21. + hdr.getType());
22. else
23. highestZxid = hdr.getZxid();
24. }
25. try
26. processTransaction(hdr,dt,sessions, itr.getTxn());
27. catch(KeeperException.NoNodeException e) {
28. throw new IOException("Failed to process transaction type: "
29. " error: "
30. }
31. listener.onTxnLoaded(hdr, itr.getTxn());
32. if
33. break;
34. }
35. return
36. }
FileSnap恢复过程:
1. public long
2. throws
3. // we run through 100 snapshots (not all of them)
4. // if we cannot get it running within 100 snapshots
5. // we should give up
6. //找前100个snapshot文件,降序,最新的文件在最前面
7. 100);
8. if (snapList.size() == 0) {
9. return
10. }
11. //从最新的文件开始恢复,如果反序列化ok而且checksum也ok,则恢复结束
12. null;
13. boolean foundValid = false;
14. for (int i = 0; i < snapList.size(); i++) {
15. snap = snapList.get(i);
16. null;
17. null;
18. try
19. "Reading snapshot "
20. new BufferedInputStream(new
21. new CheckedInputStream(snapIS, new
22. InputArchive ia = BinaryInputArchive.getArchive(crcIn);
23. deserialize(dt,sessions, ia);
24. long
25. long val = ia.readLong("val");
26. if
27. throw new IOException("CRC corruption in snapshot : "
28. }
29. true;
30. break;
31. catch(IOException e) {
32. "problem reading snap file "
33. finally
34. if (snapIS != null)
35. snapIS.close();
36. if (crcIn != null)
37. crcIn.close();
38. }
39. }
40. if
41. throw new IOException("Not able to find valid snapshots in "
42. }
43. //snapshot文件名就记录着最新的zxid
44. "snapshot");
45. return
46. }
单个事务处理:
1. public void
2. Map<Long, Integer> sessions, Record txn)
3. throws
4. ProcessTxnResult rc;
5. switch
6. //创建session
7. case
8. sessions.put(hdr.getClientId(),
9. ((CreateSessionTxn) txn).getTimeOut());
10. ......
11. // give dataTree a chance to sync its lastProcessedZxid
12. rc = dt.processTxn(hdr, txn);
13. break;
14. case
15. sessions.remove(hdr.getClientId());
16. if
17. ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,
18. "playLog --- close session in log: 0x"
19. + Long.toHexString(hdr.getClientId()));
20. }
21. rc = dt.processTxn(hdr, txn);
22. break;
23. default:
24. rc = dt.processTxn(hdr, txn);
25. }
26.
27. ......
28. }
DataTree处理单个事务
1. public
2. {
3. new
4.
5. try
6. rc.clientId = header.getClientId();
7. rc.cxid = header.getCxid();
8. rc.zxid = header.getZxid();
9. rc.type = header.getType();
10. 0;
11. null;
12. switch
13. case
14. CreateTxn createTxn = (CreateTxn) txn;
15. rc.path = createTxn.getPath();
16. createNode(
17. createTxn.getPath(),
18. createTxn.getData(),
19. createTxn.getAcl(),
20. 0,
21. createTxn.getParentCVersion(),
22. header.getZxid(), header.getTime());
23. break;
24. case
25. DeleteTxn deleteTxn = (DeleteTxn) txn;
26. rc.path = deleteTxn.getPath();
27. deleteNode(deleteTxn.getPath(), header.getZxid());
28. break;
29. case
30. SetDataTxn setDataTxn = (SetDataTxn) txn;
31. rc.path = setDataTxn.getPath();
32. rc.stat = setData(setDataTxn.getPath(), setDataTxn
33. .getData(), setDataTxn.getVersion(), header
34. .getZxid(), header.getTime());
35. break;
36. ......
37. /*
38. * A snapshot might be in progress while we are modifying the data
39. * tree. If we set lastProcessedZxid prior to making corresponding
40. * change to the tree, then the zxid associated with the snapshot
41. * file will be ahead of its contents. Thus, while restoring from
42. * the snapshot, the restore method will not apply the transaction
43. * for zxid associated with the snapshot file, since the restore
44. * method assumes that transaction to be present in the snapshot.
45. *
46. * To avoid this, we first apply the transaction and then modify
47. * lastProcessedZxid. During restore, we correctly handle the
48. * case where the snapshot contains data ahead of the zxid associated
49. * with the file.
50. */
51. //处理完事务后,再修改最新Zxid,如果是先修改Zxid再处理事务,修改完Zxid后,正好异步线程flush datatree
52. //此时由于事务并没有被处理,导致snapshot中的zxid比content新,而restore的时候是从最新zxid+1开始恢复的,从而
53. //导致丢数据
54. if
55. lastProcessedZxid = rc.zxid;
56. }
57.
58. ......
59. return
60. }
以上就完成了server的数据恢复过程,LSM的精华所在。
接下来server启动sessionTracker线程和请求处理链
1. protected void
2. new FinalRequestProcessor(this);
3. new SyncRequestProcessor(this,
4. finalProcessor);
5. ((SyncRequestProcessor)syncProcessor).start();
6. new PrepRequestProcessor(this, syncProcessor);
7. ((PrepRequestProcessor)firstProcessor).start();
8. }
核心IO线程
1. public void
2. while
3. try
4. //select过程
5. 1000);
6. Set<SelectionKey> selected;
7. synchronized (this) {
8. selected = selector.selectedKeys();
9. }
10. new
11. selected);
12. //打乱顺序
13. Collections.shuffle(selectedList);
14. for
15. //新连接进来,accept之
16. if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
17. SocketChannel sc = ((ServerSocketChannel) k
18. .channel()).accept();
19. InetAddress ia = sc.socket().getInetAddress();
20. int
21. //校验同个client连接数是否超过限制
22. if (maxClientCnxns > 0
23. "Too many connections from "
24. " - max is "
25. sc.close();
26. else
27. "Accepted socket connection from "
28. + sc.socket().getRemoteSocketAddress());
29. //非阻塞模式
30. false);
31. //监听read事件
32. SelectionKey sk = sc.register(selector,
33. SelectionKey.OP_READ);
34. //创建内部连接
35. NIOServerCnxn cnxn = createConnection(sc, sk);
36. sk.attach(cnxn);
37. //添加到连接表,方便后续统计
38. addCnxn(cnxn);
39. }
40. }
41. //如果是read和write事件,则处理之
42. else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
43. NIOServerCnxn c = (NIOServerCnxn) k.attachment();
44. c.doIO(k);
45. else
46. if
47. "Unexpected ops in select "
48. + k.readyOps());
49. }
50. }
51. }
52. //准备下次IO
53. selected.clear();
54. catch
55. "Ignoring unexpected runtime exception", e);
56. catch
57. "Ignoring exception", e);
58. }
59. }
60. closeAll();
61. "NIOServerCnxn factory exited run method");
62. }
具体io处理过程,将在后续结合实例来讲解。
至此server启动完成,就等待client去连接了。server启动核心功能就是从snapshot和log文件中恢复datatree,其核心就是zxid,典型的LSM应用。