At last week's team meeting, the discussion turned to MySQL master/slave replication. Taking that as a cue, I'm starting a new series on how data synchronization between cluster nodes is designed in distributed systems such as Redis, ZooKeeper, RocketMQ, Eureka, and MySQL.
The way I see it, data synchronization really comes down to a few questions:
- Who accepts data updates?
- When is synchronization triggered? (After I receive an update, do I actively send it to you, or do you periodically come and ask me? Push or pull?)
- When is an update considered fully synchronized?
Open-source middleware is complex, so we focus on the big picture and only analyze how each system designs around these three points.
This article looks at ZooKeeper. One basic concept up front: ZK is not a traditional Master/Slave setup; instead it introduces three roles: Leader, Follower, and Observer.
Who accepts data updates
This one is easy to answer: given ZK's cluster model, it is naturally the Leader that "performs" the update.
Note the wording "performs". Does that mean both the Leader and the Followers can accept update requests? There is no need to agonize over it: think about connecting to ZK with the client script. No matter which role the server you connect to happens to play, you can still create, update, and delete znodes as usual.
> zkCli.sh -server 127.0.0.1:2181
So both the Leader and the Followers can accept update requests.
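To make that concrete, here is a minimal client-side sketch, assuming a ZK server reachable at 127.0.0.1:2181; the znode path /demo and the class name are made up for the example. Whether that address happens to be a Leader or a Follower, the write calls look the same and succeed either way:

```java
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class AnyNodeWriteDemo {
    public static void main(String[] args) throws Exception {
        CountDownLatch connected = new CountDownLatch(1);

        // The address can point at a Leader or a Follower; the client API is identical.
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 30000, event -> {
            if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                connected.countDown();
            }
        });
        connected.await();

        // Create and then update a znode through whichever server we are connected to.
        zk.create("/demo", "v1".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zk.setData("/demo", "v2".getBytes(), -1);

        zk.close();
    }
}
```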
Now for the next question, which is the focus of this section: what does a Follower do when it receives an update request?
As before, we focus on the big picture. For ZK's communication-layer request handling, go straight to ZooKeeperServer, which has several subclasses (LeaderZooKeeperServer, FollowerZooKeeperServer, ObserverZooKeeperServer, ReadOnlyZooKeeperServer, and so on).
Look directly at the org.apache.zookeeper.server.ZooKeeperServer#processPacket method:
public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
// We have the request, now process and setup for next
InputStream bais = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
RequestHeader h = new RequestHeader();
h.deserialize(bia, "header");
// Through the magic of byte buffers, txn will not be
// pointing
// to the start of the txn
incomingBuffer = incomingBuffer.slice();
if (h.getType() == OpCode.auth) {
LOG.info("got auth packet " + cnxn.getRemoteSocketAddress());
AuthPacket authPacket = new AuthPacket();
ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);
String scheme = authPacket.getScheme();
AuthenticationProvider ap = ProviderRegistry.getProvider(scheme);
Code authReturn = KeeperException.Code.AUTHFAILED;
if(ap != null) {
try {
authReturn = ap.handleAuthentication(cnxn, authPacket.getAuth());
} catch(RuntimeException e) {
LOG.warn("Caught runtime exception from AuthenticationProvider: " + scheme + " due to " + e);
authReturn = KeeperException.Code.AUTHFAILED;
}
}
if (authReturn!= KeeperException.Code.OK) {
if (ap == null) {
LOG.warn("No authentication provider for scheme: "
+ scheme + " has "
+ ProviderRegistry.listProviders());
} else {
LOG.warn("Authentication failed for scheme: " + scheme);
}
// send a response...
ReplyHeader rh = new ReplyHeader(h.getXid(), 0,
KeeperException.Code.AUTHFAILED.intValue());
cnxn.sendResponse(rh, null, null);
// ... and close connection
cnxn.sendBuffer(ServerCnxnFactory.closeConn);
cnxn.disableRecv();
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Authentication succeeded for scheme: "
+ scheme);
}
LOG.info("auth success " + cnxn.getRemoteSocketAddress());
ReplyHeader rh = new ReplyHeader(h.getXid(), 0,
KeeperException.Code.OK.intValue());
cnxn.sendResponse(rh, null, null);
}
return;
} else {
if (h.getType() == OpCode.sasl) {
Record rsp = processSasl(incomingBuffer,cnxn);
ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
cnxn.sendResponse(rh,rsp, "response"); // not sure about 3rd arg..what is it?
return;
}
else {
Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),
h.getType(), incomingBuffer, cnxn.getAuthInfo());
si.setOwner(ServerCnxn.me);
// ordinary requests (including create/update/delete) end up here
submitRequest(si);
}
}
cnxn.incrOutstandingRequests(h);
}
This method branches on the OpCode for many different operations; create/update/delete requests end up going through org.apache.zookeeper.server.ZooKeeperServer#submitRequest(org.apache.zookeeper.server.Request). Since our interest is the Follower, look next at the org.apache.zookeeper.server.quorum.FollowerRequestProcessor#processRequest method:
public void processRequest(Request request) {
if (!finished) {
queuedRequests.add(request);
}
}
The queuedRequests queue here is drained by the org.apache.zookeeper.server.quorum.FollowerRequestProcessor#run method:
@Override
public void run() {
try {
while (!finished) {
Request request = queuedRequests.take();
if (LOG.isTraceEnabled()) {
ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK,
'F', request, "");
}
if (request == Request.requestOfDeath) {
break;
}
// We want to queue the request to be processed before we submit
// the request to the leader so that we are ready to receive
// the response
nextProcessor.processRequest(request);
// We now ship the request to the leader. As with all
// other quorum operations, sync also follows this code
// path, but different from others, we need to keep track
// of the sync operations this follower has pending, so we
// add it to pendingSyncs.
switch (request.type) {
case OpCode.sync:
zks.pendingSyncs.add(request);
zks.getFollower().request(request);
break;
case OpCode.create:
case OpCode.delete:
case OpCode.setData:
case OpCode.setACL:
case OpCode.createSession:
case OpCode.closeSession:
case OpCode.multi:
// create/update/delete and other write operations end up here
zks.getFollower().request(request);
break;
}
}
} catch (Exception e) {
handleException(this.getName(), e);
}
LOG.info("FollowerRequestProcessor exited loop!");
}
This in turn calls the org.apache.zookeeper.server.quorum.Learner#request method:
/**
* send a request packet to the leader
*
* @param request
* the request from the client
* @throws IOException
*/
void request(Request request) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream oa = new DataOutputStream(baos);
oa.writeLong(request.sessionId);
oa.writeInt(request.cxid);
oa.writeInt(request.type);
if (request.request != null) {
request.request.rewind();
int len = request.request.remaining();
byte b[] = new byte[len];
request.request.get(b);
request.request.rewind();
oa.write(b);
}
oa.close();
QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos
.toByteArray(), request.authInfo);
writePacket(qp, true);
}
The Javadoc comment says it all: the packet is sent to the Leader. Here is the org.apache.zookeeper.server.quorum.Learner#writePacket method:
/**
* write a packet to the leader
*
* @param pp
* the proposal packet to be sent to the leader
* @throws IOException
*/
void writePacket(QuorumPacket pp, boolean flush) throws IOException {
synchronized (leaderOs) {
if (pp != null) {
leaderOs.writeRecord(pp, "packet");
}
if (flush) {
bufferedOutput.flush();
}
}
}
To sum up: both the Leader and the Followers can accept data update requests, and when a Follower receives one, it forwards it to the Leader for processing.
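Boiled down to its essence, the pattern looks like the following deliberately simplified, in-process sketch. The names LeaderNode, FollowerNode, and ForwardToLeaderDemo are made up for illustration and are not ZooKeeper classes:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified sketch of "both roles accept writes, only the Leader applies them".
class LeaderNode {
    final Map<String, String> data = new ConcurrentHashMap<>();

    void handleWrite(String path, String value) {
        // Only the leader mutates state directly (replication to followers comes later).
        data.put(path, value);
    }
}

class FollowerNode {
    private final LeaderNode leader;

    FollowerNode(LeaderNode leader) {
        this.leader = leader;
    }

    void handleWrite(String path, String value) {
        // The follower accepts the request but does not apply it itself:
        // it forwards the write to the leader, much like Learner#request does over the wire.
        leader.handleWrite(path, value);
    }
}

public class ForwardToLeaderDemo {
    public static void main(String[] args) {
        LeaderNode leader = new LeaderNode();
        FollowerNode follower = new FollowerNode(leader);

        follower.handleWrite("/demo", "v1"); // accepted by the follower, executed by the leader
        System.out.println(leader.data);     // {/demo=v1}
    }
}
```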
When synchronization is triggered
As noted above, updates are ultimately carried out by the Leader. The question for this section is: once the data has been updated, does the Leader push the new data out, or do the Followers pull it from the Leader?
The Leader's update-handling flow is a good entry point.
One detail here: ZK's request handling is a chain-style design. See the org.apache.zookeeper.server.quorum.LeaderZooKeeperServer#setupRequestProcessors method, which initializes the processors:
@Override
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(
finalProcessor, getLeader().toBeApplied);
commitProcessor = new CommitProcessor(toBeAppliedProcessor,
Long.toString(getServerId()), false);
commitProcessor.start();
ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
commitProcessor);
proposalProcessor.initialize();
firstProcessor = new PrepRequestProcessor(this, proposalProcessor);
((PrepRequestProcessor)firstProcessor).start();
}
The method is blunt but crystal clear, with none of the fancy indirection that leaves readers dizzy: you can read off at a glance which processors handle a request and in what order.
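The wiring above is a plain chain-of-responsibility: each processor does its own piece of work and then hands the request to the next one. A minimal sketch of the same idea, using a simplified interface (the real org.apache.zookeeper.server.RequestProcessor takes a Request and can throw RequestProcessorException) and two made-up stand-in stages:

```java
// Minimal sketch of the chain-of-responsibility style used by ZK's request processors.
interface RequestProcessor {
    void processRequest(String request) throws Exception;
}

class PrepStage implements RequestProcessor {
    private final RequestProcessor next;

    PrepStage(RequestProcessor next) {
        this.next = next;
    }

    @Override
    public void processRequest(String request) throws Exception {
        System.out.println("prep:  " + request); // do this stage's work...
        next.processRequest(request);            // ...then hand the request to the next stage
    }
}

class FinalStage implements RequestProcessor {
    @Override
    public void processRequest(String request) {
        System.out.println("apply: " + request); // end of the chain: apply to the data tree
    }
}

public class ProcessorChainDemo {
    public static void main(String[] args) throws Exception {
        // Same wiring style as setupRequestProcessors(): build from the tail back to the head.
        RequestProcessor first = new PrepStage(new FinalStage());
        first.processRequest("create /demo v1");
    }
}
```

Based on the wiring above, the Leader-side write path runs PrepRequestProcessor → ProposalRequestProcessor → CommitProcessor → ToBeAppliedRequestProcessor → FinalRequestProcessor, with ProposalRequestProcessor also handing write requests to its syncProcessor so the transaction gets logged and ACKed.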
Let's look at one of the more important processors, ProposalRequestProcessor; the name alone suggests it has something to do with cluster-wide data consistency. Start with the org.apache.zookeeper.server.quorum.ProposalRequestProcessor#processRequest method:
public void processRequest(Request request) throws RequestProcessorException {
// LOG.warn("Ack>>> cxid = " + request.cxid + " type = " +
// request.type + " id = " + request.sessionId);
// request.addRQRec(">prop");
/* In the following IF-THEN-ELSE block, we process syncs on the leader.
* If the sync is coming from a follower, then the follower
* handler adds it to syncHandler. Otherwise, if it is a client of
* the leader that issued the sync command, then syncHandler won't
* contain the handler. In this case, we add it to syncHandler, and
* call processRequest on the next processor.
*/
if(request instanceof LearnerSyncRequest){
zks.getLeader().processSync((LearnerSyncRequest)request);
} else {
// given the wiring shown above, nextProcessor here is the CommitProcessor
nextProcessor.processRequest(request);
if (request.hdr != null) {
// We need to sync and get consensus on any transactions
try {
zks.getLeader().propose(request);
} catch (XidRolloverException e) {
throw new RequestProcessorException(e.getMessage(), e);
}
syncProcessor.processRequest(request);
}
}
}
So ProposalRequestProcessor first calls the org.apache.zookeeper.server.quorum.CommitProcessor#processRequest method:
synchronized public void processRequest(Request request) {
// request.addRQRec(">commit");
if (LOG.isDebugEnabled()) {
LOG.debug("Processing request:: " + request);
}
if (!finished) {
queuedRequests.add(request);
notifyAll();
}
}
The request is placed into queuedRequests, which is processed asynchronously by the org.apache.zookeeper.server.quorum.CommitProcessor#run method. Its logic is fairly involved, but boiled down it parks the request in a queue until the Leader has received ACKs for the Proposal from a "majority" (or all) of the Followers, and only then commits.
Going back to org.apache.zookeeper.server.quorum.ProposalRequestProcessor#processRequest, we can see that right afterwards it also calls the org.apache.zookeeper.server.quorum.Leader#propose method:
/**
* create a proposal and send it out to all the members
*
* @param request
* @return the proposal that is queued to send to all the members
*/
public Proposal propose(Request request) throws XidRolloverException {
/**
* Address the rollover issue. All lower 32bits set indicate a new leader
* election. Force a re-election instead. See ZOOKEEPER-1277
*/
if ((request.zxid & 0xffffffffL) == 0xffffffffL) {
String msg =
"zxid lower 32 bits have rolled over, forcing re-election, and therefore new epoch start";
shutdown(msg);
throw new XidRolloverException(msg);
}
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
try {
request.hdr.serialize(boa, "hdr");
if (request.txn != null) {
request.txn.serialize(boa, "txn");
}
baos.close();
} catch (IOException e) {
LOG.warn("This really should be impossible", e);
}
QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid,
baos.toByteArray(), null);
Proposal p = new Proposal();
p.packet = pp;
p.request = request;
synchronized (this) {
if (LOG.isDebugEnabled()) {
LOG.debug("Proposing:: " + request);
}
lastProposed = p.packet.getZxid();
outstandingProposals.put(lastProposed, p);
sendPacket(pp);
}
return p;
}
Next, the org.apache.zookeeper.server.quorum.Leader#sendPacket method:
/**
* send a packet to all the followers ready to follow
*
* @param qp
* the packet to be sent
*/
void sendPacket(QuorumPacket qp) {
synchronized (forwardingFollowers) {
for (LearnerHandler f : forwardingFollowers) {
f.queuePacket(qp);
}
}
}
Now it is clear: the Leader loops over all the Followers and hands the packet to each one's LearnerHandler (note that in the ZK source, a Learner can basically be read as a Follower). org.apache.zookeeper.server.quorum.LearnerHandler#queuePacket:
void queuePacket(QuorumPacket p) {
queuedPackets.add(p);
}
Same pattern again: drop the packet into a queue and process it asynchronously.
So by this point we know that after the Leader receives a data update request, it actively loops over the Followers and notifies each of them to apply the update (push, not pull). There are, of course, distributed-consistency and transaction concerns along the way, but since they are beyond this topic we won't dwell on them.
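Here is a tiny sketch of this push model, again with made-up names (per-follower queues drained by sender threads, loosely mirroring LearnerHandler#queuePacket and the loop in Leader#sendPacket); it is an illustration, not ZooKeeper's implementation:

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// The leader fans each update out to a per-follower queue; a worker thread per follower
// drains its queue and "applies" the update.
public class PushReplicationDemo {

    static class FollowerChannel {
        final String name;
        final BlockingQueue<String> queuedPackets = new LinkedBlockingQueue<>();

        FollowerChannel(String name) {
            this.name = name;
            Thread sender = new Thread(() -> {
                try {
                    while (true) {
                        // Blocks until the leader pushes something, then applies it.
                        String packet = queuedPackets.take();
                        System.out.println(name + " applied " + packet);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            sender.setDaemon(true);
            sender.start();
        }
    }

    static class LeaderNode {
        final List<FollowerChannel> followers = new CopyOnWriteArrayList<>();

        void write(String update) {
            // The leader does not wait to be asked: it pushes to every follower's queue,
            // much like Leader#sendPacket looping over forwardingFollowers.
            for (FollowerChannel f : followers) {
                f.queuedPackets.add(update);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        LeaderNode leader = new LeaderNode();
        leader.followers.add(new FollowerChannel("follower-1"));
        leader.followers.add(new FollowerChannel("follower-2"));

        leader.write("set /demo v2");
        Thread.sleep(200); // give the daemon sender threads a moment to drain their queues
    }
}
```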
When is an update considered synchronized
I'll take a shortcut for this part and refer you to an earlier post, "What does 'majority' actually mean in ZK?" (see References). Quoting its conclusion:
> With quorum writes, the Leader only needs ACKs from enough Followers to form a majority together with itself before it can proceed to the global commit. In a three-node ZK cluster the Leader only has to wait for one Follower's ACK; in a four-node cluster it has to wait for two Followers' ACKs.
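The arithmetic behind that rule is simply "more ACKs than half the ensemble size". Here is a small sketch (QuorumMathDemo and hasQuorum are made-up names; the Leader's own implicit ACK counts toward the total):

```java
public class QuorumMathDemo {

    // A proposal can be committed once the number of ACKs exceeds half the ensemble size.
    static boolean hasQuorum(int acks, int ensembleSize) {
        return acks > ensembleSize / 2;
    }

    public static void main(String[] args) {
        // 3-node cluster: the leader's ACK plus 1 follower ACK = 2 > 3/2 -> commit
        System.out.println(hasQuorum(1 + 1, 3)); // true
        // 4-node cluster: the leader's ACK plus 1 follower ACK = 2 is not > 4/2 -> keep waiting
        System.out.println(hasQuorum(1 + 1, 4)); // false
        // ...but the leader plus 2 follower ACKs = 3 > 4/2 -> commit
        System.out.println(hasQuorum(1 + 2, 4)); // true
    }
}
```

This is essentially what ZooKeeper's majority quorum verifier (QuorumMaj) checks, and it is also why growing a three-node ensemble to four nodes buys no extra write-failure tolerance.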
References
- https://mp.weixin.qq.com/s?__biz=MzU1OTgyMDc3Mg==&mid=2247484592&idx=1&sn=d28061696f356eb4f6afaba6ef4766e2&chksm=fc103e3bcb67b72de32a78330d16d3703af1efcb847538789d4b51263b8a5815f8ab64b3c02a&token=1958001505&lang=zh_CN#rd