0
点赞
收藏
分享

微信扫一扫

深入浅出Zookeeper之三Exists请求和处理


前一篇介绍了zookeeper的client和server之间session是如何建立的。在DataMonitor的例子中,DataMonitor通过exists异步接口和server端交互,本文将介绍exists操作是如何完成。

dataMonitor开始exist操作

 



1. public void exists(final
2.            StatCallback cb, Object ctx)  
3.    {  
4.         ......  
5. //exist请求头
6. new
7.        h.setType(ZooDefs.OpCode.exists);  
8. //请求体
9. new
10.        request.setPath(serverPath);  
11. null);  
12. new
13. //添加到发送队列
14. new
15.                clientPath, serverPath, ctx, wcb);  
16.    }


 

 添加过程

 


1. Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,  
2.             Record response, AsyncCallback cb, String clientPath,  
3.             String serverPath, Object ctx, WatchRegistration watchRegistration)  
4.     {  
5. null;  
6.   
7. // Note that we do not generate the Xid for the packet yet. It is
8. // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
9. // where the packet is actually sent.
10. synchronized
11. //转换成Packet
12. new
13.             packet.cb = cb;  
14.             packet.ctx = ctx;  
15.             packet.clientPath = clientPath;  
16.             packet.serverPath = serverPath;  
17. if
18.                 conLossPacket(packet);  
19. else
20. // If the client is asking to close the session then
21. // mark as closing
22. if
23. true;  
24.                 }  
25. //添加到发送队列
26.                 outgoingQueue.add(packet);  
27.             }  
28.         }  
29. //唤醒下Selector,快点处理
30.         sendThread.getClientCnxnSocket().wakeupCnxn();  
31. return
32.     }



 

 接下来还是回到SendThread的发送过程,之前Session建立时已经分析过,这里有一点要注意下:

 


1. //重要的业务请求,需要设置事务id
2. if ((p.requestHeader != null) &&  
3.                                 (p.requestHeader.getType() != OpCode.ping) &&  
4.                                 (p.requestHeader.getType() != OpCode.auth)) {  
5.                             p.requestHeader.setXid(cnxn.getXid());  
6.                         }



 接下来Server端IO线程拿到请求,处理,过程和之前session建立时一样,就不赘述了。变化的是后续的处理链。



PrepRequestProcessor预处理


1. //All the rest don't need to create a Txn - just verify session
2. /读请求,不需要创建事务,只是检查了下session是否还在,此时事务头和事务体都是null
3. case
4. case
5. case
6. case
7. case
8. case
9. case
10. case
11.                zks.sessionTracker.checkSession(request.sessionId,  
12.                        request.getOwner());  
13. break;

 SyncRequestProcessor处理逻辑之前已经分析过了,这里就挑重点说一下



1. //试图将其写log,由于ExistsRequest并不是一个事务型请求,所以这里直接返回false,也就是说ExistsRequest不会被记录到log文件中
2. zks.getZKDatabase().append(si)



 


1. case
2. "EXIS";  
3. // TODO we need to figure out the security requirement for this!
4. new
5. //反序列化
6.                 ByteBufferInputStream.byteBuffer2Record(request.request,  
7.                         existsRequest);  
8.                 String path = existsRequest.getPath();  
9. if (path.indexOf('\0') != -1) {  
10. throw new
11.                 }  
12. //拿对应node的状态,并设置是否watch
13.                 Stat stat = zks.getZKDatabase().statNode(path, existsRequest  
14. null);  
15. //结果
16. new
17. break;  
18.             }  
19. ......  
20. //当前处理zxid
21. long
22. //构造相应头
23.         ReplyHeader hdr =  
24. new
25.   
26.         zks.serverStats().updateLatency(request.createTime);  
27.         cnxn.updateStatsForResponse(request.cxid, lastZxid, lastOp,  
28.                     request.createTime, System.currentTimeMillis());  
29.   
30. try
31. //写回响应
32. "response");


 

 statNode过程

 



1. //此处的watcher就是对应ServerCnxn,代表是哪个client,server端需要notify的时候,直接往对应ServerCnxn写数据即可
2. public
3. throws
4. new
5.         DataNode n = nodes.get(path);  
6. if (watcher != null) {  
7. //对于exists请求,需要监听data变化事件,添加watcher
8.             dataWatches.addWatch(path, watcher);  
9.         }  
10. if (n == null) {  
11. throw new
12.         }  
13. synchronized
14.             n.copyStat(stat);  
15. return
16.         }  
17.     }

 

好了,以上server端就完成了ExistsRequest的处理了。接下来client端SendThread收到ExistsResponse进行处理



1. if
2. int
3. ......              
4. else if
5.                   readConnectResult();  
6.                   enableRead();  
7. if
8. null) {  
9. // Since SASL authentication has completed (if client is configured to do so),
10. // outgoing packets waiting in the outgoingQueue can now be sent.
11.                       enableWrite();  
12.                   }  
13.                   lenBuffer.clear();  
14.                   incomingBuffer = lenBuffer;  
15.                   updateLastHeard();  
16. true;  
17.               }   
18. //此时client session已经建立,init完成       
19. else
20.                   sendThread.readResponse(incomingBuffer);  
21.                   lenBuffer.clear();  
22.                   incomingBuffer = lenBuffer;  
23.                   updateLastHeard();  
24.               }  
25.           }  
26.       }


  具体读取:

 



1. void readResponse(ByteBuffer incomingBuffer) throws
2. new
3.                     incomingBuffer);  
4.             BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);  
5. new
6. //先读响应头,先对特殊的xid进行处理
7. "header");  
8.             ......  
9. //从头拿之前入队的ExistsRequest请求Packet,由于client和server都是单线程处理,多队列处理,所以认为全局有序
10.             Packet packet;  
11. synchronized
12. if (pendingQueue.size() == 0) {  
13. throw new IOException("Nothing in the queue, but got "
14.                             + replyHdr.getXid());  
15.                 }  
16.                 packet = pendingQueue.remove();  
17.             }  
18. /*
19.              * Since requests are processed in order, we better get a response
20.              * to the first request!
21.              */
22. try
23. //检查一下是否一样
24. if
25.                     packet.replyHeader.setErr(  
26.                             KeeperException.Code.CONNECTIONLOSS.intValue());  
27. throw new IOException("Xid out of order. Got Xid "
28. " with err "
29.                             + replyHdr.getErr() +  
30. " expected Xid "
31.                             + packet.requestHeader.getXid()  
32. " for a packet with details: "
33.                             + packet );  
34.                 }  
35.   
36.                 packet.replyHeader.setXid(replyHdr.getXid());  
37.                 packet.replyHeader.setErr(replyHdr.getErr());  
38.                 packet.replyHeader.setZxid(replyHdr.getZxid());  
39. //更新client端的最新zxid
40. if (replyHdr.getZxid() > 0) {  
41.                     lastZxid = replyHdr.getZxid();  
42.                 }  
43. //反序列化响应
44. if (packet.response != null && replyHdr.getErr() == 0) {  
45. "response");  
46.                 }  
47.   
48. if
49. "Reading reply sessionid:0x"
50. ", packet:: "
51.                 }  
52. finally
53. //处理packet,主要是注册watcher,回调,触发事件
54.                 finishPacket(packet);  
55.             }  
56.         }



  finishPacket过程

 



1. private void
2. //注册watcher,如果是exists请求,则注册到dataWatches中
3. if (p.watchRegistration != null) {  
4.            p.watchRegistration.register(p.replyHeader.getErr());  
5.        }  
6. //如果是同步接口,则唤醒等待的业务线程
7. if (p.cb == null) {  
8. synchronized
9. true;  
10.                p.notifyAll();  
11.            }  
12.        }   
13. //如果是异步请求,则发送异步事件
14. else
15. true;  
16.            eventThread.queuePacket(p);  
17.        }  
18.    }


 EventThread端

 



深入浅出Zookeeper之三Exists请求和处理_hive


1. else if (p.response instanceof
2. instanceof
3. instanceof
4.                       StatCallback cb = (StatCallback) p.cb;  
5. //如果成功,增加node stat的回调参数
6. if (rc == 0) {  
7. if (p.response instanceof
8.                               cb.processResult(rc, clientPath, p.ctx,  
9.                                       ((ExistsResponse) p.response)  
10.                                               .getStat());  
11. else if (p.response instanceof
12.                               cb.processResult(rc, clientPath, p.ctx,  
13.                                       ((SetDataResponse) p.response)  
14.                                               .getStat());  
15. else if (p.response instanceof
16.                               cb.processResult(rc, clientPath, p.ctx,  
17.                                       ((SetACLResponse) p.response)  
18.                                               .getStat());  
19.                           }  
20.                       }   
21. //如果响应失败,stat为null
22. else
23. null);  
24.                       }  
25.                   }


在DataMonitor例子中,它本身就是一个StatCallback



深入浅出Zookeeper之三Exists请求和处理_hive


1. public void processResult(int
2. boolean
3. switch
4. case
5. true;  
6. break;  
7. case
8. false;  
9. break;  
10. case
11. case
12. true;  
13.             listener.closing(rc);  
14. return;  
15. default:  
16. // Retry errors
17. true, this, null);  
18. return;  
19.         }


 Exists过程大致就是上面描述的,主要注意点:

1.客户端Request发送完之后会进入Pending队列,等待响应之后拿出来继续处理

2.同步接口是使用Packet.wait()实现的

3.server端exists操作不是事务型的操作,不会写入log

4.server端的watcher就是一个客户端连接ServerCxcn,代表一个客户端,notify的时候直接往连接里写数据即可


 

举报

相关推荐

0 条评论