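The previous articles described how the followers and the leader run the election and initialize. This article uses the CREATE request described earlier as an example to show how a transaction is processed in a cluster.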
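The interaction between the client and the ZooKeeper server was covered in earlier articles, so it is not repeated here. Assume a client has connected to a follower and sent a CREATE request. On the follower, the IO thread receives the request and hands it to the processor chain. The follower's chain is FollowerRequestProcessor → CommitProcessor → FinalRequestProcessor. A separate SyncRequestProcessor → SendAckRequestProcessor chain logs and acknowledges the proposals that arrive from the leader.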
The initialization code (FollowerZooKeeperServer.setupRequestProcessors):
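```java
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    commitProcessor = new CommitProcessor(finalProcessor,
            Long.toString(getServerId()), true);
    commitProcessor.start();
    firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
    ((FollowerRequestProcessor) firstProcessor).start();
    syncProcessor = new SyncRequestProcessor(this,
            new SendAckRequestProcessor((Learner) getFollower()));
    syncProcessor.start();
}
```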
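The wiring order is easier to see in a stripped-down form. Below is a minimal sketch of a processor chain built back to front, the same way setupRequestProcessors builds it. The Processor interface, LoggingProcessor and FollowerChainDemo are hypothetical stand-ins, not ZooKeeper classes. The real processors run in their own threads with queues, and this synchronous sketch leaves that out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ZooKeeper's RequestProcessor interface.
interface Processor {
    void process(String request);
}

// Records its name, then forwards to the next processor (null ends the chain).
class LoggingProcessor implements Processor {
    private final String name;
    private final Processor next;
    private final List<String> trace;

    LoggingProcessor(String name, Processor next, List<String> trace) {
        this.name = name;
        this.next = next;
        this.trace = trace;
    }

    @Override
    public void process(String request) {
        trace.add(name + ":" + request);
        if (next != null) {
            next.process(request);
        }
    }
}

class FollowerChainDemo {
    // Build the chain from the last processor to the first, as setupRequestProcessors does.
    static List<String> run(String request) {
        List<String> trace = new ArrayList<>();
        Processor fin = new LoggingProcessor("Final", null, trace);
        Processor commit = new LoggingProcessor("Commit", fin, trace);
        Processor first = new LoggingProcessor("FollowerRequest", commit, trace);
        first.process(request);
        return trace;
    }
}
```

Calling `FollowerChainDemo.run("create")` yields the trace FollowerRequest, Commit, Final. Each processor only knows its successor, which is why the chain has to be constructed in reverse.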
The first processor is FollowerRequestProcessor. Its processing loop:
```java
while (true) {
    Request request = queuedRequests.take();
    if (LOG.isTraceEnabled()) {
        ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK,
                'F', request, "");
    }
    if (request == Request.requestOfDeath) {
        break;
    }
    // We want to queue the request to be processed before we submit
    // the request to the leader so that we are ready to receive
    // the response
    // Hand it to CommitProcessor first; once the vote passes, the
    // transaction is finally committed through CommitProcessor.commit()
    nextProcessor.processRequest(request);

    // We now ship the request to the leader. As with all
    // other quorum operations, sync also follows this code
    // path, but different from others, we need to keep track
    // of the sync operations this follower has pending, so we
    // add it to pendingSyncs.
    // Only transaction (write) requests are forwarded to the leader for a vote
    switch (request.type) {
    case OpCode.sync:
        zks.pendingSyncs.add(request);
        zks.getFollower().request(request);
        break;
    case OpCode.create:
    case OpCode.delete:
    case OpCode.setData:
    case OpCode.setACL:
    case OpCode.createSession:
    case OpCode.closeSession:
    case OpCode.multi:
        zks.getFollower().request(request);
        break;
    }
}
```
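The switch above decides which requests leave the follower. The following sketch shows the same classification. The Op enum and ForwardPolicy class are illustrative names, not ZooKeeper API. Reads such as getData match none of the cases, so they are never forwarded and are served locally.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical enum mirroring a subset of ZooKeeper's OpCode constants.
enum Op { CREATE, DELETE, SET_DATA, SET_ACL, CREATE_SESSION, CLOSE_SESSION, MULTI, SYNC, GET_DATA, EXISTS }

class ForwardPolicy {
    // Write operations that FollowerRequestProcessor ships to the leader.
    private static final Set<Op> WRITES = EnumSet.of(
            Op.CREATE, Op.DELETE, Op.SET_DATA, Op.SET_ACL,
            Op.CREATE_SESSION, Op.CLOSE_SESSION, Op.MULTI);

    /** True if the follower forwards this operation to the leader. */
    static boolean forwardToLeader(Op op) {
        return op == Op.SYNC || WRITES.contains(op);
    }

    /** True if the follower must also remember the request in pendingSyncs. */
    static boolean trackAsPendingSync(Op op) {
        return op == Op.SYNC;
    }
}
```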
Forwarding the transaction request to the leader (Learner.request):
```java
void request(Request request) throws IOException {
    // Serialize the request: sessionId, cxid, type, then the raw request body
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream oa = new DataOutputStream(baos);
    oa.writeLong(request.sessionId);
    oa.writeInt(request.cxid);
    oa.writeInt(request.type);
    if (request.request != null) {
        request.request.rewind();
        int len = request.request.remaining();
        byte b[] = new byte[len];
        request.request.get(b);
        request.request.rewind();
        oa.write(b);
    }
    oa.close();
    QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos
            .toByteArray(), request.authInfo);
    writePacket(qp, true);
}
```
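The REQUEST payload is a fixed 16-byte header (an 8-byte sessionId, a 4-byte cxid and a 4-byte type) followed by the raw request bytes. The sketch below reproduces that layout using only java.io and java.nio. RequestEncoder is a hypothetical name. Unlike the original, which rewinds the caller's buffer twice, it reads the body through ByteBuffer.duplicate() so that the caller's position is left alone.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

class RequestEncoder {
    /** Encodes sessionId, cxid, type and the body in the same order as Learner.request. */
    static byte[] encode(long sessionId, int cxid, int type, ByteBuffer body) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(baos)) {
            out.writeLong(sessionId); // 8 bytes, big-endian
            out.writeInt(cxid);       // 4 bytes
            out.writeInt(type);       // 4 bytes
            if (body != null) {
                // duplicate() shares the bytes but has its own position,
                // so the caller's buffer is not moved
                ByteBuffer view = body.duplicate();
                view.rewind();
                byte[] b = new byte[view.remaining()];
                view.get(b);
                out.write(b);
            }
        } catch (IOException e) {
            // cannot happen for an in-memory stream, but DataOutputStream declares it
            throw new UncheckedIOException(e);
        }
        return baos.toByteArray();
    }
}
```

With a 3-byte body, the encoded array is 16 + 3 = 19 bytes long.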
CommitProcessor buffers incoming requests and waits for each write to be committed:
```java
while (!finished) {
    int len = toProcess.size();
    // The final processing of each request is done by FinalRequestProcessor
    for (int i = 0; i < len; i++) {
        nextProcessor.processRequest(toProcess.get(i));
    }
    toProcess.clear();
    synchronized (this) {
        // No committed request yet: wait until commit() wakes us up
        if ((queuedRequests.size() == 0 || nextPending != null)
                && committedRequests.size() == 0) {
            wait();
            continue;
        }
        // First check and see if the commit came in for the pending
        // request
        // A committed request is available: add it to toProcess,
        // to be handled in the next iteration
        if ((queuedRequests.size() == 0 || nextPending != null)
                && committedRequests.size() > 0) {
            Request r = committedRequests.remove();
            /*
             * We match with nextPending so that we can move to the
             * next request when it is committed. We also want to
             * use nextPending because it has the cnxn member set
             * properly.
             */
            // If it is our own request, reuse the original Request: it carries
            // the client connection, so the response can be written back
            if (nextPending != null
                    && nextPending.sessionId == r.sessionId
                    && nextPending.cxid == r.cxid) {
                // we want to send our version of the request.
                // the pointer to the connection in the request
                nextPending.hdr = r.hdr;
                nextPending.txn = r.txn;
                nextPending.zxid = r.zxid;
                toProcess.add(nextPending);
                nextPending = null;
            }
            // If it came from someone else, use the new Request: it has no
            // connection information, so no response is sent
            else {
                // this request came from someone else so just
                // send the commit packet
                toProcess.add(r);
            }
        }
    }

    // We haven't matched the pending requests, so go back to
    // waiting
    // A request is pending but not committed yet: keep waiting
    if (nextPending != null) {
        continue;
    }
    // Take the next requests from the queue
    synchronized (this) {
        // Process the next requests in the queuedRequests
        while (nextPending == null && queuedRequests.size() > 0) {
            Request request = queuedRequests.remove();
            switch (request.type) {
            case OpCode.create:
            case OpCode.delete:
            case OpCode.setData:
            case OpCode.multi:
            case OpCode.setACL:
            case OpCode.createSession:
            case OpCode.closeSession:
                nextPending = request;
                break;
            case OpCode.sync:
                if (matchSyncs) {
                    nextPending = request;
                } else {
                    toProcess.add(request);
                }
                break;
            default:
                toProcess.add(request);
            }
        }
    }
}
```
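In this scenario, the CREATE request first enters queuedRequests and nextPending is set to it. The request has not been committed yet, so CommitProcessor waits until the proposal passes and the request is committed.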
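The blocking rule can be simulated without threads. The following sketch uses the hypothetical names Req and MiniCommitProcessor and replaces wait()/notify() with a drain() call after every event. A write sets nextPending and holds everything queued behind it until a commit with the same sessionId and cxid arrives. The copy passed on at that point is the local one, which in ZooKeeper carries the client connection. The sketch omits the copying of hdr, txn and zxid onto nextPending.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical request: only the fields CommitProcessor matches on, plus a label. */
class Req {
    final long sessionId;
    final int cxid;
    final boolean isWrite;
    final String origin; // "local" = has a client connection, "leader" = copy from a COMMIT

    Req(long sessionId, int cxid, boolean isWrite, String origin) {
        this.sessionId = sessionId;
        this.cxid = cxid;
        this.isWrite = isWrite;
        this.origin = origin;
    }
}

class MiniCommitProcessor {
    private final Deque<Req> queued = new ArrayDeque<>();
    private final Deque<Req> committed = new ArrayDeque<>();
    private Req nextPending;
    /** Requests that would be handed to FinalRequestProcessor, in order. */
    final List<Req> toFinal = new ArrayList<>();

    void processRequest(Req r) { queued.add(r); drain(); }

    void commit(Req r) { committed.add(r); drain(); }

    private void drain() {
        boolean progress = true;
        while (progress) {
            progress = false;
            // A commit is consumed only when nothing is queued or a write is pending.
            if ((queued.isEmpty() || nextPending != null) && !committed.isEmpty()) {
                Req r = committed.remove();
                if (nextPending != null && nextPending.sessionId == r.sessionId
                        && nextPending.cxid == r.cxid) {
                    toFinal.add(nextPending); // our own request: the client gets a reply
                    nextPending = null;
                } else {
                    toFinal.add(r);           // someone else's write: apply only
                }
                progress = true;
            }
            // Pull queued requests until a write blocks the queue.
            while (nextPending == null && !queued.isEmpty()) {
                Req r = queued.remove();
                if (r.isWrite) {
                    nextPending = r;
                } else {
                    toFinal.add(r);           // reads pass straight through
                }
                progress = true;
            }
        }
    }
}
```

Here is a run of the CREATE scenario. A local CREATE (session 1, cxid 1) followed by a read produces nothing until `commit(new Req(1, 1, true, "leader"))` arrives. At that point, the local CREATE and then the read reach FinalRequestProcessor.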
Meanwhile, the leader receives the forwarded request in LearnerHandler:
```java
case Leader.REQUEST:
    // Deserialize the forwarded request
    bb = ByteBuffer.wrap(qp.getData());
    sessionId = bb.getLong();
    cxid = bb.getInt();
    type = bb.getInt();
    bb = bb.slice();
    Request si;
    if (type == OpCode.sync) {
        si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
    } else {
        si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
    }
    si.setOwner(this);
    // Submit it to the leader's processor chain
    leader.zk.submitRequest(si);
    break;
```
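The leader undoes the follower's encoding field by field. Below is a standalone sketch of that decoding step. ForwardedRequest and RequestDecoder are hypothetical names. ByteBuffer.wrap defaults to big-endian, which matches what DataOutputStream writes. slice() then gives a view of the remaining bytes without copying them.

```java
import java.nio.ByteBuffer;

/** Hypothetical holder for the decoded REQUEST header and body. */
class ForwardedRequest {
    final long sessionId;
    final int cxid;
    final int type;
    final ByteBuffer body;

    ForwardedRequest(long sessionId, int cxid, int type, ByteBuffer body) {
        this.sessionId = sessionId;
        this.cxid = cxid;
        this.type = type;
        this.body = body;
    }
}

class RequestDecoder {
    /** Reads the header in the order the follower wrote it, then slices off the body. */
    static ForwardedRequest decode(byte[] data) {
        ByteBuffer bb = ByteBuffer.wrap(data); // big-endian by default, like DataOutputStream
        long sessionId = bb.getLong();
        int cxid = bb.getInt();
        int type = bb.getInt();
        ByteBuffer body = bb.slice();          // shares bytes 16..end; its position starts at 0
        return new ForwardedRequest(sessionId, cxid, type, body);
    }
}
```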
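The processor chain on the leader side is PrepRequestProcessor → ProposalRequestProcessor → CommitProcessor → ToBeAppliedRequestProcessor → FinalRequestProcessor. In addition, ProposalRequestProcessor feeds transactions into SyncRequestProcessor → AckRequestProcessor.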
PrepRequestProcessor was analyzed in an earlier article. Its main job is to assemble the request record that matches the request type, which here is a CreateRequest.
Next comes ProposalRequestProcessor, whose main job is to start the vote (the proposal):
```java
public void processRequest(Request request) throws RequestProcessorException {

    // ......

    /* In the following IF-THEN-ELSE block, we process syncs on the leader.
     * If the sync is coming from a follower, then the follower
     * handler adds it to syncHandler. Otherwise, if it is a client of
     * the leader that issued the sync command, then syncHandler won't
     * contain the handler. In this case, we add it to syncHandler, and
     * call processRequest on the next processor.
     */

    if (request instanceof LearnerSyncRequest) {
        zks.getLeader().processSync((LearnerSyncRequest) request);
    } else {
        // Hand it to CommitProcessor first; it is not committed yet
        nextProcessor.processRequest(request);
        if (request.hdr != null) {
            // We need to sync and get consensus on any transactions
            try {
                // Start a proposal (vote)
                zks.getLeader().propose(request);
            } catch (XidRolloverException e) {
                throw new RequestProcessorException(e.getMessage(), e);
            }
            // Write the transaction to the leader's own log
            syncProcessor.processRequest(request);
        }
    }
}
```
The leader issues the proposal (Leader.propose):
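```java
public Proposal propose(Request request) throws XidRolloverException {
    // .......

    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
    try {
        request.hdr.serialize(boa, "hdr");
        if (request.txn != null) {
            request.txn.serialize(boa, "txn");
        }
        baos.close();
    } catch (IOException e) {
        LOG.warn("This really should be impossible", e);
    }
    // The PROPOSAL packet
    QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid,
            baos.toByteArray(), null);

    Proposal p = new Proposal();
    p.packet = pp;
    p.request = request;
    synchronized (this) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Proposing:: " + request);
        }

        lastProposed = p.packet.getZxid();
        // Put it in the "ballot box"; when ACKs arrive, the leader checks
        // whether this proposal has collected enough votes
        outstandingProposals.put(lastProposed, p);
        // Send the PROPOSAL packet to every follower so they can vote
        sendPacket(pp);
    }
    return p;
}
```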
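Without the serialization, propose comes down to two steps: record the proposal under its zxid, then broadcast it. The following sketch uses a hypothetical ProposalBook class. Its broadcast list stands in for sendPacket. The map keyed by zxid plays the role of outstandingProposals, and processAck later looks proposals up in it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class ProposalBook {
    /** Hypothetical proposal: the zxid plus the serialized header and txn. */
    static final class Proposal {
        final long zxid;
        final byte[] packetData;

        Proposal(long zxid, byte[] packetData) {
            this.zxid = zxid;
            this.packetData = packetData;
        }
    }

    // Keyed by zxid, so an incoming ACK can be matched to its proposal.
    final ConcurrentMap<Long, Proposal> outstandingProposals = new ConcurrentHashMap<>();
    // Stand-in for sendPacket(): the zxids broadcast to followers, in order.
    final List<Long> broadcast = new ArrayList<>();
    long lastProposed;

    synchronized Proposal propose(long zxid, byte[] packetData) {
        Proposal p = new Proposal(zxid, packetData);
        lastProposed = zxid;
        // Register first: an ACK can only follow the PROPOSAL, so it will find this entry.
        outstandingProposals.put(zxid, p);
        broadcast.add(zxid);
        return p;
    }
}
```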
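After sending the proposal, the leader writes the transaction to its own log through SyncRequestProcessor. Once the local write succeeds, the leader counts it as its own vote.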
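SyncRequestProcessor was analyzed in an earlier article; it appends transactions to the log file in order. Here we look at the processor that follows it, AckRequestProcessor: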
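```java
public void processRequest(Request request) {
    QuorumPeer self = leader.self;
    if (self != null) {
        // The local log write succeeded, so the leader ACKs its own proposal
        leader.processAck(self.getId(), request.zxid, null);
    } else {
        LOG.error("Null QuorumPeer");
    }
}
```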
The leader's processAck method is the key step. It was analyzed before, but it is worth going over again:
```java
synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {
    // .......
    // When an ACK arrives, first look up which proposal it belongs to
    Proposal p = outstandingProposals.get(zxid);
    if (p == null) {
        LOG.warn("Trying to commit future proposal: zxid 0x{} from {}",
                Long.toHexString(zxid), followerAddr);
        return;
    }
    // Record the vote
    p.ackSet.add(sid);
    if (LOG.isDebugEnabled()) {
        LOG.debug("Count for zxid: 0x{} is {}",
                Long.toHexString(zxid), p.ackSet.size());
    }
    // If the vote has passed (by default, more than half of the servers
    // have ACKed), commit the transaction
    if (self.getQuorumVerifier().containsQuorum(p.ackSet)) {
        if (zxid != lastCommitted + 1) {
            LOG.warn("Commiting zxid 0x{} from {} not first!",
                    Long.toHexString(zxid), followerAddr);
            LOG.warn("First is 0x{}", Long.toHexString(lastCommitted + 1));
        }
        outstandingProposals.remove(zxid);
        if (p.request != null) {
            // First add it to the toBeApplied queue
            toBeApplied.add(p);
        }
        // We don't commit the new leader proposal
        if ((zxid & 0xffffffffL) != 0) {
            if (p.request == null) {
                LOG.warn("Going to commmit null request for proposal: {}", p);
            }
            // Send COMMIT so that the followers commit the transaction
            commit(zxid);
            // Send INFORM to the observers
            inform(p);
            // Commit the transaction on the leader itself
            zk.commitProcessor.commit(p.request);
            // ......
        }
    }
}
```
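The commit condition is a simple majority over the voting servers. The following sketch uses the hypothetical classes MajorityQuorum and AckTracker. It counts ACKs in a set and reports the moment the proposal first reaches "strictly more than half". The committed flag stands in for removing the proposal from outstandingProposals. Once the proposal is removed, later ACKs for it no longer find an entry.

```java
import java.util.HashSet;
import java.util.Set;

/** Simple majority rule: strictly more than half of the voting servers. */
class MajorityQuorum {
    private final int half;

    MajorityQuorum(int votingServers) {
        this.half = votingServers / 2;
    }

    boolean containsQuorum(Set<Long> ackSet) {
        return ackSet.size() > half;
    }
}

/** Hypothetical per-proposal ACK bookkeeping, modeled on processAck. */
class AckTracker {
    private final MajorityQuorum quorum;
    private final Set<Long> ackSet = new HashSet<>();
    private boolean committed; // stands in for outstandingProposals.remove(zxid)

    AckTracker(MajorityQuorum quorum) {
        this.quorum = quorum;
    }

    /** Records an ACK from server sid; returns true only for the ACK that completes the quorum. */
    boolean ack(long sid) {
        if (committed) {
            return false; // late ACK: the real code no longer finds the proposal
        }
        ackSet.add(sid); // a Set, so repeated ACKs from one server count once
        if (quorum.containsQuorum(ackSet)) {
            committed = true;
            return true;
        }
        return false;
    }
}
```

With 3 servers, 3 / 2 = 1, so 2 ACKs are needed. With 5 servers, 3 ACKs are needed.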
Notifying the followers to commit the transaction (Leader.commit):
```java
public void commit(long zxid) {
    synchronized (this) {
        lastCommitted = zxid;
    }
    // Send the COMMIT packet
    QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
    sendPacket(qp);
}
```
On the follower side, the PROPOSAL packet arrives and the follower handles the vote:
```java
case Leader.PROPOSAL:
    TxnHeader hdr = new TxnHeader();
    Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
    if (hdr.getZxid() != lastQueued + 1) {
        LOG.warn("Got zxid 0x"
                + Long.toHexString(hdr.getZxid())
                + " expected 0x"
                + Long.toHexString(lastQueued + 1));
    }
    lastQueued = hdr.getZxid();
    // Log the transaction; once that succeeds, an ACK is sent
    fzk.logRequest(hdr, txn);
    break;
```
logRequest in FollowerZooKeeperServer:

```java
public void logRequest(TxnHeader hdr, Record txn) {
    Request request = new Request(null, hdr.getClientId(), hdr.getCxid(),
            hdr.getType(), null, null);
    request.hdr = hdr;
    request.txn = txn;
    request.zxid = hdr.getZxid();
    if ((request.zxid & 0xffffffffL) != 0) {
        pendingTxns.add(request);
    }
    // SyncRequestProcessor writes the transaction to the local log,
    // then the ACK packet is sent
    syncProcessor.processRequest(request);
}
```
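Both checks above work on the zxid. A zxid is a 64-bit number: the high 32 bits hold the leader epoch and the low 32 bits hold a counter within that epoch. The sketch below shows that arithmetic. The Zxid class and its methods are hypothetical helpers, not ZooKeeper API. A counter of 0 marks the start of an epoch, the "new leader proposal" that processAck declines to commit. That is presumably why logRequest keeps such a zxid out of pendingTxns.

```java
/** Hypothetical helpers for the 64-bit zxid: high 32 bits = epoch, low 32 bits = counter. */
final class Zxid {
    private Zxid() {}

    static long make(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    static long epoch(long zxid) {
        return zxid >>> 32;
    }

    static long counter(long zxid) {
        return zxid & 0xffffffffL;
    }

    /** Same test as logRequest: a counter of 0 (start of an epoch) is not added to pendingTxns. */
    static boolean isClientTxn(long zxid) {
        return counter(zxid) != 0;
    }

    /** The follower's lastQueued + 1 check; in the real code a gap only logs a warning. */
    static boolean inSequence(long lastQueued, long next) {
        return next == lastQueued + 1;
    }
}
```

For example, `Zxid.make(2, 5)` is `0x200000005`. The first transaction of epoch 3 is `0x300000001`, which is not `lastQueued + 1` after epoch 2, so the sequence check reports a gap across epochs.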
Once the log write succeeds, SendAckRequestProcessor sends the ACK packet:
```java
public void processRequest(Request si) {
    if (si.type != OpCode.sync) {
        // The ACK packet
        QuorumPacket qp = new QuorumPacket(Leader.ACK, si.hdr.getZxid(), null,
                null);
        try {
            // Send it
            learner.writePacket(qp, false);
        } catch (IOException e) {
            LOG.warn("Closing connection to leader, exception during packet send", e);
            try {
                if (!learner.sock.isClosed()) {
                    learner.sock.close();
                }
            } catch (IOException e1) {
                // Nothing to do, we are shutting things down, so an exception here is irrelevant
                LOG.debug("Ignoring error closing the connection", e1);
            }
        }
    }
}
```
The leader then receives the ACK packet in the LearnerHandler thread:
```java
case Leader.ACK:
    if (this.learnerType == LearnerType.OBSERVER) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Received ACK from Observer " + this.sid);
        }
    }
    leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
    break;
```
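This again calls processAck. The leader has already counted its own ACK, so one more ACK from a follower gives 2 out of 3 in a three-server ensemble. That is a majority, and the proposal passes. The leader then commits the transaction, which means sending a COMMIT packet to the followers.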
The follower receives the COMMIT packet:
```java
case Leader.COMMIT:
    fzk.commit(qp.getZxid());
    break;

// FollowerZooKeeperServer.commit
public void commit(long zxid) {
    if (pendingTxns.size() == 0) {
        LOG.warn("Committing " + Long.toHexString(zxid)
                + " without seeing txn");
        return;
    }
    long firstElementZxid = pendingTxns.element().zxid;
    if (firstElementZxid != zxid) {
        LOG.error("Committing zxid 0x" + Long.toHexString(zxid)
                + " but next pending txn 0x"
                + Long.toHexString(firstElementZxid));
        System.exit(12);
    }
    // Take the request to be committed from the pending queue
    Request request = pendingTxns.remove();
    // Commit it; CommitProcessor then passes it to FinalRequestProcessor
    commitProcessor.commit(request);
}
```
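Commits must arrive in exactly the order the proposals were logged, because pendingTxns is a FIFO queue and only its head may be committed. The following sketch models that rule with a hypothetical PendingTxns class that stores only the zxids. Where the real code calls System.exit(12) on a mismatch, the sketch throws an IllegalStateException so that the behavior can be observed.

```java
import java.util.ArrayDeque;
import java.util.Queue;

class PendingTxns {
    private final Queue<Long> pending = new ArrayDeque<>();

    /** Called after a PROPOSAL has been logged (logRequest). */
    void logged(long zxid) {
        pending.add(zxid);
    }

    /** Handles a COMMIT; returns false if it is ignored, true if the head txn is committed. */
    boolean commit(long zxid) {
        Long head = pending.peek();
        if (head == null) {
            return false; // real code: warn "without seeing txn" and ignore
        }
        if (head.longValue() != zxid) {
            // real code: log an error and System.exit(12); here we throw instead
            throw new IllegalStateException("Committing zxid 0x" + Long.toHexString(zxid)
                    + " but next pending txn 0x" + Long.toHexString(head));
        }
        pending.remove(); // real code hands this Request to commitProcessor.commit
        return true;
    }
}
```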
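After the commit, the request is passed to FinalRequestProcessor, which applies the change to the in-memory database. If the request came from a client connected to this server, the response is written back; otherwise no response is sent.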