前一篇介绍了zookeeper的client和server之间session是如何建立的。在DataMonitor的例子中,DataMonitor通过exists异步接口和server端交互,本文将介绍exists操作是如何完成。
dataMonitor开始exist操作
1. public void exists(final
2. StatCallback cb, Object ctx)
3. {
4. ......
5. //exist请求头
6. new
7. h.setType(ZooDefs.OpCode.exists);
8. //请求体
9. new
10. request.setPath(serverPath);
11. null);
12. new
13. //添加到发送队列
14. new
15. clientPath, serverPath, ctx, wcb);
16. }
添加过程
1. Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
2. Record response, AsyncCallback cb, String clientPath,
3. String serverPath, Object ctx, WatchRegistration watchRegistration)
4. {
5. null;
6.
7. // Note that we do not generate the Xid for the packet yet. It is
8. // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
9. // where the packet is actually sent.
10. synchronized
11. //转换成Packet
12. new
13. packet.cb = cb;
14. packet.ctx = ctx;
15. packet.clientPath = clientPath;
16. packet.serverPath = serverPath;
17. if
18. conLossPacket(packet);
19. else
20. // If the client is asking to close the session then
21. // mark as closing
22. if
23. true;
24. }
25. //添加到发送队列
26. outgoingQueue.add(packet);
27. }
28. }
29. //唤醒下Selector,快点处理
30. sendThread.getClientCnxnSocket().wakeupCnxn();
31. return
32. }
接下来还是回到SendThread的发送过程,之前Session建立时已经分析过,这里有一点要注意下:
1. //重要的业务请求,需要设置事务id
2. if ((p.requestHeader != null) &&
3. (p.requestHeader.getType() != OpCode.ping) &&
4. (p.requestHeader.getType() != OpCode.auth)) {
5. p.requestHeader.setXid(cnxn.getXid());
6. }
接下来Server端IO线程拿到请求,处理,过程和之前session建立时一样,就不赘述了。变化的是后续的处理链。
PrepRequestProcessor预处理
1. //All the rest don't need to create a Txn - just verify session
2. /读请求,不需要创建事务,只是检查了下session是否还在,此时事务头和事务体都是null
3. case
4. case
5. case
6. case
7. case
8. case
9. case
10. case
11. zks.sessionTracker.checkSession(request.sessionId,
12. request.getOwner());
13. break;
SyncRequestProcessor处理逻辑之前已经分析过了,这里就挑重点说一下
1. //试图将其写log,由于ExistsRequest并不是一个事务型请求,所以这里直接返回false,也就是说ExistsRequest不会被记录到log文件中
2. zks.getZKDatabase().append(si)
1. case
2. "EXIS";
3. // TODO we need to figure out the security requirement for this!
4. new
5. //反序列化
6. ByteBufferInputStream.byteBuffer2Record(request.request,
7. existsRequest);
8. String path = existsRequest.getPath();
9. if (path.indexOf('\0') != -1) {
10. throw new
11. }
12. //拿对应node的状态,并设置是否watch
13. Stat stat = zks.getZKDatabase().statNode(path, existsRequest
14. null);
15. //结果
16. new
17. break;
18. }
19. ......
20. //当前处理zxid
21. long
22. //构造相应头
23. ReplyHeader hdr =
24. new
25.
26. zks.serverStats().updateLatency(request.createTime);
27. cnxn.updateStatsForResponse(request.cxid, lastZxid, lastOp,
28. request.createTime, System.currentTimeMillis());
29.
30. try
31. //写回响应
32. "response");
statNode过程
1. //此处的watcher就是对应ServerCnxn,代表是哪个client,server端需要notify的时候,直接往对应ServerCnxn写数据即可
2. public
3. throws
4. new
5. DataNode n = nodes.get(path);
6. if (watcher != null) {
7. //对于exists请求,需要监听data变化事件,添加watcher
8. dataWatches.addWatch(path, watcher);
9. }
10. if (n == null) {
11. throw new
12. }
13. synchronized
14. n.copyStat(stat);
15. return
16. }
17. }
好了,以上server端就完成了ExistsRequest的处理了。接下来client端SendThread收到ExistsResponse进行处理
1. if
2. int
3. ......
4. else if
5. readConnectResult();
6. enableRead();
7. if
8. null) {
9. // Since SASL authentication has completed (if client is configured to do so),
10. // outgoing packets waiting in the outgoingQueue can now be sent.
11. enableWrite();
12. }
13. lenBuffer.clear();
14. incomingBuffer = lenBuffer;
15. updateLastHeard();
16. true;
17. }
18. //此时client session已经建立,init完成
19. else
20. sendThread.readResponse(incomingBuffer);
21. lenBuffer.clear();
22. incomingBuffer = lenBuffer;
23. updateLastHeard();
24. }
25. }
26. }
具体读取:
1. void readResponse(ByteBuffer incomingBuffer) throws
2. new
3. incomingBuffer);
4. BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
5. new
6. //先读响应头,先对特殊的xid进行处理
7. "header");
8. ......
9. //从头拿之前入队的ExistsRequest请求Packet,由于client和server都是单线程处理,多队列处理,所以认为全局有序
10. Packet packet;
11. synchronized
12. if (pendingQueue.size() == 0) {
13. throw new IOException("Nothing in the queue, but got "
14. + replyHdr.getXid());
15. }
16. packet = pendingQueue.remove();
17. }
18. /*
19. * Since requests are processed in order, we better get a response
20. * to the first request!
21. */
22. try
23. //检查一下是否一样
24. if
25. packet.replyHeader.setErr(
26. KeeperException.Code.CONNECTIONLOSS.intValue());
27. throw new IOException("Xid out of order. Got Xid "
28. " with err "
29. + replyHdr.getErr() +
30. " expected Xid "
31. + packet.requestHeader.getXid()
32. " for a packet with details: "
33. + packet );
34. }
35.
36. packet.replyHeader.setXid(replyHdr.getXid());
37. packet.replyHeader.setErr(replyHdr.getErr());
38. packet.replyHeader.setZxid(replyHdr.getZxid());
39. //更新client端的最新zxid
40. if (replyHdr.getZxid() > 0) {
41. lastZxid = replyHdr.getZxid();
42. }
43. //反序列化响应
44. if (packet.response != null && replyHdr.getErr() == 0) {
45. "response");
46. }
47.
48. if
49. "Reading reply sessionid:0x"
50. ", packet:: "
51. }
52. finally
53. //处理packet,主要是注册watcher,回调,触发事件
54. finishPacket(packet);
55. }
56. }
finishPacket过程
1. private void
2. //注册watcher,如果是exists请求,则注册到dataWatches中
3. if (p.watchRegistration != null) {
4. p.watchRegistration.register(p.replyHeader.getErr());
5. }
6. //如果是同步接口,则唤醒等待的业务线程
7. if (p.cb == null) {
8. synchronized
9. true;
10. p.notifyAll();
11. }
12. }
13. //如果是异步请求,则发送异步事件
14. else
15. true;
16. eventThread.queuePacket(p);
17. }
18. }
EventThread端
1. else if (p.response instanceof
2. instanceof
3. instanceof
4. StatCallback cb = (StatCallback) p.cb;
5. //如果成功,增加node stat的回调参数
6. if (rc == 0) {
7. if (p.response instanceof
8. cb.processResult(rc, clientPath, p.ctx,
9. ((ExistsResponse) p.response)
10. .getStat());
11. else if (p.response instanceof
12. cb.processResult(rc, clientPath, p.ctx,
13. ((SetDataResponse) p.response)
14. .getStat());
15. else if (p.response instanceof
16. cb.processResult(rc, clientPath, p.ctx,
17. ((SetACLResponse) p.response)
18. .getStat());
19. }
20. }
21. //如果响应失败,stat为null
22. else
23. null);
24. }
25. }
在DataMonitor例子中,它本身就是一个StatCallback
1. public void processResult(int
2. boolean
3. switch
4. case
5. true;
6. break;
7. case
8. false;
9. break;
10. case
11. case
12. true;
13. listener.closing(rc);
14. return;
15. default:
16. // Retry errors
17. true, this, null);
18. return;
19. }
Exists过程大致就是上面描述的,主要注意点:
1.客户端Request发送完之后会进入Pending队列,等待响应之后拿出来继续处理
2.同步接口是使用Packet.wait()实现的
3.server端exists操作不是事务型的操作,不会写入log
4.server端的watcher就是一个客户端连接ServerCxcn,代表一个客户端,notify的时候直接往连接里写数据即可