案例:YARN多资源队列配置和使用
我们的需求是这样的,希望增加2个队列,一个是online队列,一个是offline队列
然后向offline队列中提交一个mapreduce任务
online队列里面运行实时任务
offline队列里面运行离线任务,我们现在学习的mapreduce就属于离线任务
实时任务我们后面会学习,等讲到了再具体分析。
这两个队列其实也是我们公司中最开始分配的队列,不过随着后期集群规模的扩大和业务需求的增加,后期又增加了多个队列。
在这里我们先增加这2个队列,后期再增加多个也是一样的。
具体步骤如下:
修改集群中etc/hadoop目录下的capacity-scheduler.xml配置文件
修改和增加以下参数,针对已有的参数,修改value中的值,针对没有的参数,则直接增加
这里的default是需要保留的,增加online,offline,这三个队列的资源比例为7:1:2
具体的比例需要根据实际的业务需求来,看你们那些类型的任务比较多,对应的队列中资源比例就调高一些,我们现在暂时还没有online任务,所以我就把online队列的资源占比设置的小一些。
先修改bigdata01上的配置
[root@bigdata01 hadoop]# vi capacity-scheduler.xml
<property>
<name>yarn.scheduler.capacity.root.queues</name>
<value>default,online,offline</value>
<description>队列列表,多个队列之间使用逗号分割</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.default.capacity</name>
<value>70</value>
<description>default队列70%</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.online.capacity</name>
<value>10</value>
<description>online队列10%</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.offline.capacity</name>
<value>20</value>
<description>offline队列20%</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.default.maximum-capacity</name>
<value>70</value>
<description>Default队列可使用的资源上限.</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.online.maximum-capacity</name>
<value>10</value>
<description>online队列可使用的资源上限.</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.offline.maximum-capacity</name>
<value>20</value>
<description>offline队列可使用的资源上限.</description>
</property>
修改好以后再同步到另外两个节点上
注:如果是单节点部署的,只要改本机就行了。
[root@bigdata01 hadoop]# scp -rq capacity-scheduler.xml bigdata02:/data/soft/hadoop-3.2.0/etc/hadoop/
[root@bigdata01 hadoop]# scp -rq capacity-scheduler.xml bigdata03:/data/soft/hadoop-3.2.0/etc/hadoop/
然后重启集群才能生效
[root@bigdata01 hadoop-3.2.0]# sbin/stop-all.sh
[root@bigdata01 hadoop-3.2.0]# sbin/start-all.sh
进入yarn的web界面,查看最新的调度器队列信息
注意了,现在默认提交的任务还是会进入default的队列,如果希望向offline队列提交任务的话,需要指定队列名称,不指定就进默认的队列
在这里我们还需要同步微调一下代码,否则我们指定的队列信息 代码是无法识别的
拷贝WordCountJob类,新的类名为WordCountJobQueue
主要在job配置中增加一行代码
package com.imooc.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* 指定队列名称
*
* Created by xuwei
*/
public class WordCountJobQueue {
/**
* Map阶段
*/
public static class MyMapper extends Mapper<LongWritable, Text,Text,LongWritable>{
Logger logger = LoggerFactory.getLogger(MyMapper.class);
/**
* 需要实现map函数
* 这个map函数就是可以接收<k1,v1>,产生<k2,v2>
* @param k1
* @param v1
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable k1, Text v1, Context context)
throws IOException, InterruptedException {
//输出k1,v1的值
//System.out.println("<k1,v1>=<"+k1.get()+","+v1.toString()+">");
//logger.info("<k1,v1>=<"+k1.get()+","+v1.toString()+">");
//k1 代表的是每一行数据的行首偏移量,v1代表的是每一行内容
//对获取到的每一行数据进行切割,把单词切割出来
String[] words = v1.toString().split(" ");
//迭代切割出来的单词数据
for (String word : words) {
//把迭代出来的单词封装成<k2,v2>的形式
Text k2 = new Text(word);
LongWritable v2 = new LongWritable(1L);
//把<k2,v2>写出去
context.write(k2,v2);
}
}
}
/**
* Reduce阶段
*/
public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
Logger logger = LoggerFactory.getLogger(MyReducer.class);
/**
* 针对<k2,{v2...}>的数据进行累加求和,并且最终把数据转化为k3,v3写出去
* @param k2
* @param v2s
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)
throws IOException, InterruptedException {
//创建一个sum变量,保存v2s的和
long sum = 0L;
//对v2s中的数据进行累加求和
for(LongWritable v2: v2s){
//输出k2,v2的值
//System.out.println("<k2,v2>=<"+k2.toString()+","+v2.get()+">");
//logger.info("<k2,v2>=<"+k2.toString()+","+v2.get()+">");
sum += v2.get();
}
//组装k3,v3
Text k3 = k2;
LongWritable v3 = new LongWritable(sum);
//输出k3,v3的值
//System.out.println("<k3,v3>=<"+k3.toString()+","+v3.get()+">");
//logger.info("<k3,v3>=<"+k3.toString()+","+v3.get()+">");
// 把结果写出去
context.write(k3,v3);
}
}
/**
* 组装Job=Map+Reduce
*/
public static void main(String[] args) {
try{
//指定Job需要的配置参数
Configuration conf = new Configuration();
//解析命令行中-D后面传递过来的参数,添加到conf中
String[] remainingArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
//创建一个Job
Job job = Job.getInstance(conf);
//注意了:这一行必须设置,否则在集群中执行的时候是找不到WordCountJob这个类的
job.setJarByClass(WordCountJobQueue.class);
//指定输入路径(可以是文件,也可以是目录)
FileInputFormat.setInputPaths(job,new Path(remainingArgs[0]));
//指定输出路径(只能指定一个不存在的目录)
FileOutputFormat.setOutputPath(job,new Path(remainingArgs[1]));
//指定map相关的代码
job.setMapperClass(MyMapper.class);
//指定k2的类型
job.setMapOutputKeyClass(Text.class);
//指定v2的类型
job.setMapOutputValueClass(LongWritable.class);
//指定reduce相关的代码
job.setReducerClass(MyReducer.class);
//指定k3的类型
job.setOutputKeyClass(Text.class);
//指定v3的类型
job.setOutputValueClass(LongWritable.class);
//提交job
job.waitForCompletion(true);
}catch(Exception e){
e.printStackTrace();
}
}
}
重新编译打包,上传到服务器上面
执行任务
[root@bigdata01 hadoop-3.2.0]# hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.mr.WordCountJobQueue -Dmapreduce.job.queuename=offline /test/hello.txt /outqueue
到yarn中查看任务的信息,这就说明这个任务是在offline队列中执行的
如果我们去掉指定队列名称的配置,此时还会使用default队列
[root@bigdata01 hadoop-3.2.0]# hdfs dfs -rm -r /outqueue
[root@bigdata01 hadoop-3.2.0]# hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.mr.WordCountJobQueue /test/hello.txt /outqueue
到yarn中查看任务的信息,显示是在default队列中执行
这就是YARN中调度器多资源队列的配置,在工作中我们只要掌握如何使用这些队列就可以了,具体如何配置是我们向运维同学提需求,他们去配置。