0
点赞
收藏
分享

微信扫一扫

FairScheduler迁移Capacity Scheduler实践

小美人鱼失去的腿 2023-10-18 阅读 39

1. 背景

需求一

当前线上集群A还是使用fair scheduler,当调度请求量大时,经长发生pending:

Untitled.png

观察队列使用情况,发现队列并没有用满,这说明是调度性能问题。

Capacity Scheduler由于使用了细粒度锁、异步调度等特性,吞吐量能够提升5-10倍。因此希望集群能够升级调度器为Capacity Scheduler。

需求二

对于运行在AWS上的nodemanager,为了节省成本,选择了不定期回收的机器类型。在这些节点上运行的作业会失败重试,对于一些重要作业而言,这种行为不可接受。而Capacity Scheduler提供Node Label节点标签功能,可以给一些不可回收节点上的nodemanager打上标签,让重要作业运行在打过特定标签的节点上,保证作业顺利运行完成。

2.步骤

  1. 禁止fair-scheduler增删改操作。
  2. 将fair-scheduler.xml转换成为capacity-scheduler.xml。
  3. 修改yarn-site.xml配置,使用CapacityScheduler。
  4. 滚动重启resourcemanager。
  5. 未来3天持续观察yarn调度相关指标,检查是否有异常。

3. 转换fair-scheduler.xml成为capacity-scheduler.xml

FairScheduler和CapacityScheduler使用的配置文件的格式不同。对于fair-scheduler.xml,它的属性名放在了标签中:

<allocations>
    <pool name="root">
      <minSharePreemptionTimeout>600</minSharePreemptionTimeout>
      <weight>1</weight>
      <aclSubmitApps>hadoop</aclSubmitApps>
      <aclAdministerApps>hadoop,hive</aclAdministerApps>
      <maxResources>36864 mb,18 vcores</maxResources>
      <fairSharePreemptionTimeout>60</fairSharePreemptionTimeout>
      <userMaxJobsDefault>1000</userMaxJobsDefault>
      <minResources>18432 mb,9 vcores</minResources>
  </pool>
</allocations>

而capacity-scheduler.xml则使用固定的标签<name></name>指定属性名:

<configuration>
    <property>
        <name>yarn.scheduler.capacity.root.hadoop.ordering-policy</name>
        <value>fair</value>
        <final>false</final>
        <source>programmatically</source>
    </property>
    <property>
        <name>yarn.scheduler.capacity.root.maximum-capacity</name>
        <value>100</value>
        <final>false</final>
        <source>programmatically</source>
    </property>
</configuration>

因此,切换到CapacityScheduler前,需要将fair-scheduler.xml的文件内容转换成capacity-scheduler.xml能够解析的格式。

Hadoop3.3.0以上的版本中,引入了https://issues.apache.org/jira/browse/YARN-9698,它提供fs2cs 工具进行转换工作。如下所示,指定yarn-site.xml和fair-scheduler.xml,并制定输出目录,该工具就会在输出目录中生成capacity-scheduler.xml和yarn-site.xml,注意生成的yarn-site.xml中包含了CapacityScheduler相关的配置信息,将这些信息粘贴进线上的yarn-site.xml中即可。

其命令如下所示:

yarn fs2cs -y /home/hadoop/tmp/fair/yarn-site.xml -f /home/hadoop/tmp/fair/fair-scheduler.xml -o /home/hadoop/tmp/capacity/ -s

在执行工具时,查看日志,发现该工具不支持转换userMaxAppsDefault、minResources、maxResources等队列属性,需要手动调整:

Untitled 1.png

由于fs2cs工具存在缺陷,如果队列数量成百上千,那么手动调整的工作量非常大,因此选择自己编写转换工具。其思路如下:

  1. 读取fair-scheduler.xml内容到内存中。
  2. 将队列信息导出到capacity-scheduler.xml文件中。

代码片段如下:

#读取fair-scheduler.xml内容
Element root = document.getRootElement();
Element rootPool = root.element("pool");
List<CsQueue> csQueues = new ArrayList<>();
String rootPoolName = rootPool.attributeValue("name");
String rootMinResources = rootPool.elementText("minResources");
String rootMaxResources = rootPool.elementText("maxResources");
String rootAclSubmitApps = rootPool.elementText("aclSubmitApps");
String rootAclAdministerApps = rootPool.elementText("aclAdministerApps");

#转化成为capacity-scheduler.xml格式,保存到文件中
Element property = configuration.addElement("property");
Element name = property.addElement("name");
Element value = property.addElement("value");
name.setText(CAPACITY_CONFIG_PREFIX + queuePath + "." + propertyMap.getString(entry.getKey()));
value.setText(entry.getValue().toString());
File file = new File(csFile);
XMLWriter writer = new XMLWriter(new FileOutputStream(file), format);
writer.write(doc);

4. 修改yarn-site.xml配置

切换前:

<property>
        <name>yarn.resourcemanager.scheduler.class</name>
        <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
    </property>  
    <!-- YARN-1458 -->
    <property>
        #禁止基于任务的权重大小对应用进行调度。如果打开,权重越大的任务越先执行
        <name>yarn.scheduler.fair.sizebasedweight</name>
        <value>false</value>
    </property>
    <property>
        #允许批量调度。心跳调度时每个nodemanager资源可以分配多个container,默认进行一次调度会导致nodemanager资源没有分配完,资源利用率不高
        <name>yarn.scheduler.fair.assignmultiple</name>
        <value>true</value>
    </property>
    <property>
        #应用如果没有满足本地行,可以设置跳过调度机会,跳过机会的阈值=跳过次数/节点总数。-1表示不跳过任何调度机会,存算分离时,这个配置应该关闭。
        <name>yarn.scheduler.fair.locality.threshold.node</name>
        <value>0.05</value>
    </property>
    <property>
        #批量调度时,单次最高可以分配container的上限,默认-1表示不设限
        <name>yarn.scheduler.fair.max.assign</name>
        <value>3</value>
    </property>

切换后:

<property>
        <name>yarn.resourcemanager.scheduler.class</name>
        <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
    </property>
    <property>
        #基于capacity-scheduler.xml文件保存队列配置信息
        <name>yarn.scheduler.configuration.store.class</name>
        <value>file</value>
    </property>
    <property>
        #一个应用只有一个appMaster,该配置限制所有appMaster占用资源最大能够占用集群资源的比例,用于限制并发。设置为1时,表示100%,不限制并发。
        <name>yarn.scheduler.capacity.maximum-am-resource-percent</name>
        <value>1.0</value>
    </property>
    <property>
        #最大应用数量
        <name>yarn.scheduler.capacity.maximum-applications</name>
        <value>1000000</value>
    </property>
    <property>
        #禁止自定义队列映射规则,使用默认的队列映射规则
        <name>yarn.scheduler.capacity.queue-mappings-override.enable</name>
        <value>false</value>
    </property>
    <property>
        #队列映射规则。u表示针对用户,第一个user是提交程序的用户,第二个user是对应的队列
        <name>yarn.scheduler.capacity.queue-mappings</name>
        <value>u:%user:%user</value>
    </property>
    <property>
        #基于内存和CPU对内存进行调度器资源进行比较。默认只基于内存进行比较。
        <name>yarn.scheduler.capacity.resource-calculator</name>
        <value>org.apache.hadoop.yarn.util.resource.DominantResourceCalculator</value>
    </property>
    <property>
        #启动节点标签功能
        <name>yarn.node-labels.enabled</name>
        <value>true</value>
    </property>
    <property>
        #在hdfs中保存节点标签内容
        <name>yarn.node-labels.fs-store.root-dir</name>
        <value>hdfs://集群代号/yarn/node-labels</value>
    </property>
    <property>
        #通过rm提供的命令设置每个nodemanager的标签,集中管理。
        <name>yarn.node-labels.configuration-type</name>
        <value>centralized</value>
    </property>
    <property>
      #禁止节点本地性执行
      <name>yarn.scheduler.capacity.node-locality-delay</name>
      <value>-1</value>
    </property>
    <property>
      #禁止机架本地行执行
      <name>yarn.scheduler.capacity.rack-locality-additional-delay</name>
      <value>-1</value>
    </property>

5. 指标观察即作业运行情况

集群A切换到CapacityScheduler后,作业有大量堆积,用户反馈作业执行明显变慢,AM资源等待较长时间进行分配:

Untitled 2.png

第二天观察调度耗时,container分配吞吐量等指标。对于已经切换完成的3个集群,它们的平均调度耗时对比如下:

AM平均调度耗时 container平均调度耗时
集群A 6 s 3.8 ms
集群B 155 ms 940 微秒
集群C 44 ms 676 微秒

三个集群指标差距过大,特别是刚刚切换的集群A,平均调度耗时已经达到6s:

Untitled 3.png

集群A的吞吐量已经从396下降到231:

Untitled 4.png

对CapacityScheduler调度逻辑进行排查,发现其在调度时,会根据每个队列的usedCapacity进行排序,选择usedCapacity最小的队列进行调度,这个过程耗时与队列数量有关:

队列数量 平均调度耗时
集群A 2706 3.8 ms
集群D 1700 3.7 ms
集群B 620 940 微秒
集群C 399 676 微秒

6. CapacityScheduler队列排序问题

在https://blog.51cto.com/u_15327484/7894282文章中,讲解了Hadoop3.3.0之前,使用queueComparator基于队列资源使用率对队列进行排序。

在Hadoop3.0.0后,使用queueOrderingPolicy表示队列排序策略:

queueOrderingPolicy = csContext.getConfiguration().getQueueOrderingPolicy(
          getQueuePath(), parent == null ?
              null :
              ((ParentQueue) parent).getQueueOrderingPolicyConfigName());
      queueOrderingPolicy.setQueues(childQueues);

提供两种排序方法:utilization和priority-utilization。前者是指考虑资源使用率,后者还考虑了队列优先级。默认就是utilization,这和Hadoop2.6.0中的queueComparator规则一致。

utilization和priority-utilization都适用PriorityQueueComparator类进行排序,区别在于respectPriority参数,如果该参数为false,排序规则则是utilization;如果该参数是true,排序规则则是priority-utilization:

private int compare(PriorityQueueResourcesForSorting q1Sort,
        PriorityQueueResourcesForSorting q2Sort, float q1Used,
                        float q2Used, int q1Prior, int q2Prior) {

      int p1 = 0;
      int p2 = 0;
      if (respectPriority) {
        p1 = q1Prior;
        p2 = q2Prior;
      }

      int rc = PriorityUtilizationQueueOrderingPolicy.compare(q1Used, q2Used,
          p1, p2);

      // For queue with same used ratio / priority, queue with higher configured
      // capacity goes first
      if (0 == rc) {
        Resource minEffRes1 =
                q1Sort.configuredMinResource;
        Resource minEffRes2 =
                q2Sort.configuredMinResource;
        if (!minEffRes1.equals(Resources.none()) && !minEffRes2.equals(
            Resources.none())) {
          return minEffRes2.compareTo(minEffRes1);
        }

        float abs1 = q1Sort.absoluteCapacity;
        float abs2 = q2Sort.absoluteCapacity;
        return Float.compare(abs2, abs1);
      }

      return rc;
    }

每次调度时,会调用PriorityUtilizationQueueOrderingPolicy的getAssignmentIterator方法对队列进行排序:

// 对所有队列进行排序
  public Iterator<CSQueue> getAssignmentIterator(String partition) {
    // Since partitionToLookAt is a thread local variable, and every time we
    // copy and sort queues, so it's safe for multi-threading environment.
    PriorityUtilizationQueueOrderingPolicy.partitionToLookAt.set(partition);

    // Take snapshots of ParentQueue's PriorityQueueResourcesForSorting
    // 只对已有的队列使用新List进行排序,不考虑新增队列。否则会导致调度线程crash
    List<PriorityQueueResourcesForSorting> sortedQueueResources =
            queues.stream().
                    map(queue -> new PriorityQueueResourcesForSorting(queue)).
                    collect(Collectors.toList());

    // 使用PriorityQueueComparator进行排序
    Collections.sort(sortedQueueResources, new PriorityQueueComparator());
    // Return the reference queue iterator
    return sortedQueueResources.stream().
            map(sorting -> sorting.getQueue()).
            collect(Collectors.toList()).iterator();
  }

当队列数过多时,会降低调度效率。根据上述实践,当队列不超过1000时,可以直接切换到capacityScheduler中。

7. 后续操作

由于集群调度性能下降,第二天直接将调度器回滚到fair-scheduler。为了优化fair-scheduler,后续的规划是:

  1. 使用yarn-federation,将yarn切分成小集群,每个集群的队列数不超过1000,这样可以解决排序性能问题。
  2. 由于yarn-federation需要调研、测试、灰度流程,上线时间会推迟。因此可以先在fair scheduler中上线并发调度功能,减轻调度压力。
举报

相关推荐

0 条评论