2021-06-28 ZooKeeper's split-brain prevention component: ZKFC

流计算Alink 2022-09-16


Abstract:

Explain the ideas behind ZKFC and evaluate the strengths and weaknesses of its design.

About ZKFC:

The full name is ZKFailoverController. It is Hadoop's wrapper around the ZooKeeper client, so the ZKFC code has to be found in the Hadoop project.

https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithNFS.html

The ZKFailoverController (ZKFC) is a new component which is a ZooKeeper client which also monitors and manages the state of the NameNode. Each of the machines which runs a NameNode also runs a ZKFC, and that ZKFC is responsible for:

  • Health monitoring - the ZKFC pings its local NameNode on a periodic basis with a health-check command. So long as the NameNode responds in a timely fashion with a healthy status, the ZKFC considers the node healthy. If the node has crashed, frozen, or otherwise entered an unhealthy state, the health monitor will mark it as unhealthy.
  • ZooKeeper session management - when the local NameNode is healthy, the ZKFC holds a session open in ZooKeeper. If the local NameNode is active, it also holds a special “lock” znode. This lock uses ZooKeeper’s support for “ephemeral” nodes; if the session expires, the lock node will be automatically deleted.
  • ZooKeeper-based election - if the local NameNode is healthy, and the ZKFC sees that no other node currently holds the lock znode, it will itself try to acquire the lock. If it succeeds, then it has “won the election”, and is responsible for running a failover to make its local NameNode active. The failover process is similar to the manual failover described above: first, the previous active is fenced if necessary, and then the local NameNode transitions to active state.

For more details on the design of automatic failover, refer to the design document attached to HDFS-2185 on the Apache HDFS JIRA.
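The three responsibilities above combine into one decision: a healthy node that holds the lock znode becomes active, a healthy node that does not becomes standby, and an unhealthy node drops out of the election, letting its ephemeral lock expire. A minimal sketch of that decision follows; the names `decide` and `State` are hypothetical, not Hadoop's API, and the real ActiveStandbyElector is driven by ZooKeeper watch callbacks rather than a single pure function.

```java
// Hypothetical condensation of the ZKFC decision flow described above.
class ZkfcElectionSketch {
    enum State { ACTIVE, STANDBY, NEUTRAL }

    static State decide(boolean localNodeHealthy, boolean holdsLockZnode) {
        if (!localNodeHealthy) {
            // Unhealthy: quit the election; the ephemeral lock znode
            // (if held) is deleted when the ZooKeeper session expires.
            return State.NEUTRAL;
        }
        // Healthy and holding the lock znode means the election was won;
        // the ZKFC then runs a failover to make its local NameNode active.
        return holdsLockZnode ? State.ACTIVE : State.STANDBY;
    }
}
```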

References:

https://issues.apache.org/jira/browse/HDFS-3042

https://issues.apache.org/jira/browse/HDFS-2185

https://issues.apache.org/jira/secure/attachment/12521279/zkfc-design.pdf


Code:

https://github.com/c9n/hadoop/tree/master/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha

Focus mainly on the following three modules:

https://github.com/c9n/hadoop/blob/master/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java

https://github.com/c9n/hadoop/blob/master/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java

https://github.com/c9n/hadoop/blob/master/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java

Analysis:

The consensus protocol itself (ZooKeeper's ZAB, a Paxos-like protocol) is not expanded on in detail here; only the points of interest are listed.

Becoming the new active:

The ActiveStandbyElector module:

public synchronized void processResult(int rc, String path, Object ctx,
    Stat stat) {
  if (isStaleClient(ctx)) return;
  monitorLockNodePending = false;

  assert wantToBeInElection :
      "Got a StatNode result after quitting election";

  if (LOG.isDebugEnabled()) {
    LOG.debug("StatNode result: " + rc + " for path: " + path
        + " connectionState: " + zkConnectionState + " for " + this);
  }

  Code code = Code.get(rc);
  if (isSuccess(code)) {
    // the following owner check completes verification in case the lock znode
    // creation was retried
    if (stat.getEphemeralOwner() == zkClient.getSessionId()) {
      // we own the lock znode. so we are the leader
      if (!becomeActive()) {
        reJoinElectionAfterFailureToBecomeActive();
      }
    } else {
      // we dont own the lock znode. so we are a standby.
      becomeStandby();
    }
    // the watch set by us will notify about changes
    return;
  }

  if (isNodeDoesNotExist(code)) {
    // the lock znode disappeared before we started monitoring it
    enterNeutralMode();
    joinElectionInternal();
    return;
  }
  // ... (handling of the remaining error codes elided)
}

private boolean becomeActive() {
  assert wantToBeInElection;
  if (state == State.ACTIVE) {
    // already active
    return true;
  }
  try {
    Stat oldBreadcrumbStat = fenceOldActive();
    writeBreadCrumbNode(oldBreadcrumbStat);

    LOG.debug("Becoming active for {}", this);

    appClient.becomeActive();
    state = State.ACTIVE;
    return true;
  } catch (Exception e) {
    LOG.warn("Exception handling the winning of election", e);
    // Caller will handle quitting and rejoining the election.
    return false;
  }
}

The interesting part of this code is:

Stat oldBreadcrumbStat = fenceOldActive();

The function is as follows:

/**
 * If there is a breadcrumb node indicating that another node may need
 * fencing, try to fence that node.
 * @return the Stat of the breadcrumb node that was read, or null
 *         if no breadcrumb node existed
 */
private Stat fenceOldActive() throws InterruptedException, KeeperException {
  final Stat stat = new Stat();
  byte[] data;
  LOG.info("Checking for any old active which needs to be fenced...");
  try {
    data = zkDoWithRetries(new ZKAction<byte[]>() {
      @Override
      public byte[] run() throws KeeperException, InterruptedException {
        return zkClient.getData(zkBreadCrumbPath, false, stat);
      }
    });
  } catch (KeeperException ke) {
    if (isNodeDoesNotExist(ke.code())) {
      LOG.info("No old node to fence");
      return null;
    }

    // If we failed to read for any other reason, then likely we lost
    // our session, or we don't have permissions, etc. In any case,
    // we probably shouldn't become active, and failing the whole
    // thing is the best bet.
    throw ke;
  }

  LOG.info("Old node exists: {}", StringUtils.byteToHexString(data));
  if (Arrays.equals(data, appData)) {
    LOG.info("But old node has our own data, so don't need to fence it.");
  } else {
    appClient.fenceOldActive(data);
  }
  return stat;
}
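The core decision here — skip fencing only when the breadcrumb is absent or carries this node's own data — can be isolated into a small sketch. The `needsFencing` helper is a stand-in for illustration, not part of Hadoop.

```java
import java.util.Arrays;

// Sketch of the breadcrumb decision above: fence the old active unless the
// breadcrumb node is absent, or it was written by this very node (i.e. we
// ourselves were the previous active).
class BreadcrumbCheckSketch {
    static boolean needsFencing(byte[] breadcrumbData, byte[] ownAppData) {
        if (breadcrumbData == null) {
            return false; // no breadcrumb node existed: nothing to fence
        }
        // Equal data means the old active was us; fencing is unnecessary.
        return !Arrays.equals(breadcrumbData, ownAppData);
    }
}
```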

In the ZKFailoverController module:

private synchronized void fenceOldActive(byte[] data) {
  HAServiceTarget target = dataToTarget(data);

  try {
    doFence(target);
  } catch (Throwable t) {
    recordActiveAttempt(new ActiveAttemptRecord(false,
        "Unable to fence old active: " + StringUtils.stringifyException(t)));
    throw t;
  }
}

private void doFence(HAServiceTarget target) {
  LOG.info("Should fence: " + target);
  boolean gracefulWorked = new FailoverController(conf,
      RequestSource.REQUEST_BY_ZKFC).tryGracefulFence(target);
  if (gracefulWorked) {
    // It's possible that it's in standby but just about to go into active,
    // no? Is there some race here?
    LOG.info("Successfully transitioned " + target + " to standby " +
        "state without fencing");
    return;
  }

  try {
    target.checkFencingConfigured();
  } catch (BadFencingConfigurationException e) {
    LOG.error("Couldn't fence old active " + target, e);
    recordActiveAttempt(new ActiveAttemptRecord(false,
        "Unable to fence old active"));
    throw new RuntimeException(e);
  }

  if (!target.getFencer().fence(target)) {
    throw new RuntimeException("Unable to fence " + target);
  }
}

Entering the NodeFencer module:

public boolean fence(HAServiceTarget fromSvc) {
  return fence(fromSvc, null);
}

public boolean fence(HAServiceTarget fromSvc, HAServiceTarget toSvc) {
  LOG.info("====== Beginning Service Fencing Process... ======");
  int i = 0;
  for (FenceMethodWithArg method : methods) {
    LOG.info("Trying method " + (++i) + "/" + methods.size() + ": " + method);

    try {
      // only true when target node is given, AND fencing on it failed
      boolean toSvcFencingFailed = false;
      // if target is given, try to fence on target first. Only if fencing
      // on target succeeded, do fencing on source node.
      if (toSvc != null) {
        toSvcFencingFailed = !method.method.tryFence(toSvc, method.arg);
      }
      if (toSvcFencingFailed) {
        LOG.error("====== Fencing on target failed, skipping fencing "
            + "on source ======");
      } else {
        if (method.method.tryFence(fromSvc, method.arg)) {
          LOG.info("====== Fencing successful by method "
              + method + " ======");
          return true;
        }
      }
    } catch (BadFencingConfigurationException e) {
      LOG.error("Fencing method " + method + " misconfigured", e);
      continue;
    } catch (Throwable t) {
      LOG.error("Fencing method " + method + " failed with an unexpected error.", t);
      continue;
    }
    LOG.warn("Fencing method " + method + " was unsuccessful.");
  }

  LOG.error("Unable to fence service by any configured method.");
  return false;
}
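Stripped of logging and Hadoop types, the control flow of this loop is: try each configured method in order and stop at the first that succeeds; a method that throws is skipped. A sketch under that reading, with `Predicate<String>` standing in for `FenceMethod` (hypothetical, not the real API):

```java
import java.util.List;
import java.util.function.Predicate;

// Sketch of NodeFencer's ordered-methods loop for the single-target case.
class FencerLoopSketch {
    static boolean fence(String target, List<Predicate<String>> methods) {
        for (Predicate<String> method : methods) {
            try {
                if (method.test(target)) {
                    return true; // the first method that succeeds wins
                }
            } catch (RuntimeException e) {
                // a misconfigured or failing method is skipped;
                // the next one in the list is tried
            }
        }
        return false; // no configured method managed to fence the target
    }
}
```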

As for FenceMethod, instances are created via Java reflection after the configuration is read:

private static FenceMethodWithArg parseMethod(Configuration conf, String line)
    throws BadFencingConfigurationException {
  Matcher m;
  if ((m = CLASS_WITH_ARGUMENT.matcher(line)).matches()) {
    String className = m.group(1);
    String arg = m.group(2);
    return createFenceMethod(conf, className, arg);
  } else if ((m = CLASS_WITHOUT_ARGUMENT.matcher(line)).matches()) {
    String className = m.group(1);
    return createFenceMethod(conf, className, null);
  } else {
    throw new BadFencingConfigurationException(
        "Unable to parse line: '" + line + "'");
  }
}

private static FenceMethodWithArg createFenceMethod(
    Configuration conf, String clazzName, String arg)
    throws BadFencingConfigurationException {

  Class<?> clazz;
  try {
    // See if it's a short name for one of the built-in methods
    clazz = STANDARD_METHODS.get(clazzName);
    if (clazz == null) {
      // Try to instantiate the user's custom method
      clazz = Class.forName(clazzName);
    }
  } catch (Exception e) {
    throw new BadFencingConfigurationException(
        "Could not find configured fencing method " + clazzName,
        e);
  }

  // Check that it implements the right interface
  if (!FenceMethod.class.isAssignableFrom(clazz)) {
    throw new BadFencingConfigurationException("Class " + clazzName +
        " does not implement FenceMethod");
  }

  FenceMethod method = (FenceMethod) ReflectionUtils.newInstance(
      clazz, conf);
  method.checkArgs(arg);
  return new FenceMethodWithArg(method, arg);
}
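The reflection pattern used here — resolve a class by name, check assignability against the expected interface, then instantiate — can be shown in a self-contained sketch. The `Fencer` interface and `ShellLikeFencer` class below are stand-ins for `FenceMethod` and an operator-provided implementation; they are not Hadoop classes.

```java
// Sketch of the reflective construction in createFenceMethod().
class ReflectiveFactorySketch {
    interface Fencer {
        boolean tryFence(String target);
    }

    // An example implementation, as an operator's custom class would be.
    public static class ShellLikeFencer implements Fencer {
        public boolean tryFence(String target) {
            return true; // pretend the fencing command succeeded
        }
    }

    static Fencer create(String className) {
        try {
            Class<?> clazz = Class.forName(className);
            // same check as NodeFencer: reject classes that do not
            // implement the fencing interface
            if (!Fencer.class.isAssignableFrom(clazz)) {
                throw new IllegalArgumentException(
                    className + " does not implement Fencer");
            }
            return (Fencer) clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException(
                "Could not create fencing method " + className, e);
        }
    }
}
```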

A look at the FenceMethod interface (from Hadoop's generated API descriptor XML):

<!-- start interface org.apache.hadoop.ha.FenceMethod -->
<interface name="FenceMethod" abstract="true"
           static="false" final="false" visibility="public"
           deprecated="not deprecated">
  <method name="checkArgs"
          abstract="true" native="false" synchronized="false"
          static="false" final="false" visibility="public"
          deprecated="not deprecated">
    <param name="args" type="java.lang.String"/>
    <exception name="BadFencingConfigurationException" type="org.apache.hadoop.ha.BadFencingConfigurationException"/>
    <doc>
    <![CDATA[Verify that the given fencing method's arguments are valid.
@param args the arguments provided in the configuration. This may
be null if the operator did not configure any arguments.
@throws BadFencingConfigurationException if the arguments are invalid]]>
    </doc>
  </method>
  <method name="tryFence" return="boolean"
          abstract="true" native="false" synchronized="false"
          static="false" final="false" visibility="public"
          deprecated="not deprecated">
    <param name="target" type="org.apache.hadoop.ha.HAServiceTarget"/>
    <param name="args" type="java.lang.String"/>
    <exception name="BadFencingConfigurationException" type="org.apache.hadoop.ha.BadFencingConfigurationException"/>
    <doc>
    <![CDATA[Attempt to fence the target node.
@param target the target of the service to fence
@param args the configured arguments, which were checked at startup by
{@link #checkArgs(String)}
@return true if fencing was successful, false if unsuccessful or
indeterminate
@throws BadFencingConfigurationException if the configuration was
determined to be invalid only at runtime]]>
    </doc>
  </method>
  <doc>
  <![CDATA[A fencing method is a method by which one node can forcibly prevent
another node from making continued progress. This might be implemented
by killing a process on the other node, by denying the other node's
access to shared storage, or by accessing a PDU to cut the other node's
power.
<p>
Since these methods are often vendor- or device-specific, operators
may implement this interface in order to achieve fencing.
<p>
Fencing is configured by the operator as an ordered list of methods to
attempt. Each method will be tried in turn, and the next in the list
will only be attempted if the previous one fails. See {@link NodeFencer}
for more information.
<p>
If an implementation also implements {@link Configurable} then its
<code>setConf</code> method will be called upon instantiation.]]>
  </doc>
</interface>
<!-- end interface org.apache.hadoop.ha.FenceMethod -->
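For reference, the ordered list of fencing methods is what the operator configures in hdfs-site.xml. A typical example is sketched below: the keys `dfs.ha.fencing.methods` and `dfs.ha.fencing.ssh.private-key-files` and the built-in `sshfence`/`shell` methods are standard HDFS HA settings, but the key path is only an illustrative assumption.

```xml
<!-- Try a graceful SSH fence first; fall back to a shell command. -->
<property>
  <name>dfs.ha.fencing.methods</name>
  <value>sshfence
shell(/bin/true)</value>
</property>
<property>
  <name>dfs.ha.fencing.ssh.private-key-files</name>
  <!-- example path; adjust to the operator's environment -->
  <value>/home/hdfs/.ssh/id_rsa</value>
</property>
```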

Summary:

In Hadoop's ZooKeeper-based failover, unlike Redis, a node first fences the old active before promoting itself to the new active (master).

There are many details in this process worth thinking about.

Split-brain problem:

Consider only the partition where the old active loses network connectivity to the rest of the cluster (i.e. all of the cluster's acceptors) but not to its clients. After a vote elects a new active, those clients were never partitioned from or disconnected from the old active, so they keep writing data to it as if nothing happened.

ZKFC adds a fencing step here: through the fencing channel it forcibly kills (or isolates) the old active, preventing stale data from continuing to be written. This raises some questions:

  1. If the fencing channel itself suffers a network failure, doesn't fencing become useless? Fencing assumes that:
    1. the physical host is still running, and
    2. the fencing channel's own network path (e.g. SSH) can still reach it.
  2. Why would the old active lose its connections to every node in the cluster yet keep its connection to clients? What concrete real-world scenarios cause this?