0
点赞
收藏
分享

微信扫一扫

Yarn NodeLabel原理与实践

勇敢乌龟 2023-10-20 阅读 42

1. 背景

场景一

对于运行在AWS上的nodemanager,为了节省成本,选择了不定期回收的机器类型。在这些节点上运行的作业会失败重试,对于一些重要作业而言,这种行为不可接受。而Capacity Scheduler提供Node Label节点标签功能,可以给一些不可回收节点上的nodemanager打上标签,让重要作业运行在打过特定标签的节点上,保证作业顺利运行完成。

场景二

对于实时集群,其taskMananger会在不同节点间产生大量的流量通信。如果实时集群跨机房,机房间的流量会非常高,甚至打满带宽,导致其他正常使用贷款的服务产生延迟,发生严重后果。这种可以将不同机房的节点打上不同的标签,实时作业只允许运行在同一机房的Nodemanager执行,避免产生跨机房流量。

场景三

同一集群中,有些是高性能的机器,他的CPU和内存资源较高;有些则是低性能的老机器。对于重要的作业,为了避免它们运行在老机器上,可以给不同规格的机器打上标签,进行区别。

场景四

实时作业对网络带宽非常高,如果有作业占用了大量带宽,可能造成kafka数据堆积的情况。因此,对于实时性要求较高的作业,通过打标签的方式,只将资源提供给实时作业。来避免延迟问题。

2. NodeLabel原理

为了实现NodeLabel功能,需要实现以下3步:

  1. 给每个队列打上标签。
  2. 给每个nodemanager节点打上标签。那么该节点就属于同标签的队列下的资源。相同标签的节点属于同分区。
  3. 作业提交时指定标签和队列。那么作业指挥提交到同标签的相应队列上。最终运行到同标签的相应机器上。

在ResourceManager中,定义了RMNodeLabel表示nodel label,它继承了AbstractLabel类。RMNodeLabel它包含label名,资源总量,对应的nodemanager节点等信息:

public class RMNodeLabel extends AbstractLabel
    implements Comparable<RMNodeLabel> {
  //如果为true,则容器只能分配和其指定标签一致的节点上
  private boolean exclusive;
  //label名称
  private NodeLabel nodeLabel;
  //label对应的所有nodemanager节点
  private Set<NodeId> nodeIds;
}

public abstract class AbstractLabel {
  //Resource定义了label分区的总vcores和内存信息
  private Resource resource;
  //在线nodemanager数量
  private int numActiveNMs;
  private String labelName;
}

yarn_protos.proto定义了NodeLabelProto内容,它就表示lable名称:

message NodeLabelProto {
  optional string name = 1;
  optional bool isExclusive = 2 [default = true]; 
}

在进行队列选择时,会检查是否开启了全局调度。不开启yarn.scheduler.capacity.multi-node-placement-enabled全局调度时,candidates就只有一个节点。如果开启了,那么可以在同标签下的所有节点中进行资源调度:

private CandidateNodeSet<FiCaSchedulerNode> getCandidateNodeSet(
      FiCaSchedulerNode node) {
    CandidateNodeSet<FiCaSchedulerNode> candidates = null;
    //默认情况下,只设置选中节点分配资源
    candidates = new SimpleCandidateNodeSet<>(node);
    //是否开启了全局调度
    if (multiNodePlacementEnabled) {
      Map<NodeId, FiCaSchedulerNode> nodesByPartition = new HashMap<>();
      List<FiCaSchedulerNode> nodes = nodeTracker
          .getNodesPerPartition(node.getPartition());
      if (nodes != null && !nodes.isEmpty()) {
        //同分区所有节点加入队列中待分配资源
        nodes.forEach(n -> nodesByPartition.put(n.getNodeID(), n));
        candidates = new SimpleCandidateNodeSet<FiCaSchedulerNode>(
            nodesByPartition, node.getPartition());
      }
    }
    return candidates;
  }

在确定好调度节点后,执行ParentQueue.assignContainersParentQueue.assignContainers时都会检查该节点标签是否和队列标签一致,如果不一致,直接退出调度:

// if our queue cannot access this node, just return
    //作业指挥调度到同标签的节点,如果没有设置标签,只能调度到没有设置标签的节点上
    if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
        //检查作业是否能够访问分区
        && !accessibleToPartition(candidates.getPartition())) {

      ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
          getParentName(), getQueuePath(), ActivityState.REJECTED,
          ActivityDiagnosticConstant.QUEUE_NOT_ABLE_TO_ACCESS_PARTITION);
      if (rootQueue) {
        ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager,
            node);
      }
      //如果节点和队列标签不匹配,退出本次调度
      return CSAssignment.NULL_ASSIGNMENT;
    }

//后续会进行调度

accessibleToPartition检测节点标签和队列标签是否匹配。注意,一个队列可以设置多个标签:

Set<String> accessibleLabels;

public boolean accessibleToPartition(String nodePartition) {
    // if queue's label is *, it can access any node
    if (accessibleLabels != null
        && accessibleLabels.contains(RMNodeLabelsManager.ANY)) {
      return true;
    }
    // any queue can access to a node without label
    if (nodePartition == null
        || nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
      return true;
    }
    // a queue can access to a node only if it contains any label of the node
    if (accessibleLabels != null && accessibleLabels.contains(nodePartition)) {
      return true;
    }
    // sorry, you cannot access
    return false;
  }

经过节点以及队列的标签匹配后,队列下的作业也必然指定的是同种标签,无需再进行判断。至此就是Nodel Label标签核心逻辑了。

CapacityScheduler还提供接口修改标签。例如,CapacityScheduler处理队列标签更新事件:

public void handle(SchedulerEvent event) {
  case NODE_LABELS_UPDATE:
    {
      NodeLabelsUpdateSchedulerEvent labelUpdateEvent =
          (NodeLabelsUpdateSchedulerEvent) event;

      updateNodeLabelsAndQueueResource(labelUpdateEvent);
    }

3. MapReduce计算组件适配

Hadoop3上线Node Label功能,但是Hadoop2.6.0还没有支持,需要提供相应支持。社区已经提供了相关patch:https://issues.apache.org/jira/browse/MAPREDUCE-6304

其大致思路是提供相关配置:

Untitled.png

在发送资源请求时,加上nodelabel:

Untitled 1.png

4. NodeLabel实践

在capacity-scheduler.xml中为指定queue队列设置标签:

<property>
    <name>yarn.scheduler.capacity.root.queue.accessible-node-labels</name>
    <value>alabel</value>
    <description>hadoop队列应用可用的节点标签</description>
  </property>
  <property>
    <name>yarn.scheduler.capacity.root.queue.default-node-label-expression</name>
    <value>alabel</value>
    <description>hadoop队列应用默认节点标签. 默认是 空,默认队列</description>
  </property>
  <property>
    <name>yarn.scheduler.capacity.root.queue.accessible-node-labels.alabel.capacity</name>
    <description>hadoop队列对alabel标签节点可用的百分比</description>
    <value>100</value>

在yarn-site.xml中增加相关配置:

<property>
  <name>yarn.node-labels.enabled</name>
  <value>true</value>
</property>
<property>
  //nodelabel信息持久化保存到hdfs中
  <name>yarn.node-labels.fs-store.root-dir</name>
  <value>hdfs://集群名/yarn/node-labels</value>
</property>
<property>
  #集群定义标签,适用于大集群
  <name>yarn.node-labels.configuration-type</name>
  <value>centralized</value>
</property>

centralized模式下,label相关命令:


# 添加label   exclusive 默认true
yarn rmadmin -addToClusterNodeLabels "<label1>(exclusive=<true|false>),<label2>(exclusive=<true|false>)"

# 关联node到label
yarn rmadmin -replaceLabelsOnNode "<node1>:<port>=<label1> <node2>:<port>=<label2>"

# 删除label
yarn rmadmin -removeFromClusterNodeLabels "<label>[,<label>,...]"

# 取消node 关联label
yarn rmadmin -replaceLabelsOnNode "<node1>"

# 查看label
yarn cluster --list-node-labels

# 刷新队列
yarn rmadmin -refreshQueues 

测试执行效果

 # 新建标签 alabel
 yarn rmadmin -addToClusterNodeLabels "alabel"
 # 清空节点gdc-dn01-nntest.i.nease.net 标签
 yarn rmadmin -replaceLabelsOnNode "gdc-dn01-test"
 # 关联节点gdc-dn01-nntest.i.nease.net 到alabel
 yarn rmadmin -replaceLabelsOnNode "gdc-dn01-test=alabel"

设置生效,新增了alabel标签:

Untitled 2.png

alabel下有该台nodemanager:

Untitled 3.png

如下,不同标签下可以有相同对了:

Untitled 4.png

节点标签如下:

Untitled 5.png

作业测试。最终他们都运行在对应标签的节点上:

# 指定所有container的nodelabel为dynamic
/home/hadoop/hadoop2/bin/hadoop jar /home/hadoop/hadoop2/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0-cdh5.6.0.jar pi -Dmapreduce.job.node-label-expression=dynamic 10 10
# 指定AM container的nodelabel为dynamic
/home/hadoop/hadoop2/bin/hadoop jar /home/hadoop/hadoop2/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0-cdh5.6.0.jar pi -Dmapreduce.job.am.node-label-expression=dynamic 10 10
# 指定map container的nodelabel为dynamic
/home/hadoop/hadoop2/bin/hadoop jar /home/hadoop/hadoop2/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0-cdh5.6.0.jar pi -Dmapreduce.map.node-label-expression=dynamic 10 10
# 指定reduce container的nodelabel为dynamic
/home/hadoop/hadoop2/bin/hadoop jar /home/hadoop/hadoop2/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0-cdh5.6.0.jar pi -Dmapreduce.reduce.node-label-expression=dynamic 10 10

5. 指标问题

上线后,发现jmx中没有没有nodelabel分区指标,backport社区patch即可解决:https://issues.apache.org/jira/browse/YARN-6492。

举报

相关推荐

0 条评论