1. 背景
场景一
对于运行在AWS上的nodemanager,为了节省成本,选择了不定期回收的机器类型。在这些节点上运行的作业会失败重试,对于一些重要作业而言,这种行为不可接受。而Capacity Scheduler提供Node Label节点标签功能,可以给一些不可回收节点上的nodemanager打上标签,让重要作业运行在打过特定标签的节点上,保证作业顺利运行完成。
场景二
对于实时集群,其taskMananger会在不同节点间产生大量的流量通信。如果实时集群跨机房,机房间的流量会非常高,甚至打满带宽,导致其他正常使用贷款的服务产生延迟,发生严重后果。这种可以将不同机房的节点打上不同的标签,实时作业只允许运行在同一机房的Nodemanager执行,避免产生跨机房流量。
场景三
同一集群中,有些是高性能的机器,他的CPU和内存资源较高;有些则是低性能的老机器。对于重要的作业,为了避免它们运行在老机器上,可以给不同规格的机器打上标签,进行区别。
场景四
实时作业对网络带宽非常高,如果有作业占用了大量带宽,可能造成kafka数据堆积的情况。因此,对于实时性要求较高的作业,通过打标签的方式,只将资源提供给实时作业。来避免延迟问题。
2. NodeLabel原理
为了实现NodeLabel功能,需要实现以下3步:
- 给每个队列打上标签。
- 给每个nodemanager节点打上标签。那么该节点就属于同标签的队列下的资源。相同标签的节点属于同分区。
- 作业提交时指定标签和队列。那么作业指挥提交到同标签的相应队列上。最终运行到同标签的相应机器上。
在ResourceManager中,定义了RMNodeLabel表示nodel label,它继承了AbstractLabel类。RMNodeLabel它包含label名,资源总量,对应的nodemanager节点等信息:
public class RMNodeLabel extends AbstractLabel
implements Comparable<RMNodeLabel> {
//如果为true,则容器只能分配和其指定标签一致的节点上
private boolean exclusive;
//label名称
private NodeLabel nodeLabel;
//label对应的所有nodemanager节点
private Set<NodeId> nodeIds;
}
public abstract class AbstractLabel {
//Resource定义了label分区的总vcores和内存信息
private Resource resource;
//在线nodemanager数量
private int numActiveNMs;
private String labelName;
}
yarn_protos.proto定义了NodeLabelProto内容,它就表示lable名称:
message NodeLabelProto {
optional string name = 1;
optional bool isExclusive = 2 [default = true];
}
在进行队列选择时,会检查是否开启了全局调度。不开启yarn.scheduler.capacity.multi-node-placement-enabled全局调度时,candidates就只有一个节点。如果开启了,那么可以在同标签下的所有节点中进行资源调度:
private CandidateNodeSet<FiCaSchedulerNode> getCandidateNodeSet(
FiCaSchedulerNode node) {
CandidateNodeSet<FiCaSchedulerNode> candidates = null;
//默认情况下,只设置选中节点分配资源
candidates = new SimpleCandidateNodeSet<>(node);
//是否开启了全局调度
if (multiNodePlacementEnabled) {
Map<NodeId, FiCaSchedulerNode> nodesByPartition = new HashMap<>();
List<FiCaSchedulerNode> nodes = nodeTracker
.getNodesPerPartition(node.getPartition());
if (nodes != null && !nodes.isEmpty()) {
//同分区所有节点加入队列中待分配资源
nodes.forEach(n -> nodesByPartition.put(n.getNodeID(), n));
candidates = new SimpleCandidateNodeSet<FiCaSchedulerNode>(
nodesByPartition, node.getPartition());
}
}
return candidates;
}
在确定好调度节点后,执行ParentQueue.assignContainers
和ParentQueue.assignContainers
时都会检查该节点标签是否和队列标签一致,如果不一致,直接退出调度:
// if our queue cannot access this node, just return
//作业指挥调度到同标签的节点,如果没有设置标签,只能调度到没有设置标签的节点上
if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
//检查作业是否能够访问分区
&& !accessibleToPartition(candidates.getPartition())) {
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
getParentName(), getQueuePath(), ActivityState.REJECTED,
ActivityDiagnosticConstant.QUEUE_NOT_ABLE_TO_ACCESS_PARTITION);
if (rootQueue) {
ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager,
node);
}
//如果节点和队列标签不匹配,退出本次调度
return CSAssignment.NULL_ASSIGNMENT;
}
//后续会进行调度
accessibleToPartition检测节点标签和队列标签是否匹配。注意,一个队列可以设置多个标签:
Set<String> accessibleLabels;
public boolean accessibleToPartition(String nodePartition) {
// if queue's label is *, it can access any node
if (accessibleLabels != null
&& accessibleLabels.contains(RMNodeLabelsManager.ANY)) {
return true;
}
// any queue can access to a node without label
if (nodePartition == null
|| nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
return true;
}
// a queue can access to a node only if it contains any label of the node
if (accessibleLabels != null && accessibleLabels.contains(nodePartition)) {
return true;
}
// sorry, you cannot access
return false;
}
经过节点以及队列的标签匹配后,队列下的作业也必然指定的是同种标签,无需再进行判断。至此就是Nodel Label标签核心逻辑了。
CapacityScheduler还提供接口修改标签。例如,CapacityScheduler处理队列标签更新事件:
public void handle(SchedulerEvent event) {
case NODE_LABELS_UPDATE:
{
NodeLabelsUpdateSchedulerEvent labelUpdateEvent =
(NodeLabelsUpdateSchedulerEvent) event;
updateNodeLabelsAndQueueResource(labelUpdateEvent);
}
3. MapReduce计算组件适配
Hadoop3上线Node Label功能,但是Hadoop2.6.0还没有支持,需要提供相应支持。社区已经提供了相关patch:https://issues.apache.org/jira/browse/MAPREDUCE-6304
其大致思路是提供相关配置:
在发送资源请求时,加上nodelabel:
4. NodeLabel实践
在capacity-scheduler.xml中为指定queue队列设置标签:
<property>
<name>yarn.scheduler.capacity.root.queue.accessible-node-labels</name>
<value>alabel</value>
<description>hadoop队列应用可用的节点标签</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.queue.default-node-label-expression</name>
<value>alabel</value>
<description>hadoop队列应用默认节点标签. 默认是 空,默认队列</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.queue.accessible-node-labels.alabel.capacity</name>
<description>hadoop队列对alabel标签节点可用的百分比</description>
<value>100</value>
在yarn-site.xml中增加相关配置:
<property>
<name>yarn.node-labels.enabled</name>
<value>true</value>
</property>
<property>
//nodelabel信息持久化保存到hdfs中
<name>yarn.node-labels.fs-store.root-dir</name>
<value>hdfs://集群名/yarn/node-labels</value>
</property>
<property>
#集群定义标签,适用于大集群
<name>yarn.node-labels.configuration-type</name>
<value>centralized</value>
</property>
centralized模式下,label相关命令:
# 添加label exclusive 默认true
yarn rmadmin -addToClusterNodeLabels "<label1>(exclusive=<true|false>),<label2>(exclusive=<true|false>)"
# 关联node到label
yarn rmadmin -replaceLabelsOnNode "<node1>:<port>=<label1> <node2>:<port>=<label2>"
# 删除label
yarn rmadmin -removeFromClusterNodeLabels "<label>[,<label>,...]"
# 取消node 关联label
yarn rmadmin -replaceLabelsOnNode "<node1>"
# 查看label
yarn cluster --list-node-labels
# 刷新队列
yarn rmadmin -refreshQueues
测试执行效果
# 新建标签 alabel
yarn rmadmin -addToClusterNodeLabels "alabel"
# 清空节点gdc-dn01-nntest.i.nease.net 标签
yarn rmadmin -replaceLabelsOnNode "gdc-dn01-test"
# 关联节点gdc-dn01-nntest.i.nease.net 到alabel
yarn rmadmin -replaceLabelsOnNode "gdc-dn01-test=alabel"
设置生效,新增了alabel标签:
alabel下有该台nodemanager:
如下,不同标签下可以有相同对了:
节点标签如下:
作业测试。最终他们都运行在对应标签的节点上:
# 指定所有container的nodelabel为dynamic
/home/hadoop/hadoop2/bin/hadoop jar /home/hadoop/hadoop2/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0-cdh5.6.0.jar pi -Dmapreduce.job.node-label-expression=dynamic 10 10
# 指定AM container的nodelabel为dynamic
/home/hadoop/hadoop2/bin/hadoop jar /home/hadoop/hadoop2/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0-cdh5.6.0.jar pi -Dmapreduce.job.am.node-label-expression=dynamic 10 10
# 指定map container的nodelabel为dynamic
/home/hadoop/hadoop2/bin/hadoop jar /home/hadoop/hadoop2/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0-cdh5.6.0.jar pi -Dmapreduce.map.node-label-expression=dynamic 10 10
# 指定reduce container的nodelabel为dynamic
/home/hadoop/hadoop2/bin/hadoop jar /home/hadoop/hadoop2/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0-cdh5.6.0.jar pi -Dmapreduce.reduce.node-label-expression=dynamic 10 10
5. 指标问题
上线后,发现jmx中没有没有nodelabel分区指标,backport社区patch即可解决:https://issues.apache.org/jira/browse/YARN-6492。