Yarn Active ResourceManager Startup Framework Analysis

知年_7740 2022-02-24 71 views

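1. Introduction

As the first article in this series (on the design of Yarn Service) explained, YARN handles its many functions by wrapping each one in a Service and uses CompositeService to manage the init/start/stop lifecycle of child services in a uniform way. ResourceManager is itself a CompositeService and follows the same lifecycle. More importantly, when ResourceManager starts, an Active ResourceManager is elected through ZooKeeper. Taking ResourceManager service startup as the starting point, this article examines the Active ResourceManager election framework.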

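2. Which services ResourceManager starts

ResourceManager provides two kinds of services, resident services and active services:

  • Resident services run on both ResourceManagers. They are listed below:

image.png

  • Active services run only on the single Active ResourceManager. They are listed below:

image.png

The conclusion on how ActiveServices is initialized, started, and managed comes first; the detailed analysis follows later:

ActiveServices is not managed as a child service of ResourceManager. When ResourceManager initializes or stops, it runs separate logic to initialize or stop ActiveServices; the startup of ActiveServices is driven by EmbeddedElector, a child service of ResourceManager. The figure below illustrates this:

image.png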

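3. Why ActiveServices is not a child service of ResourceManager

ActiveServices may run on only one ResourceManager, so its start logic has to differ from ResourceManager's own. ResourceManager starts all of its child services unconditionally, and at that point it is not yet known which ResourceManager will become the Active one. If ActiveServices were registered as a child service, both ResourceManagers would start it, which would violate the requirement that ActiveServices be unique in the cluster.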
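
The argument above can be illustrated with a minimal sketch. The classes MiniService, MiniComposite, and MiniResourceManager are hypothetical stand-ins invented for this example, not Hadoop's classes; they only show that a composite starts every registered child, so anything that must run on one node has to stay outside the child list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for Hadoop's service classes.
class MiniService {
    private boolean started;
    void start() { started = true; }
    boolean isStarted() { return started; }
}

class MiniComposite extends MiniService {
    private final List<MiniService> children = new ArrayList<>();

    void addService(MiniService child) { children.add(child); }

    // A composite cannot tell active from standby: it starts every child.
    @Override
    void start() {
        for (MiniService child : children) {
            child.start();
        }
        super.start();
    }
}

class MiniResourceManager extends MiniComposite {
    final MiniService adminService = new MiniService();
    // Kept as a plain field and deliberately not passed to addService.
    final MiniService activeServices = new MiniService();

    MiniResourceManager() {
        addService(adminService);
    }

    // Invoked by the election logic only on the node that wins.
    void transitionToActive() {
        activeServices.start();
    }

    public static void main(String[] args) {
        MiniResourceManager rm = new MiniResourceManager();
        rm.start();
        System.out.println("active services started: " + rm.activeServices.isStarted());
    }
}
```

Starting two MiniResourceManager instances brings up adminService on both, while activeServices stays stopped until transitionToActive is called on one of them.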

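4. When ActiveServices is started

To elect an Active ResourceManager among the two ResourceManagers, so that ActiveServices runs in exactly one place, ResourceManager provides the resident child service EmbeddedElector. EmbeddedElector connects to ZooKeeper. When it starts, it competes for the mutually exclusive election lock. The ResourceManager that acquires the lock switches to the Active state and starts ActiveServices; the other switches to Standby and keeps only the resident services running. EmbeddedElector also registers a watcher with ZooKeeper, so that when the state of the Active ResourceManager changes, the watcher callback switches the ResourceManager state right away.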
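
As a rough illustration of this lock contention, the sketch below replaces ZooKeeper with an in-memory AtomicReference. FakeLockNode and ElectingRm are hypothetical names made up for the example; the real election uses an ephemeral znode and watchers, which this sketch does not model.

```java
import java.util.concurrent.atomic.AtomicReference;

// In-memory stand-in for the ZooKeeper lock node; not the real client API.
class FakeLockNode {
    private final AtomicReference<String> owner = new AtomicReference<>();

    // Mirrors "create the ephemeral node": succeeds only if nobody holds it.
    boolean tryAcquire(String rmId) {
        return owner.compareAndSet(null, rmId);
    }

    // Mirrors the node disappearing when the owner's session ends.
    void release(String rmId) {
        owner.compareAndSet(rmId, null);
    }

    String owner() { return owner.get(); }
}

class ElectingRm {
    enum HaState { ACTIVE, STANDBY }

    final String id;
    HaState state = HaState.STANDBY;

    ElectingRm(String id) { this.id = id; }

    // Whoever creates the lock first becomes active; everyone else is standby.
    void joinElection(FakeLockNode lock) {
        state = lock.tryAcquire(id) ? HaState.ACTIVE : HaState.STANDBY;
    }

    public static void main(String[] args) {
        FakeLockNode lock = new FakeLockNode();
        ElectingRm rm1 = new ElectingRm("rm1");
        ElectingRm rm2 = new ElectingRm("rm2");
        rm1.joinElection(lock);
        rm2.joinElection(lock);
        System.out.println(rm1.id + "=" + rm1.state + ", " + rm2.id + "=" + rm2.state);
    }
}
```

Because the acquire step is a single compare-and-set, at most one participant can observe success, which is the property the ephemeral lock node provides in the real system.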

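5. ResourceManager child service initialization and startup flow

With respect to starting the Active ResourceManager, the important child services are initialized and started in this order:

  • EmbeddedElector election service initialization
  • ActiveServices initialization
  • EmbeddedElector election service start
  • ActiveServices start

The corresponding code is as follows: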

public class ResourceManager extends CompositeService implements Recoverable, ResourceManagerMXBean {
  public static void main(String[] argv) {
    // omitted
    ResourceManager resourceManager = new ResourceManager();
    // Calls AbstractService.init, which in turn calls ResourceManager.serviceInit
    resourceManager.init(conf);
    // Likewise, ends up calling ResourceManager.serviceStart
    resourceManager.start();
    // omitted
  }
}

5.1 ResourceManager initialization flow

ResourceManager#init is inherited from AbstractService#init, which ultimately calls ResourceManager#serviceInit:

public abstract class AbstractService implements Service {
  public void init(Configuration conf) {
    if (conf == null) {
      throw new ServiceStateException("Cannot initialize service "
                                      + getName() + ": null configuration");
    }
    if (isInState(STATE.INITED)) {
      return;
    }
    synchronized (stateChangeLock) {
      if (enterState(STATE.INITED) != STATE.INITED) {
        setConfig(conf);
        try {
          serviceInit(config);
          if (isInState(STATE.INITED)) {
            //if the service ended up here during init,
            //notify the listeners
            notifyListeners();
          }
        } catch (Exception e) {
          noteFailure(e);
          ServiceOperations.stopQuietly(LOG, this);
          throw ServiceStateException.convert(e);
        }
      }
    }
  }
}

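Note: only AbstractService implements init. It calls the subclass's serviceInit (ResourceManager's, for example); if the subclass does not override serviceInit, AbstractService's default serviceInit runs. CompositeService only overrides serviceInit, where it initializes all child services by calling AbstractService#init on each of them, and each of those calls follows the flow above. start and stop work the same way.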
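
The init/serviceInit split is a template-method pattern. The sketch below is a hypothetical miniature (TemplateService and CountingService are names invented here, and a String stands in for Configuration); it is not Hadoop's AbstractService, only an illustration of the final entry point that guards state and delegates to an overridable hook.

```java
// Hypothetical miniature of the init/serviceInit pattern; not Hadoop's class.
abstract class TemplateService {
    enum State { NOTINITED, INITED }

    private State state = State.NOTINITED;
    private final Object stateChangeLock = new Object();

    // Public entry point: owns the state check, delegates the real work.
    public final void init(String conf) {
        if (conf == null) {
            throw new IllegalArgumentException("null configuration");
        }
        synchronized (stateChangeLock) {
            if (state == State.INITED) {
                return; // a repeated init is a no-op
            }
            state = State.INITED;
            serviceInit(conf);
        }
    }

    // Default hook does nothing; subclasses override it.
    protected void serviceInit(String conf) { }

    State getState() { return state; }
}

class CountingService extends TemplateService {
    int initCalls;

    @Override
    protected void serviceInit(String conf) {
        initCalls++;
    }

    public static void main(String[] args) {
        CountingService service = new CountingService();
        service.init("conf");
        service.init("conf");
        System.out.println("serviceInit calls: " + service.initCalls);
    }
}
```

Subclasses never override init itself, so the state handling stays in one place regardless of how many service types exist.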

ResourceManager#serviceInit creates the EmbeddedElector service and adds it as a child service, then calls ResourceManager#createAndInitActiveServices to create and initialize the standalone ActiveServices:

public class ResourceManager extends CompositeService implements Recoverable, ResourceManagerMXBean {
  protected void serviceInit(Configuration conf) throws Exception {
    // omitted
    // Register resident services, for example AdminService
    adminService = createAdminService();
    addService(adminService);
    rmContext.setRMAdminService(adminService);
    // omitted
    if (this.rmContext.isHAEnabled()) {
      // If the RM is configured to use an embedded leader elector,
      // initialize the leader elector.
      if (HAUtil.isAutomaticFailoverEnabled(conf)
          && HAUtil.isAutomaticFailoverEmbedded(conf)) {
        EmbeddedElector elector = createEmbeddedElector();
        addIfService(elector);
        rmContext.setLeaderElectorService(elector);
      }
    }
    // omitted
    createAndInitActiveServices(false);
    // omitted
  }
}

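5.2 Creating the election service EmbeddedElector

ResourceManager chooses the concrete EmbeddedElector implementation based on the yarn.resourcemanager.ha.curator-leader-elector.enabled setting. If it is true, the implementation is CuratorBasedElectorService, which is built on Curator, a ZooKeeper client framework that wraps the native ZooKeeper API. Otherwise the default implementation, ActiveStandbyElectorBasedElectorService, is used; it works directly with the native ZooKeeper API: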

public class ResourceManager extends CompositeService implements Recoverable, ResourceManagerMXBean {
  protected EmbeddedElector createEmbeddedElector() throws IOException {
    EmbeddedElector elector;
    curatorEnabled =
        conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
            YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
    if (curatorEnabled) {
      this.zkManager = createAndStartZKManager(conf);
      elector = new CuratorBasedElectorService(this);
    } else {
      elector = new ActiveStandbyElectorBasedElectorService(this);
    }
    return elector;
  }
}

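5.3 Initializing the ActiveStandbyElectorBasedElectorService election service

This article discusses the Active ResourceManager election flow based on ActiveStandbyElectorBasedElectorService. Its initialization method determines the ZooKeeper election path. The code below builds the parent znode from the base path (/yarn-leader-election by default) plus the cluster ID, and the lock node ActiveStandbyElectorLock lives under it, giving /yarn-leader-election/<clusterId>/ActiveStandbyElectorLock. The elector service of every ResourceManager tries to create this ephemeral node in ZooKeeper. The ResourceManager that creates it successfully is eventually elected as the Active ResourceManager.

Most importantly, during initialization ActiveStandbyElectorBasedElectorService creates its ActiveStandbyElector member: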

public class ActiveStandbyElectorBasedElectorService extends AbstractService implements EmbeddedElector, ActiveStandbyElector.ActiveStandbyElectorCallback {
  protected void serviceInit(Configuration conf) throws Exception {
    // omitted
    String zkQuorum = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
    // omitted
    String rmId = HAUtil.getRMHAId(conf);
    String clusterId = YarnConfiguration.getClusterId(conf);
    localActiveNodeInfo = createActiveNodeInfo(clusterId, rmId);

    String zkBasePath = conf.get(YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,
        YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
    String electionZNode = zkBasePath + "/" + clusterId;

    zkSessionTimeout = conf.getLong(YarnConfiguration.RM_ZK_TIMEOUT_MS,
        YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);

    List<ACL> zkAcls = ZKCuratorManager.getZKAcls(conf);
    List<ZKUtil.ZKAuthInfo> zkAuths = ZKCuratorManager.getZKAuths(conf);
    // omitted
    // Create the elector object
    elector = new ActiveStandbyElector(zkQuorum, (int) zkSessionTimeout,
        electionZNode, zkAcls, zkAuths, this, maxRetryNum, false);

    elector.ensureParentZNode();
    // omitted
    super.serviceInit(conf);
  }
}

ActiveStandbyElector connects to the ZooKeeper server, maintains the watcher, monitors the state of the mutex lock node ActiveStandbyElectorLock under the election znode, and switches the ResourceManager state accordingly.
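
To make the path composition concrete, here is a small sketch that mirrors the string concatenation in the excerpts (zkBasePath + "/" + clusterId, then + "/" + LOCK_FILENAME). The class ElectionPaths and the cluster ID "yarn-cluster" are hypothetical; the two constant values are taken from the path mentioned in this article and should be treated as assumptions about the defaults.

```java
// Hypothetical helper; mirrors the concatenation shown in the excerpts.
class ElectionPaths {
    // Assumed defaults, taken from the path quoted in this article.
    static final String DEFAULT_BASE_PATH = "/yarn-leader-election";
    static final String LOCK_FILENAME = "ActiveStandbyElectorLock";

    // Parent znode: one election directory per cluster.
    static String electionZNode(String basePath, String clusterId) {
        return basePath + "/" + clusterId;
    }

    // Lock node that every ResourceManager tries to create.
    static String lockPath(String basePath, String clusterId) {
        return electionZNode(basePath, clusterId) + "/" + LOCK_FILENAME;
    }

    public static void main(String[] args) {
        System.out.println(lockPath(DEFAULT_BASE_PATH, "yarn-cluster"));
    }
}
```

Including the cluster ID in the path lets several YARN clusters share one ZooKeeper ensemble without competing for the same lock.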

5.3.1 ZooKeeper connection

When ActiveStandbyElector is constructed, it creates the connection to ZooKeeper:

  public ActiveStandbyElector(String zookeeperHostPorts, int zookeeperSessionTimeout, String parentZnodeName, List<ACL> acl, List<ZKAuthInfo> authInfo, ActiveStandbyElectorCallback app, int maxRetryNum, boolean failFast) throws IOException, HadoopIllegalArgumentException, KeeperException {
    if (app == null || acl == null || parentZnodeName == null
        || zookeeperHostPorts == null || zookeeperSessionTimeout <= 0) {
      throw new HadoopIllegalArgumentException("Invalid argument");
    }
    zkHostPort = zookeeperHostPorts;
    zkSessionTimeout = zookeeperSessionTimeout;
    zkAcl = acl;
    zkAuthInfo = authInfo;
    appClient = app;
    znodeWorkingDir = parentZnodeName;
    zkLockFilePath = znodeWorkingDir + "/" + LOCK_FILENAME;
    zkBreadCrumbPath = znodeWorkingDir + "/" + BREADCRUMB_FILENAME;
    this.maxRetryNum = maxRetryNum;

    // establish the ZK Connection for future API calls
    if (failFast) {
      createConnection();
    } else {
      reEstablishSession();
    }
  }

reEstablishSession wraps createConnection in a layer of error retries. We go straight to ActiveStandbyElector#createConnection:

  private void createConnection() throws IOException, KeeperException {
    // omitted
    zkClient = connectToZooKeeper();
    // omitted
  }

Note: when ActiveStandbyElectorBasedElectorService constructs the ActiveStandbyElector, it passes itself as the ActiveStandbyElectorCallback argument of the constructor, and that reference is stored in ActiveStandbyElector's appClient field. appClient is very important and comes up again later in this article.

ActiveStandbyElector#connectToZooKeeper creates the Watcher object that listens for ZooKeeper events:

  protected synchronized ZooKeeper connectToZooKeeper() throws IOException, KeeperException {
    watcher = new WatcherWithClientRef();
    // createZooKeeper registers the watcher with ZooKeeper
    ZooKeeper zk = createZooKeeper();
    watcher.setZooKeeperRef(zk);
    // omitted
    watcher.waitForZKConnectionEvent(zkSessionTimeout);
    // omitted
    return zk;
  }

WatcherWithClientRef#process receives ZooKeeper events, but the actual handling is done by ActiveStandbyElector#processWatchEvent:

  private final class WatcherWithClientRef implements Watcher {
    private ZooKeeper zk;

    // Opened once the connection event from the ZooKeeper server has arrived; only then may other events be handled
    private CountDownLatch hasReceivedEvent = new CountDownLatch(1);
    // Events can be processed only after the watcher has been given its ZooKeeper reference
    private CountDownLatch hasSetZooKeeper = new CountDownLatch(1);
    // ordinary methods omitted

    // process is the method through which the watcher handles ZooKeeper events
    @Override
    public void process(WatchedEvent event) {
      // omitted
      ActiveStandbyElector.this.processWatchEvent(zk, event);
      // omitted
    }
  }
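
The two CountDownLatch fields solve an ordering problem: the watcher is created before the client object it needs, yet events may arrive as soon as the client exists. The sketch below shows one way such gating can work. GatedWatcher and its methods are hypothetical, a String stands in for the ZooKeeper handle, and the timeouts are arbitrary; it is an illustration of the idea, not the Hadoop implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; the String "client" stands in for a ZooKeeper handle.
class GatedWatcher {
    private final CountDownLatch hasSetClient = new CountDownLatch(1);
    private final CountDownLatch hasReceivedEvent = new CountDownLatch(1);
    private final List<String> handled = new ArrayList<>();
    private volatile String client;

    void setClientRef(String client) {
        this.client = client;
        hasSetClient.countDown();
    }

    // Runs on the event thread; waits until the client reference exists.
    void process(String event, long timeoutMs) {
        hasReceivedEvent.countDown();
        try {
            if (!hasSetClient.await(timeoutMs, TimeUnit.MILLISECONDS)) {
                return; // no client to hand the event to
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        synchronized (handled) {
            handled.add(client + ":" + event);
        }
    }

    boolean waitForFirstEvent(long timeoutMs) {
        try {
            return hasReceivedEvent.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    List<String> handled() {
        synchronized (handled) {
            return new ArrayList<>(handled);
        }
    }

    // Driver: an event arrives on another thread before the client is set.
    static List<String> demo() {
        GatedWatcher watcher = new GatedWatcher();
        Thread eventThread = new Thread(() -> watcher.process("SyncConnected", 2000));
        eventThread.start();
        boolean arrived = watcher.waitForFirstEvent(2000);
        int handledBeforeRef = watcher.handled().size();
        watcher.setClientRef("zk");
        try {
            eventThread.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        List<String> out = new ArrayList<>();
        out.add("arrived=" + arrived);
        out.add("handledBeforeRef=" + handledBeforeRef);
        out.addAll(watcher.handled());
        return out;
    }
}
```

In demo, the event is received immediately but is only handled after setClientRef has been called, which is the guarantee the two latches are there to provide.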

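5.3.2 Handling ZooKeeper watch events

ActiveStandbyElector#processWatchEvent handles the watch events. The ZooKeeper states and event types map to each other as follows:
image.png

The ResourceManager state is adjusted differently depending on the ZooKeeper state and event type. The handling logic is shown below: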

public class ActiveStandbyElector implements StatCallback, StringCallback {
  private ConnectionState zkConnectionState = ConnectionState.TERMINATED;
  synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) {
    Event.EventType eventType = event.getType();

    // Handle connection state change events
    if (eventType == Event.EventType.None) {
      // the connection state has changed
      switch (event.getState()) {
      case SyncConnected:
        LOG.info("Session connected.");
        ConnectionState prevConnectionState = zkConnectionState;
        zkConnectionState = ConnectionState.CONNECTED;
        if (prevConnectionState == ConnectionState.DISCONNECTED &&
            wantToBeInElection) {
          monitorActiveStatus();
        }
        break;
      case Disconnected:
        LOG.info("Session disconnected. Entering neutral mode...");

        // ask the app to move to safe state because zookeeper connection
        // is not active and we dont know our state
        zkConnectionState = ConnectionState.DISCONNECTED;
        enterNeutralMode();
        break;
      case Expired:
        // the connection got terminated because of session timeout
        // call listener to reconnect
        LOG.info("Session expired. Entering neutral mode and rejoining...");
        enterNeutralMode();
        reJoinElection(0);
        break;
      case SaslAuthenticated:
        LOG.info("Successfully authenticated to ZooKeeper using SASL.");
        break;
      default:
        fatalError("Unexpected Zookeeper watch event state: "
            + event.getState());
        break;
      }

      return;
    }

    // The watched znode has changed
    String path = event.getPath();
    if (path != null) {
      switch (eventType) {
      case NodeDeleted:
        if (state == State.ACTIVE) {
          enterNeutralMode();
        }
        joinElectionInternal();
        break;
      case NodeDataChanged:
        monitorActiveStatus();
        break;
      default:
        if (LOG.isDebugEnabled()) {
          LOG.debug("Unexpected node event: " + eventType + " for path: " + path);
        }
        monitorActiveStatus();
      }

      return;
    }

    // some unexpected error has occurred
    fatalError("Unexpected watch error from Zookeeper");
  }
}

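All that ActiveStandbyElector does when handling ZooKeeper events is move the ResourceManager into the active, standby, or neutral state. The transition logic is discussed here.

5.3.2.1 Competing for the active state

monitorActiveStatus delegates to monitorLockNodeAsync, which checks whether the lock node exists and re-registers the watcher on it; the state change itself happens in the callback: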

public class ActiveStandbyElector implements StatCallback, StringCallback {
  private void monitorLockNodeAsync() {
    monitorLockNodePending = true;
    monitorLockNodeClient = zkClient;
    zkClient.exists(zkLockFilePath, watcher, this, zkClient);
  }
}

ActiveStandbyElector implements the callback for exists and switches the ResourceManager between active and standby depending on who holds the distributed lock:

public class ActiveStandbyElector implements StatCallback, StringCallback {
  public synchronized void processResult(int rc, String path, Object ctx, Stat stat) {
    // omitted
    if (isSuccess(code)) {
      // The node exists: if this ResourceManager's session owns the lock, become active; otherwise become standby
      if (stat.getEphemeralOwner() == zkClient.getSessionId()) {
        // Enter the active state
        if (!becomeActive()) {
          reJoinElectionAfterFailureToBecomeActive();
        }
      } else {
        // Enter the standby state
        becomeStandby();
      }
      return;
    }
    // The node does not exist: enter neutral mode and try to create the lock
    if (isNodeDoesNotExist(code)) {
      enterNeutralMode();
      // Try to create the ZooKeeper lock node again
      joinElectionInternal();
      return;
    }
    // omitted
  }
}

5.3.2.2 Neutral state handling

When the connection is lost, the elector enters the NEUTRAL state:

public class ActiveStandbyElector implements StatCallback, StringCallback {
  private void enterNeutralMode() {
    if (state != State.NEUTRAL) {
      state = State.NEUTRAL;
      appClient.enterNeutralMode();
    }
  }
}

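ActiveStandbyElector#enterNeutralMode calls enterNeutralMode on its appClient member. The appClient instance is in fact the ActiveStandbyElectorBasedElectorService, so the call lands in ActiveStandbyElectorBasedElectorService#enterNeutralMode. In the neutral state the ResourceManager has lost its connection to ZooKeeper, and it schedules a timer that moves it to standby once zkSessionTimeout has elapsed: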

public class ActiveStandbyElectorBasedElectorService extends AbstractService implements EmbeddedElector, ActiveStandbyElector.ActiveStandbyElectorCallback {
  public void enterNeutralMode() {
    // omitted
    synchronized (zkDisconnectLock) {
      // Start the timer only if one is not already pending
      if (zkDisconnectTimer == null) {
        zkDisconnectTimer = new Timer("Zookeeper disconnect timer");
        zkDisconnectTimer.schedule(new TimerTask() {
          @Override
          public void run() {
            synchronized (zkDisconnectLock) {
              becomeStandby();
            }
          }
        }, zkSessionTimeout);
      }
    }
    // omitted
  }
}
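
The disconnect timer gives a lost connection a grace period: standby is entered only if the connection is not restored before the timeout, and cancelDisconnectTimer (called from becomeActive later in this article) aborts the countdown. The following is a minimal, self-contained sketch of that idea using java.util.Timer. DisconnectTimerSketch is a hypothetical class, the latch replaces the real becomeStandby call, and the timeouts are arbitrary.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a cancellable "become standby after timeout" timer.
class DisconnectTimerSketch {
    private final Object lock = new Object();
    private final long timeoutMs;
    private Timer timer; // non-null while a countdown is pending
    private final CountDownLatch becameStandby = new CountDownLatch(1);

    DisconnectTimerSketch(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    void enterNeutralMode() {
        synchronized (lock) {
            if (timer != null) {
                return; // a countdown is already running
            }
            timer = new Timer("disconnect timer", true);
            timer.schedule(new TimerTask() {
                @Override
                public void run() {
                    synchronized (lock) {
                        if (timer != null) { // skip if cancelled meanwhile
                            becameStandby.countDown(); // stands in for becomeStandby()
                        }
                    }
                }
            }, timeoutMs);
        }
    }

    // Called when the connection comes back before the timeout.
    void cancelDisconnectTimer() {
        synchronized (lock) {
            if (timer != null) {
                timer.cancel();
                timer = null;
            }
        }
    }

    boolean awaitStandby(long waitMs) {
        try {
            return becameStandby.await(waitMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Checking the timer field again inside run closes the window in which a task that has already been dequeued could still fire after a cancel.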

5.3.2.3 Session expiry handling

When the session expires, the elector rejoins the election and tries to enter the Active state again:

public class ActiveStandbyElector implements StatCallback, StringCallback {
  private void reJoinElection(int sleepTime) {
    sessionReestablishLockForTests.lock();
    try {
      terminateConnection();
      sleepFor(sleepTime);
      if (appData != null) {
        joinElectionInternal();
      }
    } finally {
      sessionReestablishLockForTests.unlock();
    }
  }
}

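Consider the first connection to ZooKeeper. The initial state is ConnectionState.TERMINATED. When the client successfully establishes a session with the ZooKeeper server, it receives the state SyncConnected, whose event type is Event.EventType.None. In processWatchEvent, the previous connection state is then TERMINATED rather than DISCONNECTED, so monitorActiveStatus is not called and the code simply breaks out of the switch. In other words, apart from recording the new connection state, the client does nothing when the first connection succeeds.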
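
The difference between a first connection and a reconnection can be checked with a small sketch that reproduces only the SyncConnected branch. ConnectionEventSketch and its Action enum are hypothetical names for this example; the condition itself is copied from the processWatchEvent excerpt above.

```java
// Hypothetical sketch of the SyncConnected branch of processWatchEvent.
class ConnectionEventSketch {
    enum ConnectionState { DISCONNECTED, CONNECTED, TERMINATED }
    enum Action { NONE, MONITOR_ACTIVE_STATUS }

    private ConnectionState zkConnectionState = ConnectionState.TERMINATED;
    private final boolean wantToBeInElection;

    ConnectionEventSketch(boolean wantToBeInElection) {
        this.wantToBeInElection = wantToBeInElection;
    }

    // Returns what the elector would do when SyncConnected arrives.
    Action onSyncConnected() {
        ConnectionState previous = zkConnectionState;
        zkConnectionState = ConnectionState.CONNECTED;
        if (previous == ConnectionState.DISCONNECTED && wantToBeInElection) {
            return Action.MONITOR_ACTIVE_STATUS;
        }
        return Action.NONE;
    }

    void onDisconnected() {
        zkConnectionState = ConnectionState.DISCONNECTED;
    }

    public static void main(String[] args) {
        ConnectionEventSketch elector = new ConnectionEventSketch(true);
        System.out.println("first connect: " + elector.onSyncConnected());
        elector.onDisconnected();
        System.out.println("reconnect: " + elector.onSyncConnected());
    }
}
```

Only a reconnection after a disconnect re-checks the lock node, because that is the only case in which the elector may have missed a change to the lock while it was offline.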

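5.4 ActiveServices initialization

During its own initialization, ResourceManager makes an extra call to initialize ActiveServices. ActiveServices is not a child service of ResourceManager, so the init/start/stop handling that ResourceManager applies to its child services does not cover it: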

public class ResourceManager extends CompositeService implements Recoverable, ResourceManagerMXBean {
    protected void serviceInit(Configuration conf) throws Exception {
        // omitted
        createAndInitActiveServices(false);
        // omitted
    }
}

ResourceManager#createAndInitActiveServices runs the initialization logic of activeServices:

public class ResourceManager extends CompositeService implements Recoverable, ResourceManagerMXBean {
  protected void createAndInitActiveServices(boolean fromActive) {
    activeServices = new RMActiveServices(this);
    activeServices.fromActive = fromActive;
    activeServices.init(conf);
  }
}

The concrete type of activeServices is RMActiveServices. Its initialization creates its child services and adds them:

public class ResourceManager extends CompositeService implements Recoverable, ResourceManagerMXBean {
  public class RMActiveServices extends CompositeService {
    protected void serviceInit(Configuration configuration) throws Exception {
      standByTransitionRunnable = new StandByTransitionRunnable();
      // omitted
      xxxService = createXxxService();
      addService(xxxService);
      // omitted
      super.serviceInit(conf);
    }
  }
}

5.5 Starting the election service

Once the ActiveStandbyElectorBasedElectorService object has been created and initialized, every ResourceManager has created a zkClient and connected to the ZooKeeper server. Starting EmbeddedElector follows this call chain:
ActiveStandbyElectorBasedElectorService#serviceStart -> ActiveStandbyElector#joinElection -> ActiveStandbyElector#joinElectionInternal -> ActiveStandbyElector#createLockNodeAsync. ActiveStandbyElector#createLockNodeAsync tries to acquire the Active ResourceManager lock:

public class ActiveStandbyElector implements StatCallback, StringCallback {
  private void createLockNodeAsync() {
    zkClient.create(zkLockFilePath, appData, zkAcl, CreateMode.EPHEMERAL, this, zkClient);
  }
}

createLockNodeAsync calls ZooKeeper#create to try to acquire the distributed lock and thereby enter the Active state:

public class ZooKeeper {
    public void create(final String path, byte data[], List<ACL> acl, CreateMode createMode,  StringCallback cb, Object ctx)
    {
        // omitted
    }
}

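StringCallback is an asynchronous callback: the client sends a create-node request to the server, the server returns its response asynchronously, and the client handles that response in StringCallback#processResult.

Note: in the ZooKeeper client, both Watcher and AsyncCallback are asynchronous callbacks, but they fire at different times. A Watcher callback is triggered by an event that the server sends; an AsyncCallback is invoked by the client itself once the response to a request it issued has arrived.

ActiveStandbyElector implements this ZooKeeper callback interface for the create call above. When the create request completes, ActiveStandbyElector#processResult runs asynchronously: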

public class ActiveStandbyElector implements StatCallback, StringCallback {
  public synchronized void processResult(int rc, String path, Object ctx,
      String name) {
    // omitted
    if (isSuccess(code)) {
      // we successfully created the znode. we are the leader. start monitoring
      // Try to enter the Active state
      if (becomeActive()) {
        // Verify by monitoring the lock node
        monitorActiveStatus();
      } else {
        // Otherwise try again to create the ZooKeeper node and become Active
        reJoinElectionAfterFailureToBecomeActive();
      }
      return;
    }
    // Creation failed because the node already exists: enter the standby state
    if (isNodeExists(code)) {
      if (createRetryCount == 0) {
        becomeStandby();
      }
      monitorActiveStatus();
      return;
    }

    // Creation failed and the node does not exist yet: retry
    if (shouldRetry(code)) {
      if (createRetryCount < maxRetryNum) {
        ++createRetryCount;
        createLockNodeAsync();
        return;
      }
      // omitted
    }

  }
}
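
The branching in this callback can be condensed into a small decision function. The sketch below is hypothetical: ResultCode is a made-up subset of result codes rather than ZooKeeper's own, and GIVE_UP stands in for the handling that the excerpt above elides once retries are exhausted. It is meant to show why the createRetryCount == 0 check matters: after a retry, an "already exists" result may refer to a node created by this client's own earlier attempt, so ownership is verified by monitoring instead of assuming standby.

```java
// Hypothetical condensation of the create-callback decisions.
class CreateResultSketch {
    // Made-up subset of result codes; not ZooKeeper's KeeperException.Code.
    enum ResultCode { OK, NODE_EXISTS, CONNECTION_LOSS, OTHER }

    // GIVE_UP is an assumption: the article elides what happens here.
    enum Action { BECOME_ACTIVE, BECOME_STANDBY_AND_MONITOR, MONITOR_ONLY, RETRY_CREATE, GIVE_UP }

    private final int maxRetryNum;
    private int createRetryCount;

    CreateResultSketch(int maxRetryNum) {
        this.maxRetryNum = maxRetryNum;
    }

    Action onCreateResult(ResultCode code) {
        switch (code) {
            case OK:
                return Action.BECOME_ACTIVE;
            case NODE_EXISTS:
                // On the first attempt someone else must own the node.
                // After a retry it may be our own node, so only monitor.
                return createRetryCount == 0
                        ? Action.BECOME_STANDBY_AND_MONITOR
                        : Action.MONITOR_ONLY;
            case CONNECTION_LOSS:
                if (createRetryCount < maxRetryNum) {
                    createRetryCount++;
                    return Action.RETRY_CREATE;
                }
                return Action.GIVE_UP;
            default:
                return Action.GIVE_UP;
        }
    }

    public static void main(String[] args) {
        CreateResultSketch elector = new CreateResultSketch(2);
        System.out.println(elector.onCreateResult(ResultCode.CONNECTION_LOSS));
        System.out.println(elector.onCreateResult(ResultCode.NODE_EXISTS));
    }
}
```

In the real callback, the monitoring step leads to the exists callback shown earlier, which compares the ephemeral owner with the local session ID and settles the final state.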

5.6 Starting ActiveServices

In the normal case, ActiveStandbyElector#becomeActive is called to move the ResourceManager into the active state:

public class ActiveStandbyElector implements StatCallback, StringCallback {
  private boolean becomeActive() {
      // omitted
      appClient.becomeActive();
      // omitted
  }
}

appClient is the ActiveStandbyElectorBasedElectorService instance that was passed in when the ActiveStandbyElector object was constructed:

public class ActiveStandbyElectorBasedElectorService extends AbstractService
    implements EmbeddedElector,
    ActiveStandbyElector.ActiveStandbyElectorCallback {
  public void becomeActive() throws ServiceFailedException {
    cancelDisconnectTimer();

    try {
      rm.getRMContext().getRMAdminService().transitionToActive(req);
    } catch (Exception e) {
      throw new ServiceFailedException("RM could not transition to Active", e);
    }
  }
}

AdminService#transitionToActive is called to move the current ResourceManager into the Active state:

public class AdminService extends CompositeService implements HAServiceProtocol, ResourceManagerAdministrationProtocol {
  public synchronized void transitionToActive(
      HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException {
      // omitted
      rm.transitionToActive();
      // omitted
  }
}

AdminService calls ResourceManager#transitionToActive, which in turn calls ResourceManager#startActiveServices to move the ResourceManager into the active state:

public class ResourceManager extends CompositeService implements Recoverable, ResourceManagerMXBean {
  synchronized void transitionToActive() throws Exception {
    if (rmContext.getHAServiceState() == HAServiceProtocol.HAServiceState.ACTIVE) {
      LOG.info("Already in active state");
      return;
    }
    LOG.info("Transitioning to active state");

    this.rmLoginUGI.doAs(new PrivilegedExceptionAction<Void>() {
      @Override
      public Void run() throws Exception {
        try {
          startActiveServices();
          return null;
        } catch (Exception e) {
          reinitialize(true);
          throw e;
        }
      }
    });

    rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.ACTIVE);
    LOG.info("Transitioned to active state");
  }
}

ResourceManager#startActiveServices is where the active services are actually started:

public class ResourceManager extends CompositeService implements Recoverable, ResourceManagerMXBean {
  void startActiveServices() throws Exception {
    if (activeServices != null) {
      clusterTimeStamp = System.currentTimeMillis();
      activeServices.start();
    }
  }
}

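5.7 Switching to the standby state

When a ResourceManager loses the election, it enters the standby state; if it was in the active state at that point, it stops the RMActiveServices service and reinitializes: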

public class ResourceManager extends CompositeService implements Recoverable, ResourceManagerMXBean {
  synchronized void transitionToStandby(boolean initialize) throws Exception {
    if (rmContext.getHAServiceState() == HAServiceProtocol.HAServiceState.STANDBY) {
      LOG.info("Already in standby state");
      return;
    }

    LOG.info("Transitioning to standby state");
    HAServiceState state = rmContext.getHAServiceState();
    rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.STANDBY);
    if (state == HAServiceProtocol.HAServiceState.ACTIVE) {
      stopActiveServices();
      reinitialize(initialize);
    }
    LOG.info("Transitioned to standby state");
  }
}
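
Both transition methods are idempotent and only touch the active services when the state really changes. A compact way to see this is the hypothetical sketch below, where counters stand in for activeServices.start() and stopActiveServices(); HaTransitionSketch and its INITIALIZING starting state are assumptions made for the example.

```java
// Hypothetical sketch of the idempotent active/standby transitions.
class HaTransitionSketch {
    enum HaState { INITIALIZING, ACTIVE, STANDBY }

    private HaState state = HaState.INITIALIZING;
    int activeStarts; // stands in for activeServices.start()
    int activeStops;  // stands in for stopActiveServices() plus reinitialize

    synchronized void transitionToActive() {
        if (state == HaState.ACTIVE) {
            return; // already active: nothing to start
        }
        activeStarts++;
        state = HaState.ACTIVE;
    }

    synchronized void transitionToStandby() {
        if (state == HaState.STANDBY) {
            return; // already standby: nothing to stop
        }
        HaState previous = state;
        state = HaState.STANDBY;
        if (previous == HaState.ACTIVE) {
            activeStops++;
        }
    }

    synchronized HaState state() {
        return state;
    }

    public static void main(String[] args) {
        HaTransitionSketch rm = new HaTransitionSketch();
        rm.transitionToActive();
        rm.transitionToActive();
        rm.transitionToStandby();
        System.out.println(rm.state() + " starts=" + rm.activeStarts + " stops=" + rm.activeStops);
    }
}
```

Idempotence matters here because the elector callbacks can fire more than once for the same outcome, for example after a reconnect that re-checks the lock node.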

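6. Summary

RMActiveServices is not a child service of ResourceManager; its init/start/stop flow is independent of the ResourceManager child service flow:

  • Startup is handled by the ActiveStandbyElectorBasedElectorService election service.
  • State switching is implemented by the ZooKeeper watcher.
  • Initialization and stopping are handled by extra method calls in ResourceManager.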