
ZooKeeper Explained, Part 7: Processing a CREATE Transaction in a Cluster


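The previous articles covered how the followers and the leader hold elections and initialize. This one uses the CREATE request described earlier as an example to show how a transaction is processed in a cluster.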

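Communication between the client and the ZooKeeper server was covered in earlier articles, so it is not repeated here. Suppose a client has connected to one of the followers and sent a CREATE request. On the follower, the IO thread takes the request and runs it through the processor chain. The follower's chain is FollowerRequestProcessor → CommitProcessor → FinalRequestProcessor, with a separate SyncRequestProcessor → SendAckRequestProcessor path for proposals that arrive from the leader.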

Initialization code (FollowerZooKeeperServer.setupRequestProcessors):



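```java
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    commitProcessor = new CommitProcessor(finalProcessor,
            Long.toString(getServerId()), true);
    commitProcessor.start();
    firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
    ((FollowerRequestProcessor) firstProcessor).start();
    syncProcessor = new SyncRequestProcessor(this,
            new SendAckRequestProcessor((Learner) getFollower()));
    syncProcessor.start();
}
```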
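setupRequestProcessors() wires the chain back to front: each processor receives the next one in its constructor and forwards requests to it. A minimal synchronous sketch of that chain-of-responsibility pattern follows. The ChainProcessor, NamedStep and FinalStep types are hypothetical stand-ins, and unlike the real FollowerRequestProcessor and CommitProcessor (which are started as threads above) they do not queue anything or run concurrently.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of a RequestProcessor chain (not ZooKeeper's API).
interface ChainProcessor {
    void processRequest(String request, List<String> trace);
}

// Stand-in for FinalRequestProcessor: the end of the chain.
class FinalStep implements ChainProcessor {
    public void processRequest(String request, List<String> trace) {
        trace.add("Final:" + request);
    }
}

// Stand-in for an intermediate processor: record the visit, then delegate.
class NamedStep implements ChainProcessor {
    private final String name;
    private final ChainProcessor next;

    NamedStep(String name, ChainProcessor next) {
        this.name = name;
        this.next = next;
    }

    public void processRequest(String request, List<String> trace) {
        trace.add(name + ":" + request);
        next.processRequest(request, trace);
    }
}

class FollowerChainDemo {
    // Built back to front, as in setupRequestProcessors(): the last processor is created first.
    static ChainProcessor build() {
        ChainProcessor finalProcessor = new FinalStep();
        ChainProcessor commit = new NamedStep("Commit", finalProcessor);
        return new NamedStep("Follower", commit);
    }

    static List<String> run(String request) {
        List<String> trace = new ArrayList<>();
        build().processRequest(request, trace);
        return trace;
    }
}
```

Running `FollowerChainDemo.run("create /a")` visits the steps in the order Follower, Commit, Final.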


The first processor is FollowerRequestProcessor, which does the following:



```java
while (true) {
    Request request = queuedRequests.take();
    if (LOG.isTraceEnabled()) {
        ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK,
                'F', request, "");
    }
    if (request == Request.requestOfDeath) {
        break;
    }
    // We want to queue the request to be processed before we submit
    // the request to the leader so that we are ready to receive
    // the response
    // Hand it to CommitProcessor first; once the vote passes, the
    // transaction is finally committed through CommitProcessor.commit()
    nextProcessor.processRequest(request);

    // We now ship the request to the leader. As with all
    // other quorum operations, sync also follows this code
    // path, but different from others, we need to keep track
    // of the sync operations this follower has pending, so we
    // add it to pendingSyncs.
    // Only transaction (write) requests, plus sync, are forwarded to the leader for a vote
    switch (request.type) {
    case OpCode.sync:
        zks.pendingSyncs.add(request);
        zks.getFollower().request(request);
        break;
    case OpCode.create:
    case OpCode.delete:
    case OpCode.setData:
    case OpCode.setACL:
    case OpCode.createSession:
    case OpCode.closeSession:
    case OpCode.multi:
        zks.getFollower().request(request);
        break;
    }
}
```
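The switch above decides which requests are also shipped to the leader; every request, read or write, has already been queued in CommitProcessor. A minimal sketch of that decision, assuming a hypothetical Op enum in place of ZooKeeper's integer OpCode constants:

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical enum mirroring a subset of ZooKeeper's OpCode values (the real ones are ints).
enum Op { CREATE, DELETE, SET_DATA, SET_ACL, CREATE_SESSION, CLOSE_SESSION, MULTI, SYNC, GET_DATA, EXISTS }

class FollowerRouting {
    // The write operations listed in FollowerRequestProcessor's switch.
    static final Set<Op> WRITES = EnumSet.of(Op.CREATE, Op.DELETE, Op.SET_DATA,
            Op.SET_ACL, Op.CREATE_SESSION, Op.CLOSE_SESSION, Op.MULTI);

    // Whether the request is ALSO sent to the leader (local queuing is not modeled).
    static boolean forwardToLeader(Op op) {
        return op == Op.SYNC || WRITES.contains(op);
    }

    // sync is the only type additionally tracked in the follower's pendingSyncs.
    static boolean trackAsPendingSync(Op op) {
        return op == Op.SYNC;
    }
}
```

Reads such as getData never leave the follower; they are answered from its local state.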



Forwarding the transaction request to the leader (Learner.request):



```java
void request(Request request) throws IOException {
    // Serialize the request
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream oa = new DataOutputStream(baos);
    oa.writeLong(request.sessionId);
    oa.writeInt(request.cxid);
    oa.writeInt(request.type);
    if (request.request != null) {
        request.request.rewind();
        int len = request.request.remaining();
        byte b[] = new byte[len];
        request.request.get(b);
        request.request.rewind();
        oa.write(b);
    }
    oa.close();
    QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos
            .toByteArray(), request.authInfo);
    writePacket(qp, true);
}
```
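Learner.request() writes a fixed header (sessionId as a long, cxid and type as ints) followed by the raw request bytes, and the leader reads the same fields back in LearnerHandler. A round-trip sketch of that layout follows, with hypothetical RequestCodec and DecodedRequest names. DataOutputStream writes big-endian, and a freshly wrapped ByteBuffer also reads big-endian, which is why the two sides agree.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

// Hypothetical codec; only the byte layout follows Learner.request()/LearnerHandler.
class RequestCodec {
    static byte[] encode(long sessionId, int cxid, int type, byte[] body) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream oa = new DataOutputStream(baos)) {   // big-endian output
            oa.writeLong(sessionId);   // 8 bytes
            oa.writeInt(cxid);         // 4 bytes
            oa.writeInt(type);         // 4 bytes
            if (body != null) {
                oa.write(body);        // raw request payload
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);   // not expected for an in-memory stream
        }
        return baos.toByteArray();
    }

    // Mirrors the leader side: read the fixed header, then slice() off the remainder.
    static DecodedRequest decode(byte[] data) {
        ByteBuffer bb = ByteBuffer.wrap(data);   // new buffers are big-endian
        long sessionId = bb.getLong();
        int cxid = bb.getInt();
        int type = bb.getInt();
        ByteBuffer rest = bb.slice();
        byte[] body = new byte[rest.remaining()];
        rest.get(body);
        return new DecodedRequest(sessionId, cxid, type, body);
    }
}

class DecodedRequest {
    final long sessionId;
    final int cxid;
    final int type;
    final byte[] body;

    DecodedRequest(long sessionId, int cxid, int type, byte[] body) {
        this.sessionId = sessionId;
        this.cxid = cxid;
        this.type = type;
        this.body = body;
    }
}
```

A request with a 3-byte body therefore travels as 8 + 4 + 4 + 3 = 19 bytes inside the QuorumPacket.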



CommitProcessor mainly buffers incoming requests and waits for each one to be committed:


```java
while (!finished) {
    int len = toProcess.size();
    // Final processing is handed to FinalRequestProcessor
    for (int i = 0; i < len; i++) {
        nextProcessor.processRequest(toProcess.get(i));
    }
    toProcess.clear();
    synchronized (this) {
        // Nothing has been committed yet: wait until commit() wakes us up
        if ((queuedRequests.size() == 0 || nextPending != null)
                && committedRequests.size() == 0) {
            wait();
            continue;
        }
        // First check and see if the commit came in for the pending
        // request
        // A commit has arrived: add it to toProcess so it is handled in the next iteration
        if ((queuedRequests.size() == 0 || nextPending != null)
                && committedRequests.size() > 0) {
            Request r = committedRequests.remove();
            /*
             * We match with nextPending so that we can move to the
             * next request when it is committed. We also want to
             * use nextPending because it has the cnxn member set
             * properly.
             */
            // Our own request: reuse the original Request, because it carries
            // the client connection and can write the response back
            if (nextPending != null
                    && nextPending.sessionId == r.sessionId
                    && nextPending.cxid == r.cxid) {
                // we want to send our version of the request.
                // the pointer to the connection in the request
                nextPending.hdr = r.hdr;
                nextPending.txn = r.txn;
                nextPending.zxid = r.zxid;
                toProcess.add(nextPending);
                nextPending = null;
            } else {
                // Someone else's request: use the committed Request, which has
                // no connection, so no response is sent
                // this request came from someone else so just
                // send the commit packet
                toProcess.add(r);
            }
        }
    }

    // We haven't matched the pending requests, so go back to
    // waiting
    // A request is pending but not committed yet, so keep waiting
    if (nextPending != null) {
        continue;
    }
    // Take the next requests from the queue
    synchronized (this) {
        // Process the next requests in the queuedRequests
        while (nextPending == null && queuedRequests.size() > 0) {
            Request request = queuedRequests.remove();
            switch (request.type) {
            case OpCode.create:
            case OpCode.delete:
            case OpCode.setData:
            case OpCode.multi:
            case OpCode.setACL:
            case OpCode.createSession:
            case OpCode.closeSession:
                nextPending = request;
                break;
            case OpCode.sync:
                if (matchSyncs) {
                    nextPending = request;
                } else {
                    toProcess.add(request);
                }
                break;
            default:
                toProcess.add(request);
            }
        }
    }
}
```

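In this scenario, the CREATE request first lands in queuedRequests, and nextPending is then set to it. Because it has not been committed yet, CommitProcessor waits until the proposal for this request is accepted by a quorum and committed.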
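The matching logic is easier to follow without the threading. The following single-threaded model assumes a hypothetical Req class whose cnxn field stands in for the client connection. It drops wait()/notify, the hdr/txn copying and the sync handling, and it leaves processed requests in toProcess instead of passing them on.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for Request; cnxn is null when the request came from another server.
class Req {
    final long sessionId;
    final int cxid;
    final boolean write;
    final String cnxn;
    long zxid = -1;

    Req(long sessionId, int cxid, boolean write, String cnxn) {
        this.sessionId = sessionId;
        this.cxid = cxid;
        this.write = write;
        this.cnxn = cnxn;
    }
}

// Simplified model of CommitProcessor's matching (hypothetical, single-threaded).
class CommitModel {
    final Deque<Req> queuedRequests = new ArrayDeque<>();
    final Deque<Req> committedRequests = new ArrayDeque<>();
    final List<Req> toProcess = new ArrayList<>();   // would go to FinalRequestProcessor
    Req nextPending;

    // One pass of the loop body, minus the wait()/notify machinery.
    void step() {
        if ((queuedRequests.isEmpty() || nextPending != null) && !committedRequests.isEmpty()) {
            Req r = committedRequests.remove();
            if (nextPending != null && nextPending.sessionId == r.sessionId
                    && nextPending.cxid == r.cxid) {
                nextPending.zxid = r.zxid;   // keep our copy: it holds the connection
                toProcess.add(nextPending);
                nextPending = null;
            } else {
                toProcess.add(r);            // someone else's request: nobody to reply to
            }
        }
        if (nextPending != null) {
            return;                          // our write is still waiting for its commit
        }
        while (nextPending == null && !queuedRequests.isEmpty()) {
            Req request = queuedRequests.remove();
            if (request.write) {
                nextPending = request;       // hold back later requests until committed
            } else {
                toProcess.add(request);      // reads pass straight through
            }
        }
    }
}
```

Queuing a local write and calling step() leaves it in nextPending; once a committed copy with the same sessionId and cxid arrives, the next step() emits the original object, connection included.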

Meanwhile, the leader receives the forwarded request in LearnerHandler:




```java
case Leader.REQUEST:
    // Deserialize
    bb = ByteBuffer.wrap(qp.getData());
    sessionId = bb.getLong();
    cxid = bb.getInt();
    type = bb.getInt();
    bb = bb.slice();
    Request si;
    if (type == OpCode.sync) {
        si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
    } else {
        si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
    }
    si.setOwner(this);
    // Submit it to the processor chain
    leader.zk.submitRequest(si);
    break;
```



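The leader's processor chain is PrepRequestProcessor → ProposalRequestProcessor → CommitProcessor → ToBeAppliedRequestProcessor → FinalRequestProcessor; ProposalRequestProcessor also passes transactions to SyncRequestProcessor → AckRequestProcessor.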

PrepRequestProcessor was analyzed in an earlier article. It builds the request according to its type, which in this case is a CreateRequest.

Next comes ProposalRequestProcessor, whose main job is to start the vote, i.e., to issue a proposal:



```java
public void processRequest(Request request) throws RequestProcessorException {
    // ......

    /* In the following IF-THEN-ELSE block, we process syncs on the leader.
     * If the sync is coming from a follower, then the follower
     * handler adds it to syncHandler. Otherwise, if it is a client of
     * the leader that issued the sync command, then syncHandler won't
     * contain the handler. In this case, we add it to syncHandler, and
     * call processRequest on the next processor.
     */

    if (request instanceof LearnerSyncRequest) {
        zks.getLeader().processSync((LearnerSyncRequest) request);
    } else {
        // Hand it to CommitProcessor first; it is not committed yet
        nextProcessor.processRequest(request);
        if (request.hdr != null) {
            // We need to sync and get consensus on any transactions
            try {
                // Issue a proposal (start the vote)
                zks.getLeader().propose(request);
            } catch (XidRolloverException e) {
                throw new RequestProcessorException(e.getMessage(), e);
            }
            // Then write the transaction to the local log
            syncProcessor.processRequest(request);
        }
    }
}
```


The leader issues the proposal (Leader.propose):


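```java
public Proposal propose(Request request) throws XidRolloverException {
    // .......

    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
    try {
        request.hdr.serialize(boa, "hdr");
        if (request.txn != null) {
            request.txn.serialize(boa, "txn");
        }
        baos.close();
    } catch (IOException e) {
        LOG.warn("This really should be impossible", e);
    }
    // The proposal packet
    QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid,
            baos.toByteArray(), null);

    Proposal p = new Proposal();
    p.packet = pp;
    p.request = request;
    synchronized (this) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Proposing:: " + request);
        }

        lastProposed = p.packet.getZxid();
        // Put it in the "ballot box"; when ACKs arrive, the leader checks
        // whether the proposal here has enough votes
        outstandingProposals.put(lastProposed, p);
        // Send the proposal to every follower so they can vote
        sendPacket(pp);
    }
    return p;
}
```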
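The essential bookkeeping in propose() is to remember the proposal under its zxid and then broadcast it. A toy model follows; ToyLeader, ToyProposal and the outbox list (standing in for sendPacket) are hypothetical, and serialization is skipped.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for Leader.Proposal.
class ToyProposal {
    final long zxid;
    final byte[] data;
    final Set<Long> ackSet = new HashSet<>();   // filled later by processAck

    ToyProposal(long zxid, byte[] data) {
        this.zxid = zxid;
        this.data = data;
    }
}

// Hypothetical, simplified model of Leader.propose().
class ToyLeader {
    final Map<Long, ToyProposal> outstandingProposals = new ConcurrentHashMap<>();
    final List<Long> followerIds;
    final List<long[]> outbox = new ArrayList<>();   // (followerId, zxid) pairs "sent"
    long lastProposed;

    ToyLeader(List<Long> followerIds) {
        this.followerIds = followerIds;
    }

    synchronized ToyProposal propose(long zxid, byte[] data) {
        ToyProposal p = new ToyProposal(zxid, data);
        lastProposed = zxid;
        outstandingProposals.put(zxid, p);        // processAck looks proposals up by zxid
        for (long sid : followerIds) {
            outbox.add(new long[] {sid, zxid});   // stands in for sendPacket(pp)
        }
        return p;
    }
}
```

With two followers, one proposal produces one entry in outstandingProposals and two outgoing packets; its ackSet starts empty.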


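After sending the proposal, the leader writes the transaction to its log via SyncRequestProcessor. Once the local write succeeds, the leader counts that as its own vote.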

SyncRequestProcessor was covered in an earlier article; it appends transactions to the log file in order. Here we focus on the processor after it, AckRequestProcessor:


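```java
public void processRequest(Request request) {
    QuorumPeer self = leader.self;
    if (self != null) {
        // The local log write succeeded, so the leader acknowledges its own proposal
        leader.processAck(self.getId(), request.zxid, null);
    } else {
        LOG.error("Null QuorumPeer");
    }
}
```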


The leader's processAck method is the key piece. It was analyzed before, but it is worth going over again:




```java
synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {
    // .......
    // When an ACK arrives, first find the proposal it votes for
    Proposal p = outstandingProposals.get(zxid);
    if (p == null) {
        LOG.warn("Trying to commit future proposal: zxid 0x{} from {}",
                Long.toHexString(zxid), followerAddr);
        return;
    }
    // Record the vote
    p.ackSet.add(sid);
    if (LOG.isDebugEnabled()) {
        LOG.debug("Count for zxid: 0x{} is {}",
                Long.toHexString(zxid), p.ackSet.size());
    }
    // If the quorum condition is met (by default, a majority of servers), commit the transaction
    if (self.getQuorumVerifier().containsQuorum(p.ackSet)) {
        if (zxid != lastCommitted + 1) {
            LOG.warn("Commiting zxid 0x{} from {} not first!",
                    Long.toHexString(zxid), followerAddr);
            LOG.warn("First is 0x{}", Long.toHexString(lastCommitted + 1));
        }
        outstandingProposals.remove(zxid);
        if (p.request != null) {
            // Add it to the to-be-applied queue first
            toBeApplied.add(p);
        }
        // We don't commit the new leader proposal
        if ((zxid & 0xffffffffL) != 0) {
            if (p.request == null) {
                LOG.warn("Going to commmit null request for proposal: {}", p);
            }
            // Commit: tell the followers to commit the transaction
            commit(zxid);
            // Inform the observers
            inform(p);
            // The leader commits the transaction locally
            zk.commitProcessor.commit(p.request);
            // ......
        }
    }
}
```


Telling the followers to commit (Leader.commit):




```java
public void commit(long zxid) {
    synchronized (this) {
        lastCommitted = zxid;
    }
    // Send the COMMIT packet
    QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
    sendPacket(qp);
}
```


Back on the follower, the PROPOSAL packet arrives and the follower processes the vote:



```java
case Leader.PROPOSAL:
    TxnHeader hdr = new TxnHeader();
    Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
    if (hdr.getZxid() != lastQueued + 1) {
        LOG.warn("Got zxid 0x"
                + Long.toHexString(hdr.getZxid())
                + " expected 0x"
                + Long.toHexString(lastQueued + 1));
    }
    lastQueued = hdr.getZxid();
    // Log the transaction; once that succeeds, an ACK is sent
    fzk.logRequest(hdr, txn);
    break;
```


       



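FollowerZooKeeperServer.logRequest records the transaction in pendingTxns and hands it to SyncRequestProcessor:

```java
public void logRequest(TxnHeader hdr, Record txn) {
    Request request = new Request(null, hdr.getClientId(), hdr.getCxid(),
            hdr.getType(), null, null);
    request.hdr = hdr;
    request.txn = txn;
    request.zxid = hdr.getZxid();
    if ((request.zxid & 0xffffffffL) != 0) {
        pendingTxns.add(request);
    }
    // SyncRequestProcessor writes the transaction to the local log, then an ACK is sent
    syncProcessor.processRequest(request);
}
```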
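Both processAck and logRequest test `(zxid & 0xffffffffL) != 0`, and the follower expects each PROPOSAL to carry `lastQueued + 1`. Both rely on how a zxid is laid out: the leader epoch sits in the high 32 bits and a per-epoch counter in the low 32 bits, so counter 0 is the new-leader proposal of an epoch and consecutive proposals in an epoch differ by exactly 1. A small sketch of that encoding follows; ZooKeeper ships similar helpers in ZxidUtils, but this Zxid class and its method names are our own.

```java
// Hypothetical helper illustrating the zxid layout: [ epoch : 32 bits | counter : 32 bits ].
class Zxid {
    static long make(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }

    static long epoch(long zxid) {
        return zxid >> 32L;
    }

    static long counter(long zxid) {
        return zxid & 0xffffffffL;
    }

    // Counter 0 is the new-leader proposal: processAck does not send a COMMIT for it
    // and logRequest does not add it to pendingTxns.
    static boolean isNewLeaderProposal(long zxid) {
        return counter(zxid) == 0;
    }
}
```

For example, `Zxid.make(1, 1)` is `0x100000001`, and the next proposal in the same epoch is `0x100000002`.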


Once the log write succeeds, SendAckRequestProcessor sends the ACK packet:



```java
public void processRequest(Request si) {
    if (si.type != OpCode.sync) {
        // The ACK packet
        QuorumPacket qp = new QuorumPacket(Leader.ACK, si.hdr.getZxid(), null,
                null);
        try {
            // Send it
            learner.writePacket(qp, false);
        } catch (IOException e) {
            LOG.warn("Closing connection to leader, exception during packet send", e);
            try {
                if (!learner.sock.isClosed()) {
                    learner.sock.close();
                }
            } catch (IOException e1) {
                // Nothing to do, we are shutting things down, so an exception here is irrelevant
                LOG.debug("Ignoring error closing the connection", e1);
            }
        }
    }
}
```


The leader then receives the ACK packet in the LearnerHandler thread:


```java
case Leader.ACK:
    if (this.learnerType == LearnerType.OBSERVER) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Received ACK from Observer  " + this.sid);
        }
    }
    leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
    break;
```


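This again calls processAck. The leader's own vote has already been counted, so one follower ACK brings the total to 2 of 3, a majority in a 3-server ensemble, and the proposal passes. The leader then commits, i.e., it sends the COMMIT packet to the followers.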
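The default quorum rule is a strict majority of the voting servers, which is what QuorumMaj.containsQuorum checks. The sketch below replays the counting from the paragraph above; the Majority class and acksUntilCommit are hypothetical names, and the first id passed in plays the role of the leader's own ACK from AckRequestProcessor.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of majority-quorum counting for one proposal.
class Majority {
    // A set of server ids is a quorum when it holds strictly more than half the voters.
    static boolean containsQuorum(Set<Long> ackSet, int ensembleSize) {
        return ackSet.size() > ensembleSize / 2;
    }

    // Adds ACKs in arrival order; returns how many distinct ACKs it took to commit,
    // or -1 if the proposal is still waiting (it stays in outstandingProposals).
    static int acksUntilCommit(int ensembleSize, long... ackingSids) {
        Set<Long> ackSet = new HashSet<>();
        for (long sid : ackingSids) {
            ackSet.add(sid);   // duplicate ACKs from the same server count once
            if (containsQuorum(ackSet, ensembleSize)) {
                return ackSet.size();
            }
        }
        return -1;
    }
}
```

For 3 servers the proposal commits after 2 ACKs (leader plus one follower); for 5 servers it needs 3, and in a 4-server ensemble 2 ACKs are not enough.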

The follower receives the COMMIT packet:




```java
case Leader.COMMIT:
    fzk.commit(qp.getZxid());
    break;

// FollowerZooKeeperServer
public void commit(long zxid) {
    if (pendingTxns.size() == 0) {
        LOG.warn("Committing " + Long.toHexString(zxid)
                + " without seeing txn");
        return;
    }
    long firstElementZxid = pendingTxns.element().zxid;
    if (firstElementZxid != zxid) {
        LOG.error("Committing zxid 0x" + Long.toHexString(zxid)
                + " but next pending txn 0x"
                + Long.toHexString(firstElementZxid));
        System.exit(12);
    }
    // Take the request to commit from the pending queue
    Request request = pendingTxns.remove();
    // Commit it; the request then goes on to FinalRequestProcessor
    commitProcessor.commit(request);
}
```
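Because proposals are logged in zxid order and the leader commits them in the same order, the follower only has to compare each COMMIT with the head of pendingTxns. A minimal model follows, with a hypothetical PendingTxns class that tracks bare zxids and throws where the real code calls System.exit(12).

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical model of the follower's pendingTxns handling.
class PendingTxns {
    private final Queue<Long> pending = new ArrayDeque<>();

    // Like logRequest: new-leader proposals (counter 0) are not tracked.
    void logRequest(long zxid) {
        if ((zxid & 0xffffffffL) != 0) {
            pending.add(zxid);
        }
    }

    // Returns the zxid that would go to commitProcessor.commit(), or -1 if nothing is pending.
    long commit(long zxid) {
        if (pending.isEmpty()) {
            return -1;   // the real code logs "Committing ... without seeing txn"
        }
        long first = pending.element();
        if (first != zxid) {
            throw new IllegalStateException("Committing zxid 0x" + Long.toHexString(zxid)
                    + " but next pending txn 0x" + Long.toHexString(first));
        }
        return pending.remove();
    }

    int size() {
        return pending.size();
    }
}
```

An out-of-order COMMIT leaves the queue untouched, mirroring the check that runs before pendingTxns.remove().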


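After the commit, the request goes to FinalRequestProcessor, which applies the change to the in-memory database. If the request came from a client connected to this server, a response is written back; otherwise no response is sent.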
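Whether a server answers comes down to whether its copy of the request still holds a client connection, which is exactly what CommitProcessor preserved by reusing nextPending. A rough sketch, assuming a hypothetical FinalStage class with a HashMap in place of the DataTree; the real FinalRequestProcessor also handles sessions, watches and error codes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: every server applies the committed txn, only the owner replies.
class FinalStage {
    final Map<String, byte[]> dataTree = new HashMap<>();   // stand-in for the in-memory DataTree
    final List<String> responses = new ArrayList<>();        // stand-in for writes to the client socket

    void process(String path, byte[] data, String cnxn) {
        dataTree.put(path, data);   // applied on every server
        if (cnxn == null) {
            return;                 // request came from another server: nobody to answer
        }
        responses.add(cnxn + " <- created " + path);
    }
}
```

In the CREATE scenario, the follower that accepted the client's request both applies and answers, while the leader and the other follower only apply.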
