NodeManager Heartbeat Scheduling Process

心智的年轮 2023-10-12

1. Background

The previous article (https://blog.51cto.com/u_15327484/7834523) described how the ApplicationMaster requests resources:

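  1. Register the ApplicationMaster.
  2. Send a resource allocation request.
  3. Receive the allocated resources and start containers.
  4. Send allocation requests for the resources that are still needed.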

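However, although resources are requested through ApplicationMasterProtocol#allocate, that call does not take resources from the resource pool directly. It only retrieves the results of allocations that have already been made; the allocation logic itself is triggered by NodeManager heartbeats. This article walks through how a NodeManager heartbeat triggers resource scheduling in the ResourceManager.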
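
To make this split concrete, here is a minimal toy model of the idea, not Hadoop code. ToyScheduler and its fields are hypothetical names, and resources are reduced to "free slots" on a node. The sketch only illustrates that allocate records requests and returns earlier results, while the heartbeat path is where assignment happens.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model only: ToyScheduler and its methods are hypothetical, not Hadoop classes.
class ToyScheduler {
    // Resource requests recorded by the AM's allocate() calls.
    private final Deque<String> pendingRequests = new ArrayDeque<>();
    // Containers assigned during node heartbeats, waiting to be fetched by the AM.
    private final List<String> newlyAllocated = new ArrayList<>();
    private int nextContainerId = 1;

    // AM side: record new asks, then return whatever earlier heartbeats assigned.
    synchronized List<String> allocate(List<String> asks) {
        pendingRequests.addAll(asks);
        List<String> result = new ArrayList<>(newlyAllocated);
        newlyAllocated.clear();
        return result;
    }

    // NM side: a heartbeat reports free slots; assignment happens here.
    synchronized void nodeUpdate(String node, int freeSlots) {
        while (freeSlots > 0 && !pendingRequests.isEmpty()) {
            String ask = pendingRequests.poll();
            newlyAllocated.add("container_" + nextContainerId++ + "@" + node + " for " + ask);
            freeSlots--;
        }
    }
}

class ToySchedulerDemo {
    public static void main(String[] args) {
        ToyScheduler s = new ToyScheduler();
        System.out.println(s.allocate(List.of("map-task"))); // request recorded, nothing assigned yet
        s.nodeUpdate("nm1", 2);                               // heartbeat triggers the assignment
        System.out.println(s.allocate(List.of()));            // the assigned container is returned now
    }
}
```

In this sketch the first allocate call comes back empty even though a request was submitted; only after nodeUpdate runs does a later allocate call return a container.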

2. Heartbeat-related protocol

The ResourceTracker protocol defines three kinds of calls:

  1. registerNodeManager: the NodeManager registers with the ResourceManager.
  2. nodeHeartbeat: the NodeManager reports in periodically.
  3. unRegisterNodeManager: the NodeManager unregisters.

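A NodeManager uses nodeHeartbeat to report its resource usage to the ResourceManager, for example its currently available resources, the resources in use, and the resources that have been released. In response, the ResourceManager calls the scheduler's nodeUpdate method to perform scheduling: it picks suitable application resource requests from the queue and places them on that NodeManager.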
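
The shape of these three calls can be sketched as a small interface. This is a simplified illustration under stated assumptions: ToyResourceTracker and InMemoryTracker are hypothetical, the parameters are plain values rather than the request/response records the real protocol exchanges, and the boolean "unknown node" result is a stand-in chosen for the example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for the three ResourceTracker calls.
interface ToyResourceTracker {
    void registerNodeManager(String nodeId, int totalMemoryMb);

    // Returns false when the node is unknown (assumption made for this sketch).
    boolean nodeHeartbeat(String nodeId, int usedMemoryMb);

    void unRegisterNodeManager(String nodeId);
}

class InMemoryTracker implements ToyResourceTracker {
    private final Map<String, Integer> totalMb = new HashMap<>();
    private final Map<String, Integer> availableMb = new HashMap<>();

    @Override
    public void registerNodeManager(String nodeId, int totalMemoryMb) {
        totalMb.put(nodeId, totalMemoryMb);
        availableMb.put(nodeId, totalMemoryMb);
    }

    @Override
    public boolean nodeHeartbeat(String nodeId, int usedMemoryMb) {
        Integer total = totalMb.get(nodeId);
        if (total == null) {
            return false; // heartbeat from a node that never registered
        }
        // Each heartbeat refreshes the tracker's view of what is still free.
        availableMb.put(nodeId, total - usedMemoryMb);
        return true;
    }

    @Override
    public void unRegisterNodeManager(String nodeId) {
        totalMb.remove(nodeId);
        availableMb.remove(nodeId);
    }

    // Last known free memory for a node, or null if the node is not registered.
    Integer available(String nodeId) {
        return availableMb.get(nodeId);
    }
}
```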

3. NodeManager sends heartbeat requests

NodeStatusUpdaterImpl is a Service inside the NodeManager. When the NodeManager starts, NodeStatusUpdaterImpl starts a new thread that sends heartbeats periodically. The main steps of the heartbeat thread are:

  1. Get the node status.
  2. Build the heartbeat request.
  3. Send the heartbeat request.

As shown below:

protected void startStatusUpdater() {

    statusUpdaterRunnable = new Runnable() {
      @Override
      @SuppressWarnings("unchecked")
      public void run() {
        int lastHeartBeatID = 0;
        while (!isStopped) {
          // Send heartbeat
          try {
            NodeHeartbeatResponse response = null;
            // Get the current status of this NM node
            NodeStatus nodeStatus = getNodeStatus(lastHeartBeatID);
            // Build the heartbeat request
            NodeHeartbeatRequest request =
                NodeHeartbeatRequest.newInstance(nodeStatus,
                  NodeStatusUpdaterImpl.this.context
                    .getContainerTokenSecretManager().getCurrentKey(),
                  NodeStatusUpdaterImpl.this.context.getNMTokenSecretManager()
                    .getCurrentKey());

            if (logAggregationEnabled) {
              // pull log aggregation status for application running in this NM
              List<LogAggregationReport> logAggregationReports =
                  getLogAggregationReportsForApps(context
                    .getLogAggregationStatusForApps());
              if (logAggregationReports != null
                  && !logAggregationReports.isEmpty()) {
                request.setLogAggregationReportsForApps(logAggregationReports);
              }
            }
            // Send the heartbeat request
            response = resourceTracker.nodeHeartbeat(request);
            //get next heartbeat interval from response
            nextHeartBeatInterval = response.getNextHeartBeatInterval();
            updateMasterKeys(response);

            // ... (rest of the loop and the enclosing blocks omitted)
    }
    // Start the periodic heartbeat thread
    statusUpdater =
        new Thread(statusUpdaterRunnable, "Node Status Updater");
    statusUpdater.start();
  }
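
The loop structure above can be reduced to a small, self-contained sketch. Everything here is hypothetical (ToyResponse, ToyServer, ToyHeartbeatLoop); it is not the NodeStatusUpdaterImpl implementation. It assumes the response carries both an id to send back on the next heartbeat and the interval to wait, and it caps the number of heartbeats so the demo terminates.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical response: the id to send next time and the wait chosen by the server.
final class ToyResponse {
    final int responseId;
    final long nextIntervalMs;

    ToyResponse(int responseId, long nextIntervalMs) {
        this.responseId = responseId;
        this.nextIntervalMs = nextIntervalMs;
    }
}

// Hypothetical stand-in for the RPC proxy to the ResourceManager.
interface ToyServer {
    ToyResponse heartbeat(int lastHeartbeatId);
}

class ToyHeartbeatLoop implements Runnable {
    private final ToyServer server;
    private final int maxBeats; // bound added so the sketch terminates
    private volatile boolean stopped;

    ToyHeartbeatLoop(ToyServer server, int maxBeats) {
        this.server = server;
        this.maxBeats = maxBeats;
    }

    void stop() {
        stopped = true;
    }

    @Override
    public void run() {
        int lastHeartbeatId = 0;
        for (int sent = 0; sent < maxBeats && !stopped; sent++) {
            // Steps 1 to 3 (status, request, send) are collapsed into one call here.
            ToyResponse response = server.heartbeat(lastHeartbeatId);
            lastHeartbeatId = response.responseId;
            try {
                // The server, not the client, decides how long to wait.
                Thread.sleep(response.nextIntervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}

class ToyHeartbeatDemo {
    public static void main(String[] args) throws InterruptedException {
        List<Integer> seen = new ArrayList<>();
        ToyServer server = id -> {
            seen.add(id);
            return new ToyResponse(id + 1, 10);
        };
        Thread updater = new Thread(new ToyHeartbeatLoop(server, 3), "Node Status Updater");
        updater.start();
        updater.join();
        System.out.println(seen); // ids the server received, in order
    }
}
```

Letting the response carry the next interval means the server side can slow down or speed up heartbeats without any change on the node.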

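The NodeManager's status is defined in a proto message. As shown below, it contains node information, container statuses, node health status, and application information. The container and application fields are both repeated, so they are reported as lists: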

message NodeStatusProto {
  optional NodeIdProto node_id = 1;
  optional int32 response_id = 2;
  repeated ContainerStatusProto containersStatuses = 3;
  optional NodeHealthStatusProto nodeHealthStatus = 4;
  repeated ApplicationIdProto keep_alive_applications = 5;
}
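
For readers less familiar with protobuf, the message maps naturally onto a plain value class: optional fields become single values and repeated fields become lists. The class below is a hypothetical mirror written for illustration, not the class generated from the proto; nested messages are flattened to String and boolean to keep it short.

```java
import java.util.List;

// Hypothetical plain-Java mirror of NodeStatusProto; not the generated protobuf class.
final class ToyNodeStatus {
    final String nodeId;                      // optional NodeIdProto node_id
    final int responseId;                     // optional int32 response_id
    final List<String> containerStatuses;     // repeated ContainerStatusProto containersStatuses
    final boolean healthy;                    // stands in for NodeHealthStatusProto nodeHealthStatus
    final List<String> keepAliveApplications; // repeated ApplicationIdProto keep_alive_applications

    ToyNodeStatus(String nodeId, int responseId, List<String> containerStatuses,
                  boolean healthy, List<String> keepAliveApplications) {
        this.nodeId = nodeId;
        this.responseId = responseId;
        // Defensive copies keep the status an immutable snapshot.
        this.containerStatuses = List.copyOf(containerStatuses);
        this.healthy = healthy;
        this.keepAliveApplications = List.copyOf(keepAliveApplications);
    }
}

class ToyNodeStatusDemo {
    public static void main(String[] args) {
        ToyNodeStatus status = new ToyNodeStatus(
                "nm1:45454", 7, List.of("c1:RUNNING", "c2:COMPLETE"), true, List.of());
        System.out.println(status.nodeId + " reports " + status.containerStatuses.size()
                + " container statuses");
    }
}
```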

4. ResourceManager performs scheduling

On the ResourceManager side, ResourceTrackerService#nodeHeartbeat handles the heartbeat request:

public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
      throws YarnException, IOException {
 
    // Heartbeat response
    NodeHeartbeatResponse nodeHeartBeatResponse =
        YarnServerBuilderUtils.newNodeHeartbeatResponse(
            getNextResponseId(lastNodeHeartbeatResponse.getResponseId()),
            NodeAction.NORMAL, null, null, null, null, nextHeartBeatInterval);
    rmNode.setAndUpdateNodeHeartbeatResponse(nodeHeartBeatResponse);
 
    // ... (omitted)
    // 4. Send status to RMNode, saving the latest response.
    RMNodeStatusEvent nodeStatusEvent =
        new RMNodeStatusEvent(nodeId, remoteNodeStatus);
    if (request.getLogAggregationReportsForApps() != null
        && !request.getLogAggregationReportsForApps().isEmpty()) {
      nodeStatusEvent.setLogAggregationReportsForApps(request
        .getLogAggregationReportsForApps());
    }
    this.rmContext.getDispatcher().getEventHandler().handle(nodeStatusEvent);
    // ... (omitted)
 
    return nodeHeartBeatResponse;
  }

The event is then processed in the following order:

  1. StatusUpdateWhenHealthyTransition handles the RMNodeStatusEvent (event type RMNodeEventType.STATUS_UPDATE).
  2. ResourceScheduler handles the resulting scheduler event of type SchedulerEventType.NODE_UPDATE.

This article uses FifoScheduler, one of the ResourceScheduler implementations, as the example. It calls nodeUpdate and then assignContainer to allocate resources.
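
The two-step hand-off can be pictured with a tiny dispatcher. This is a sketch with hypothetical names (ToyEventType, ToyDispatcher, ToyEventChain), and it dispatches synchronously to stay deterministic, which is an assumption of the example rather than a description of how the ResourceManager's dispatcher works.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical event types standing in for the node-level and scheduler-level events.
enum ToyEventType { STATUS_UPDATE, NODE_UPDATE }

// Minimal synchronous dispatcher: one handler per event type.
class ToyDispatcher {
    private final Map<ToyEventType, Consumer<String>> handlers = new EnumMap<>(ToyEventType.class);

    void register(ToyEventType type, Consumer<String> handler) {
        handlers.put(type, handler);
    }

    void handle(ToyEventType type, String nodeId) {
        Consumer<String> handler = handlers.get(type);
        if (handler != null) {
            handler.accept(nodeId);
        }
    }
}

class ToyEventChain {
    // Wires the two steps together and returns a log of what ran, in order.
    static List<String> run(String nodeId) {
        List<String> log = new ArrayList<>();
        ToyDispatcher dispatcher = new ToyDispatcher();

        // Step 1: the node-level handler records the status, then notifies the scheduler.
        dispatcher.register(ToyEventType.STATUS_UPDATE, id -> {
            log.add("node " + id + ": status updated");
            dispatcher.handle(ToyEventType.NODE_UPDATE, id);
        });

        // Step 2: the scheduler reacts by running its per-node update.
        dispatcher.register(ToyEventType.NODE_UPDATE,
                id -> log.add("scheduler: nodeUpdate(" + id + ")"));

        dispatcher.handle(ToyEventType.STATUS_UPDATE, nodeId);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run("nm1"));
    }
}
```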

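Inside assignContainer, FiCaSchedulerApp#allocate is called to allocate the resource. As the code shows, allocate simply adds the newly created RMContainer to the newlyAllocatedContainers list. When the ApplicationMaster's request reaches FifoScheduler#allocate, it is in effect just picking up the containers that FiCaSchedulerApp placed in newlyAllocatedContainers. As shown below: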

public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
      Priority priority, ResourceRequest request, 
      Container container) {

     try {
       writeLock.lock();
       if (isStopped) {
         return null;
       }

       // Required sanity check - AM can call 'allocate' to update resource
       // request without locking the scheduler, hence we need to check
       if (getTotalRequiredResources(priority) <= 0) {
         return null;
       }

       // Create RMContainer
       RMContainer rmContainer = new RMContainerImpl(container, this
           .getApplicationAttemptId(), node.getNodeID(),
           appSchedulingInfo.getUser(), this.rmContext);

       // Add it to allContainers list.
       // Allocate directly and add the container to newlyAllocatedContainers
       newlyAllocatedContainers.add(rmContainer);
       liveContainers.put(container.getId(), rmContainer);

       // Update consumption and track allocations
       List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
           type, node, priority, request, container);
       Resources.addTo(currentConsumption, container.getResource());

       // Update resource requests related to "request" and store in RMContainer
       ((RMContainerImpl)rmContainer).setResourceRequests(resourceRequestList);

       // Inform the container
       rmContainer.handle(
           new RMContainerEvent(container.getId(), RMContainerEventType.START));

       if (LOG.isDebugEnabled()) {
         LOG.debug("allocate: applicationAttemptId="
             + container.getId().getApplicationAttemptId()
             + " container=" + container.getId() + " host="
             + container.getNodeId().getHost() + " type=" + type);
       }
       RMAuditLogger.logSuccess(getUser(),
           AuditConstants.ALLOC_CONTAINER, "SchedulerApp",
           getApplicationId(), container.getId());

       return rmContainer;
     } finally {
       writeLock.unlock();
     }
  }
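
The bookkeeping in this method can be isolated in a short sketch: the heartbeat path adds a container to two collections under a write lock, and the ApplicationMaster path later drains only one of them. ToyAppAttempt and pullNewlyAllocated are hypothetical names chosen for this example, and container and node objects are reduced to strings.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical per-application bookkeeping; not the FiCaSchedulerApp implementation.
class ToyAppAttempt {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final List<String> newlyAllocatedContainers = new ArrayList<>();
    private final Map<String, String> liveContainers = new HashMap<>();

    // Heartbeat path: record the container as both "new" and "live".
    void allocate(String containerId, String nodeId) {
        lock.writeLock().lock();
        try {
            newlyAllocatedContainers.add(containerId);
            liveContainers.put(containerId, nodeId);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // AM path: hand out each new container exactly once.
    List<String> pullNewlyAllocated() {
        lock.writeLock().lock();
        try {
            List<String> pulled = new ArrayList<>(newlyAllocatedContainers);
            newlyAllocatedContainers.clear();
            return pulled;
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Containers stay live after they have been pulled.
    int liveCount() {
        lock.readLock().lock();
        try {
            return liveContainers.size();
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

Draining the list under the same lock that guards additions is what lets the two paths run on different threads without a container being lost or handed out twice.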

5. Summary

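  1. When the NodeManager starts, the startStatusUpdater method starts a thread that sends heartbeats to the ResourceManager periodically.
  2. In the ResourceManager, the heartbeat is ultimately handled by the nodeUpdate method of the ResourceScheduler implementation. That method allocates resources on the NodeManager, wraps each allocation, and adds it to the newlyAllocatedContainers list.
  3. The ApplicationMaster periodically sends resource requests to the ResourceManager, which ultimately runs ResourceScheduler#allocate to fetch the resources allocated to the current application from newlyAllocatedContainers. The ApplicationMaster then asks the NodeManager to start the containers.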