切片与 与 MapTask 并行度
1 ) 问题引出
MapTask 的并行度决定 Map 阶段的任务处理并发度,进而影响到整个 Job 的处理速度。
思考:1G 的数据,启动 8 个 MapTask,可以提高集群的并发处理能力。那么 1K 的数
据,也启动 8 个 MapTask,会提高集群性能吗?MapTask 并行任务是否越多越好呢?哪些因
素影响了 MapTask 并行度?
2 )MapTask 并行度决定 机制
数据 块:Block 是 HDFS 物理上把数据分成一块一块。数据块是 HDFS 存储数据单位。
数据 切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行
存储。数据切片是MapReduce程序计算输入数据的单位,一个切片会对应启动一个MapTask。
Job 提交流程源码
waitForCompletion()
submit();
// 1 建立连接
connect();
// 1)创建提交 Job 的代理
new Cluster(getConfiguration());
// (1)判断是本地运行环境还是 yarn 集群运行环境
initialize(jobTrackAddr, conf);
// 2 提交 job
submitter.submitJobInternal(Job.this, cluster)
// 1)创建给集群提交数据的 Stag 路径
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
// 2)获取 jobid ,并创建 Job 路径
JobID jobId = submitClient.getNewJobID();
// 3)拷贝 jar 包到集群
copyAndConfigureFiles(job, submitJobDir);
rUploader.uploadFiles(job, jobSubmitDir);
// 4)计算切片,生成切片规划文件
writeSplits(job, submitJobDir);
maps = writeNewSplits(job, jobSubmitDir);
input.getSplits(job);
// 5)向 Stag 路径写 XML 配置文件
writeConf(conf, submitJobFile);
conf.writeXml(out);
// 6)提交 Job,返回提交状态
status = submitClient.submitJob(jobId, submitJobDir.toString(),job.getCredentials());
解析
FileInputFormat 切片源码
(1)程序先找到你数据存储的目录。
(2)开始遍历处理(规划切片)目录下的每一个文件
(3)遍历第一个文件ss.txt
a)获取文件大小fs.sizeOf(ss.txt)
b)计算切片大小
computeSplitSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M
c)默认情况下,切片大小=blocksize
d)开始切,形成第1个切片:ss.txt—0:128M 第2个切片ss.txt—128:256M 第3个切片ss.txt—256M:300M
(每次切片时,都要判断切完剩下的部分是否大于块的1.1倍,不大于1.1倍就划分一块切片)
e)将切片信息写到一个切片规划文件中
f)整个切片的核心过程在getSplit()方法中完成
g)InputSplit只记录了切片的元数据信息,比如起始位置、长度以及所在的节点列表等。
(4)提交切片规划文件到YARN上,YARN上的MrAppMaster就可以根据切片规划文件计算开启MapTask个数。
FileInputFormat 切片机制
FileInputFormat切片大小的参数配置
TextInputFormat
1 )FileInputFormat 实现类
思考:在运行 MapReduce 程序时,输入的文件格式包括:基于行的日志文件、二进制
格式文件、数据库表等。那么,针对不同的数据类型,MapReduce 是如何读取这些数据的呢?
FileInputFormat 常见的接口实现类包括:TextInputFormat、KeyValueTextInputFormat、
NLineInputFormat、CombineTextInputFormat 和自定义 InputFormat 等。
2 )TextInputFormat
TextInputFormat 是默认的 FileInputFormat 实现类。按行读取每条记录。键是存储该行在整个文件中的起始字节偏移量, LongWritable 类型。值是这行的内容,不包括任何行终止符(换行符和回车符),Text 类型。
CombineTextInputFormat 切片机制
框架默认的 TextInputFormat 切片机制是对任务按文件规划切片,不管文件多小,都会
是一个单独的切片,都会交给一个 MapTask,这样如果有大量小文件,就会产生大量的
MapTask,处理效率极其低下。
1 ) 应用场景:
CombineTextInputFormat 用于小文件过多的场景,它可以将多个小文件从逻辑上规划到
一个切片中,这样,多个小文件就可以交给一个 MapTask 处理。
2 ) 虚拟存储 切片 最大值设置
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
注意:虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值。
3 ) 切片机制
生成切片过程包括:虚拟存储过程和切片过程二部分。
CombineTextInputFormat 案例实操
4个小文件下载
链接:https://pan.baidu.com/s/1QPMxm1WOgCmbVc45K_w7XA
提取码:hjl4
大家可以先去执行以下我另外一篇文章的例子
Windows下搭建环境测试Mapreduce--集群测试_你可以自己看的博客-CSDN博客
只是运行前面的例子(运行以上四个小文件),后面的jar测试不要运行
会发现打印的日志中,将四个文件分成四个切片了,因为默认用的是TextInputFormat
接下来我们在WordCountDriver上添加两行代码
// 如果不设置 InputFormat,它默认用的是 TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);
//虚拟存储切片最大值设置 4m
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
再去启动方法
这个时候发现切片变成三个了
然后我们再把虚拟存储切片最大值设置为20M
//虚拟存储切片最大值设置 20m
CombineTextInputFormat.setMaxInputSplitSize(job, 20971520);
再去执行方法
会发现现在切片已经变成了1个
MapReduce 工作流程
上面两个图的流程是整个MapReduce最全工作流程,但是Shuffle过程只是从第7步开始到第16步结束,具体Shuffle过程详解如下:
1.MapTask收集我们的map()方法输出的kv对,放到内存缓冲区中
2.从缓存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件
3.多个溢出文件会被合并成大的溢出文件
4.在溢出过程及合并的过程中,都要调用Partitioner进行分区和针对key进行排序
5.ReduceTask根据自己的分区号,去哥哥MapTask机器上取相应的结果分区数据
6.ReduceTask会抓取到同一个分区的来自不同MapTask的结果文件,ReduceTask会将这些文件再进行合并(归并排序)
7.合并成大文件后,Shuffle的过程也就结束了,后面进入ReduceTask的逻辑运算过程(从文件中去除一个一个的键值对Group,调用用户自定义的reduce()方法)
注意:
(1)Shuffle 中的缓冲区大小会影响到 MapReduce 程序的执行效率,原则上说,缓冲区
越大,磁盘 io 的次数越少,执行速度就越快。
(2)缓冲区的大小可以通过参数调整,参数:mapreduce.task.io.sort.mb 默认 100M。
Shuffle机制
Map 方法之后,Reduce 方法之前的数据处理过程称之为 Shuffle
Partition 分区
下面进行自定义Partitioner
在这之前,大家可以先去参考我的另外一篇文章,因为要使用到里面的例子
Hadoop的序列化和反序列化_你可以自己看的博客-CSDN博客
1.自定义类继承Partitioner,重写getPartition()方法。分成了四个区
package com.gk.mapreduce;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
* create with IntelliJ IDEA
*
* @Project :hadoop-writable
* @Package :com.gk.mapreduce
* @ClassName :
* @CreateTime :2022/3/1015:27
* @Version :1.0
* @Author :锦林
* @Email :836658031@qq.com
**/
public class CustomPartitioner extends Partitioner<Text, FlowBean> {
public int getPartition(Text text, FlowBean flowBean, int i) {
// 设置分区控制编码 让135开头的为一个区,136开头的为一个区,137开头的为一个去,剩余的为一个区
int partition;
String prePhone = text.toString().substring(0, 3);
if ("135".equals(prePhone)) {
partition = 0;
} else if ("136".equals(prePhone)) {
partition = 1;
} else if ("137".equals(prePhone)) {
partition = 2;
} else {
partition = 3;
}
return partition;
}
}
在Driver里添加如下代码
// 在Job驱动中,设置自定义Partitioner
job.setPartitionerClass(CustomPartitioner.class);
job.setNumReduceTasks(4);
这个时候再去启动方法
然后看生成的文件
可以看到已经根据电话号码的前缀分成了四个文件
大家可以尝试将ReduceTask设置小于想要分区的数量,已经大于分区数量,或等于1的情况
WritableComparable 排序
排序是MapReduce框架中最重要的操作之一。
MapTask和ReduceTask均会对数据按照key进行排序。该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。
默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。
对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。
这里我们还是使用手机号码流量统计作为例子,输入文件是这个例子的输出文件,然后进行再排序
Hadoop的序列化和反序列化_你可以自己看的博客-CSDN博客
这里需要将FlowBean进行一个改造,实现里WritableCompareable接口,重新了compareTo方法
package com.gk.mapreduce.writableComparable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* create with IntelliJ IDEA
*
* @Project :hadoop-writable
* @Package :com.gk.mapreduce
* @ClassName :
* @CreateTime :2022/3/919:38
* @Version :1.0
* @Author :锦林
* @Email :836658031@qq.com
**/
public class FlowBean implements WritableComparable<FlowBean> {
private Long upFlow; // 上行流量
private Long downFlow; // 下行流量
private Long sumFlow; // 总流量
// 提供无参构造
public FlowBean() {
}
public Long getUpFlow() {
return upFlow;
}
public void setUpFlow(Long upFlow) {
this.upFlow = upFlow;
}
public Long getDownFlow() {
return downFlow;
}
public void setDownFlow(Long downFlow) {
this.downFlow = downFlow;
}
public Long getSumFlow() {
return sumFlow;
}
public void setSumFlow(Long sumFlow) {
this.sumFlow = sumFlow;
}
public void setSumFlow() {
this.sumFlow = this.upFlow + this.downFlow;
}
// 实现序列化和反序列化方法,注意顺序一定要保存一致
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeLong(upFlow);
dataOutput.writeLong(downFlow);
dataOutput.writeLong(sumFlow);
}
public void readFields(DataInput dataInput) throws IOException {
this.upFlow = dataInput.readLong();
this.downFlow = dataInput.readLong();
this.sumFlow = dataInput.readLong();
}
// 重写toString方法
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
// 重写compareTo方法
public int compareTo(FlowBean o) {
// 按照总流量比较,倒叙排列
if (this.sumFlow > o.sumFlow) {
return -1;
} else if (this.sumFlow < o.sumFlow) {
return 1;
} else {
return 0;
}
}
}
下面我们改造Mapper类,因为现在是根据总流量进行排序,所以将FlowBean作为key,手机号作为value
package com.gk.mapreduce.writableComparable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* create with IntelliJ IDEA
*
* @Project :hadoop-writable
* @Package :com.gk.mapreduce
* @ClassName :
* @CreateTime :2022/3/919:44
* @Version :1.0
* @Author :锦林
* @Email :836658031@qq.com
**/
public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
private FlowBean outK = new FlowBean();
private Text outV = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1.获取一行数据
String line = value.toString();
// 2.切割数据
String[] split = line.split("\t");
// 封装outK outK
outK.setUpFlow(Long.valueOf(split[1]));
outK.setDownFlow(Long.valueOf(split[2]));
outK.setSumFlow();
outV.set(value);
// 5.写出
context.write(outK, outV);
}
}
接下来改造reducer类,因为mapper输出变了,而手机号相同的已经合并过了
package com.gk.mapreduce.writableComparable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* create with IntelliJ IDEA
*
* @Project :hadoop-writable
* @Package :com.gk.mapreduce
* @ClassName :
* @CreateTime :2022/3/919:51
* @Version :1.0
* @Author :锦林
* @Email :836658031@qq.com
**/
public class FlowReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
private FlowBean outV = new FlowBean();
@Override
protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// 遍历values集合,循环写出,避免总流量相同的情况
for (Text value : values) {
// 调换kv位置,反向写出
context.write(value, key);
}
}
}
改造Driver类,因为Mapper类的输出变了
package com.gk.mapreduce.writableComparable;
import com.gk.mapreduce.partitioner.CustomPartitioner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* create with IntelliJ IDEA
*
* @Project :hadoop-writable
* @Package :com.gk.mapreduce
* @ClassName :
* @CreateTime :2022/3/919:56
* @Version :1.0
* @Author :锦林
* @Email :836658031@qq.com
**/
public class FlowDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1.获取job
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 2.读取jar包
job.setJarByClass(FlowDriver.class);
// 3.连接mapper和reducer
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
// 4.mapper的输出格式
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);
// 5.最终结果的输出格式
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
// 6.输入文件和输出文件路径
FileInputFormat.setInputPaths(job, new Path("E:\\SGG-Hadoop\\wcOutput"));
FileOutputFormat.setOutputPath(job, new Path("E:\\SGG-Hadoop\\wcOutput2"));
// 7.提交job
boolean flag = job.waitForCompletion(true);
System.exit(flag ? 0 : 1);
}
}
执行方法,获得输出文件
现在这个输出文件可以看到,是按照总流量进行倒叙排序
接下来加上上述例子的partitioner自定义分区,记得Mapper输入输出变了
package com.gk.mapreduce.writableComparable;
import com.gk.mapreduce.partitioner.CustomPartitioner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* create with IntelliJ IDEA
*
* @Project :hadoop-writable
* @Package :com.gk.mapreduce
* @ClassName :
* @CreateTime :2022/3/919:56
* @Version :1.0
* @Author :锦林
* @Email :836658031@qq.com
**/
public class FlowDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1.获取job
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 2.读取jar包
job.setJarByClass(FlowDriver.class);
// 3.连接mapper和reducer
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
// 4.mapper的输出格式
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);
// 5.最终结果的输出格式
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//设置自定义分区器
job.setPartitionerClass(CustomPartitioner2.class);
// 设置对应的 ReduceTask 的个数
job.setNumReduceTasks(5);
// 6.输入文件和输出文件路径
FileInputFormat.setInputPaths(job, new Path("E:\\SGG-Hadoop\\wcOutput"));
FileOutputFormat.setOutputPath(job, new Path("E:\\SGG-Hadoop\\wcOutput4"));
// 7.提交job
boolean flag = job.waitForCompletion(true);
System.exit(flag ? 0 : 1);
}
}
package com.gk.mapreduce.writableComparable;
import com.gk.mapreduce.partitioner.CustomPartitioner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* create with IntelliJ IDEA
*
* @Project :hadoop-writable
* @Package :com.gk.mapreduce
* @ClassName :
* @CreateTime :2022/3/919:56
* @Version :1.0
* @Author :锦林
* @Email :836658031@qq.com
**/
public class FlowDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1.获取job
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 2.读取jar包
job.setJarByClass(FlowDriver.class);
// 3.连接mapper和reducer
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
// 4.mapper的输出格式
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);
// 5.最终结果的输出格式
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//设置自定义分区器
job.setPartitionerClass(CustomPartitioner2.class);
// 设置对应的 ReduceTask 的个数
job.setNumReduceTasks(5);
// 6.输入文件和输出文件路径
FileInputFormat.setInputPaths(job, new Path("E:\\SGG-Hadoop\\wcOutput"));
FileOutputFormat.setOutputPath(job, new Path("E:\\SGG-Hadoop\\wcOutput5"));
// 7.提交job
boolean flag = job.waitForCompletion(true);
System.exit(flag ? 0 : 1);
}
}
接下来再执行一遍
可以看到已经生成了5个文件,大家可以点进去看一下,我这边就不赘述了
如果想要再根据上行流量排序,则可以在compareTo方法下加判断,如下
Combiner 合并
(1)Combiner是MR程序中Mapper和Reducer之外的一种组件。
(2)Combiner组件的父类就是Reducer。
(3)Combiner和Reducer的区别在于运行的位置
Combiner是在每一个MapTask所在的节点运行;
(4)Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减小网络传输量。
(5)Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出kv
应该跟Reducer的输入kv类型要对应起来。
Mapper
3 5 7 ->(3+5+7)/3=5
2 6 ->(2+6)/2=4
Reducer
(3+5+7+2+6)/5=23/5 不等于 (5+4)/2=9/2
参考例子(参考上半部分即可,上传部分忽略):
Windows下搭建环境测试Mapreduce--集群测试_你可以自己看的博客-CSDN博客
现在我们写一个combiner类
package com.gk.mapreduce.combiner;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* create with IntelliJ IDEA
*
* @Project :hadoop-writable
* @Package :com.gk.mapreduce.combiner
* @ClassName :
* @CreateTime :2022/3/1020:28
* @Version :1.0
* @Author :锦林
* @Email :836658031@qq.com
**/
public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable value = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
// 遍历读取值
for (IntWritable value : values) {
// 1.累加求和
sum += value.get();
}
// 2.输出
value.set(sum);
context.write(key, value);
}
}
我们先不开启combiner
这是未开启combiner,map阶段输出的数据
接下来开启combiner
大家可以看到输出的字节变小了,是因为重复的单词在map阶段让combiner进行合并了,减少了传输次数
接下来我们改一下输入的combiner,改成reducer
大家可以发现,使用WordCountReducer类也可以起到同样的效果。大家倒回去开一下combiner类和reducer类的代码,会发现是一样的。所以在开发中,并不是一定需要我们去重写combiner的,可以直接使用reducer
OutputFormat 数据
OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了OutputFormat接口。下面我们介绍几种常见的OutputFormat实现类。
下面我们写一个例子
输入文件
链接:https://pan.baidu.com/s/1taUXNBBMxs88-Q3eXJGnKg
提取码:hjl4
现在我们来创建Mapper
package com.gk.mapreduce.logOutPutFormat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* create with IntelliJ IDEA
*
* @Project :hadoop-writable
* @Package :com.gk.mapreduce.logOutPutFormat
* @ClassName :
* @CreateTime :2022/3/1021:02
* @Version :1.0
* @Author :锦林
* @Email :836658031@qq.com
**/
public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 不做处理,直接写出一行log数据
context.write(value, NullWritable.get());
}
}
创建reducer
package com.gk.mapreduce.logOutPutFormat;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* create with IntelliJ IDEA
*
* @Project :hadoop-writable
* @Package :com.gk.mapreduce.logOutPutFormat
* @ClassName :
* @CreateTime :2022/3/1021:03
* @Version :1.0
* @Author :锦林
* @Email :836658031@qq.com
**/
public class LogReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
@Override
protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
// 防止有重复数据,导致数据缺失
for (NullWritable value : values) {
context.write(key, value);
}
}
}
创建自定义outputformat
package com.gk.mapreduce.logOutPutFormat;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* create with IntelliJ IDEA
*
* @Project :hadoop-writable
* @Package :com.gk.mapreduce.logOutPutFormat
* @ClassName :
* @CreateTime :2022/3/1021:06
* @Version :1.0
* @Author :锦林
* @Email :836658031@qq.com
**/
public class LogOutPutFormat extends FileOutputFormat<Text, NullWritable> {
public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
LogRecordWriter logRecordWriter = new LogRecordWriter(taskAttemptContext);
return logRecordWriter;
}
}
创建自定义recordWriter
package com.gk.mapreduce.logOutPutFormat;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException;
/**
* create with IntelliJ IDEA
*
* @Project :hadoop-writable
* @Package :com.gk.mapreduce.logOutPutFormat
* @ClassName :
* @CreateTime :2022/3/1021:07
* @Version :1.0
* @Author :锦林
* @Email :836658031@qq.com
**/
public class LogRecordWriter extends RecordWriter<Text, NullWritable> {
private FSDataOutputStream atguiguOut;
private FSDataOutputStream otherOut;
public LogRecordWriter(TaskAttemptContext taskAttemptContext) {
try {
// 获取文件操作系统
FileSystem fs = FileSystem.get(taskAttemptContext.getConfiguration());
// 用文件系统对象创建连哥哥输出流对应不同的目录
atguiguOut = fs.create(new Path("E:\\SGG-Hadoop\\wcInput\\inputoutputformat\\atguigu.log"));
otherOut = fs.create(new Path("E:\\SGG-Hadoop\\wcInput\\inputoutputformat\\other.log"));
} catch (IOException e) {
e.printStackTrace();
}
}
public void write(Text text, NullWritable nullWritable) throws IOException, InterruptedException {
String log = text.toString();
// 根据一行的log数据是否包含atguigu,判断两条输出流输出的数据
if (log.contains("atguigu")) {
atguiguOut.writeBytes(log + "\n");
} else {
otherOut.writeBytes(log + "\n");
}
}
public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
IOUtils.closeStream(atguiguOut);
IOUtils.closeStream(otherOut);
}
}
创建driver
package com.gk.mapreduce.logOutPutFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* create with IntelliJ IDEA
*
* @Project :hadoop-writable
* @Package :com.gk.mapreduce.logOutPutFormat
* @ClassName :
* @CreateTime :2022/3/1021:14
* @Version :1.0
* @Author :锦林
* @Email :836658031@qq.com
**/
public class LogDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
job.setJarByClass(LogDriver.class);
job.setMapperClass(LogMapper.class);
job.setReducerClass(LogReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
//设置自定义的 outputformat
job.setOutputFormatClass(LogOutPutFormat.class);
FileInputFormat.setInputPaths(job, new Path("E:\\SGG-Hadoop\\wcInput\\inputoutputformat"));
//虽然我们自定义了 outputformat,但是因为我们的 outputformat 继承自fileoutputformat
//而 fileoutputformat 要输出一个_SUCCESS 文件,所以在这还得指定一个输出目录
FileOutputFormat.setOutputPath(job, new Path("E:\\SGG-Hadoop\\wcInput\\inputoutputformat\\logoutput1"));
boolean flag = job.waitForCompletion(true);
System.exit(flag ? 0 : 1);
}
}
现在让我们执行方法
查看生成的文件
可以看到已经将想要的数据单独输出到另外的文件
MapTask 工作机制
1.Readj阶段:MapTask通过InputFormat获得的RecordReader,从输入InputSplit中解析出一个个key/value
2.Map阶段:该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value
3.Collect收集阶段:在用户编写map()函数中。当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中。
4.Spill 阶段:即“溢写”,当环形缓冲区满后,MapReduce 会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。
溢写阶段详情:
步骤 1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号Partition 进行排序,然后按照 key 进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照 key 有序。
步骤 2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文
件 output/spillN.out(N 表示当前溢写次数)中。如果用户设置了 Combiner,则写入文件之
前,对每个分区中的数据进行一次聚集操作。
步骤 3:将分区数据的元信息写到内存索引数据结构 SpillRecord 中,其中每个分区的元
信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大
小超过 1MB,则将内存索引写到文件 output/spillN.out.index 中。
5.Merge 阶段:当所有数据处理完成后,MapTask 对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。
当所有数据处理完后,MapTask 会将所有临时文件合并成一个大文件,并保存到文件output/file.out 中,同时生成相应的索引文件 output/file.out.index。
在进行文件合并过程中,MapTask 以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并 mapreduce.task.io.sort.factor(默认 10)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。
让每个 MapTask 最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。
ReduceTask 工作机制
1. Copy 阶段:ReduceTask 从各个 MapTask 上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
2.Sort 阶段:在远程拷贝数据的同时,ReduceTask 启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。按照 MapReduce 语义,用户编写 reduce()函数输入数据是按 key 进行聚集的一组数据。为了将 key 相同的数据聚在一起,Hadoop 采用了基于排序的策略。由于各个 MapTask 已经实现对自己的处理结果进行了局部排序,因此,ReduceTask 只需对所有数据进行一次归并排序即可。
3.Reduce 阶段:reduce()函数将计算结果写到 HDFS 上。
ReduceTask 并行度决定机制
注意事项
MapTask & ReduceTask 源码解析
Join 应用
Reduce Join
测试的文件下载:
链接:https://pan.baidu.com/s/1KWckJZNOcNwLGEk8tRuEzQ
提取码:hjl4
接下来开始写代码
TableBean
package com.gk.mapreduce.reduceJoin;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* create with IntelliJ IDEA
*
* @Project :hadoop-writable
* @Package :com.gk.mapreduce.reduceJoin
* @ClassName :
* @CreateTime :2022/3/1116:02
* @Version :1.0
* @Author :锦林
* @Email :836658031@qq.com
**/
public class TableBean implements Writable {
private String id; //订单id
private String pid; //产品id
private int amount; //产品数量
private String pname; //产品名称
private String flag; //判断是order表还是pd表的字段
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(id);
dataOutput.writeUTF(pid);
dataOutput.writeInt(amount);
dataOutput.writeUTF(pname);
dataOutput.writeUTF(flag);
}
public void readFields(DataInput dataInput) throws IOException {
this.id = dataInput.readUTF();
this.pid = dataInput.readUTF();
this.amount = dataInput.readInt();
this.pname = dataInput.readUTF();
this.flag = dataInput.readUTF();
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getPid() {
return pid;
}
public void setPid(String pid) {
this.pid = pid;
}
public int getAmount() {
return amount;
}
public void setAmount(int amount) {
this.amount = amount;
}
public String getPname() {
return pname;
}
public void setPname(String pname) {
this.pname = pname;
}
public String getFlag() {
return flag;
}
public void setFlag(String flag) {
this.flag = flag;
}
@Override
public String toString() {
return
id + "\t" + pname + "\t" + amount;
}
}
TableMapper
package com.gk.mapreduce.reduceJoin;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
/**
* create with IntelliJ IDEA
*
* @Project :hadoop-writable
* @Package :com.gk.mapreduce.reduceJoin
* @ClassName :
* @CreateTime :2022/3/1116:06
* @Version :1.0
* @Author :锦林
* @Email :836658031@qq.com
**/
public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {
private String fileName;
private Text outK = new Text();
private TableBean outV = new TableBean();
@Override
protected void setup(Context context) throws IOException, InterruptedException {
// 获取对应文件的名称
InputSplit split = context.getInputSplit();
FileSplit fileSplit = (FileSplit) split;
fileName = fileSplit.getPath().getName();
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 获取一行数据
String line = value.toString();
//判断是哪个文件,然后针对文件进行不同的操作
if (fileName.contains("order")) { //订单表的处理
String[] split = line.split("\t");
//封装 outK
outK.set(split[1]);
//封装 outV
outV.setId(split[0]);
outV.setPid(split[1]);
outV.setAmount(Integer.parseInt(split[2]));
outV.setPname("");
outV.setFlag("order");
} else { //商品表的处理
String[] split = line.split("\t");
//封装 outK
outK.set(split[0]);
//封装 outV
outV.setId("");
outV.setPid(split[0]);
outV.setAmount(0);
outV.setPname(split[1]);
outV.setFlag("pd");
}
// 写出
context.write(outK, outV);
}
}
TableReducer
package com.gk.mapreduce.reduceJoin;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.checkerframework.checker.units.qual.A;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
/**
* create with IntelliJ IDEA
*
* @Project :hadoop-writable
* @Package :com.gk.mapreduce.reduceJoin
* @ClassName :
* @CreateTime :2022/3/1116:15
* @Version :1.0
* @Author :锦林
* @Email :836658031@qq.com
**/
public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {
@Override
protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {
ArrayList<TableBean> orderBeans = new ArrayList<TableBean>();
TableBean pdBean = new TableBean();
for (TableBean value : values) {
//判断数据来自哪个表
if ("order".equals(value.getFlag())) { //订单表
//创建一个临时 TableBean 对象接收 value
TableBean tmpOrderBean = new TableBean();
try {
BeanUtils.copyProperties(tmpOrderBean, value);
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
//将临时 TableBean 对象添加到集合 orderBeans
orderBeans.add(tmpOrderBean);
} else { //商品表
try {
BeanUtils.copyProperties(pdBean, value);
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
}
}
for (TableBean orderBean : orderBeans) {
orderBean.setPname(pdBean.getPname());
//写出修改后的 orderBean 对象
context.write(orderBean, NullWritable.get());
}
}
}
TableDriver
package com.gk.mapreduce.reduceJoin;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* create with IntelliJ IDEA
*
* @Project :hadoop-writable
* @Package :com.gk.mapreduce.reduceJoin
* @ClassName :
* @CreateTime :2022/3/1116:21
* @Version :1.0
* @Author :锦林
* @Email :836658031@qq.com
**/
public class TableDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 获取对象
Job job = Job.getInstance(new Configuration());
// 获取jar
job.setJarByClass(TableDriver.class);
// 绑定Mapper Reducer
job.setMapperClass(TableMapper.class);
job.setReducerClass(TableReducer.class);
// 设置输入输出对象
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(TableBean.class);
job.setOutputKeyClass(TableBean.class);
job.setOutputValueClass(NullWritable.class);
// 输入输出路径
FileInputFormat.setInputPaths(job, new Path("E:\\SGG-Hadoop\\wcInput\\inputtable"));
FileOutputFormat.setOutputPath(job, new Path("E:\\SGG-Hadoop\\output"));
// 提交
boolean flag = job.waitForCompletion(true);
System.exit(flag ? 0 : 1);
}
}
运行方法,查看输出文件
已经将两个文件的数据合并到一起了
Map Join
2) 需求分析
MapJoin 适用于关联表中有小表的情形。
接下来进行代码编写
Driver
package com.gk.mapreduce.mapJoin;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
/**
* create with IntelliJ IDEA
*
* @Project :hadoop-writable
* @Package :com.gk.mapreduce.mapJoin
* @ClassName :
* @CreateTime :2022/3/1117:00
* @Version :1.0
* @Author :锦林
* @Email :836658031@qq.com
**/
public class MapJoinDriver {
public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
// 1 获取 job 信息
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2 设置加载 jar 包路径
job.setJarByClass(MapJoinDriver.class);
// 3 关联 mapper
job.setMapperClass(MapJoinMapper.class);
// 4 设置 Map 输出 KV 类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
// 5 设置最终输出 KV 类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 加载缓存数据
job.addCacheFile(new URI("file:///E:/SGG-Hadoop/wcInput/inputtable/pd.txt"));
// Map 端 Join 的逻辑不需要 Reduce 阶段,设置 reduceTask 数量为 0
job.setNumReduceTasks(0);
// 6 设置输入输出路径
FileInputFormat.setInputPaths(job, new Path("E:\\SGG-Hadoop\\wcInput\\inputtable2"));
FileOutputFormat.setOutputPath(job, new Path("E:\\SGG-Hadoop\\wcOutPut"));
// 7 提交
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
mapper
package com.gk.mapreduce.mapJoin;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
/**
* create with IntelliJ IDEA
*
* @Project :hadoop-writable
* @Package :com.gk.mapreduce.mapJoin
* @ClassName :
* @CreateTime :2022/3/1117:03
* @Version :1.0
* @Author :锦林
* @Email :836658031@qq.com
**/
public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
private HashMap<String, String> pdMap = new HashMap<String, String>();
private Text text = new Text();
@Override
protected void setup(Context context) throws IOException, InterruptedException {
// 通过缓存文件得到小表数据pd.txt
URI[] cacheFiles = context.getCacheFiles();
Path path = new Path(cacheFiles[0]);
// 获取文件系统对象,并开流
FileSystem fs = FileSystem.get(context.getConfiguration());
FSDataInputStream fis = fs.open(path);
// 通过包装流转换为reader,方便按行读取
BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "UTF-8"));
// 逐行读取,按行处理
String line;
while (StringUtils.isNotEmpty(line = reader.readLine())) {
// 切割一行
// 01 小米
String[] split = line.split("\t");
pdMap.put(split[0], split[1]);
}
// 关流
IOUtils.closeStream(reader);
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 读取大表数据
// 1001 01 1
String[] fields = value.toString().split("\t");
// 通过大表每行数据的pid,去pdMap里面取出pname
String pname = pdMap.get(fields[1]);
// 将大表每行数据的pid替换为pname
text.set((fields[0] + "\t" + pname + "\t" + fields[2]));
// 写出
context.write(text, NullWritable.get());
}
}
运行方法,查看生成文件
与reduce的join是一样的结果。
数据清洗(ETL )
测试文件下载:
链接:https://pan.baidu.com/s/1e0mrmjn74DGmB0jFScp8jA
提取码:hjl4
开始编写代码
Mapper
package com.gk.mapreduce.webLog;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* create with IntelliJ IDEA
*
* @Project :hadoop-writable
* @Package :com.gk.mapreduce.webLog
* @ClassName :
* @CreateTime :2022/3/1119:46
* @Version :1.0
* @Author :锦林
* @Email :836658031@qq.com
**/
public class WebLogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1.获取一行数据
String line = value.toString();
// 2.解析日志
boolean result = parseLog(line, context);
// 3.日志不合法退出
if (result) {
return;
}
// 日志合法写出
context.write(value, NullWritable.get());
}
// 封装解析日志的方法
private boolean parseLog(String line, Context context) {
// 1.截取
String[] split = line.split(" ");
// 2.日志长度大于11的合法
if (split.length > 11) {
return true;
} else {
return false;
}
}
}
Driver
package com.gk.mapreduce.webLog;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* create with IntelliJ IDEA
*
* @Project :hadoop-writable
* @Package :com.gk.mapreduce.webLog
* @ClassName :
* @CreateTime :2022/3/1119:49
* @Version :1.0
* @Author :锦林
* @Email :836658031@qq.com
**/
public class WebLogDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 输入输出路径需要根据自己电脑上实际的输入输出路径设置
args = new String[]{"E:\\SGG-Hadoop\\wcInput\\inputlog", "E:\\SGG-Hadoop\\wcOutPut"};
Job job = Job.getInstance(new Configuration());
job.setJarByClass(WebLogDriver.class);
job.setMapperClass(WebLogMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 设置reducerTask的个数为0
job.setNumReduceTasks(0);
// 5 设置输入和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean flag = job.waitForCompletion(true);
System.exit(flag ? 0 : 1);
}
}
执行方法,查看输入前的文件和结果文件
可以发现,文件的条数变小了,已经过滤掉一部分了
MapReduce 开发总结
Hadoop 数据压缩
压缩方式选择
压缩方式选择时重点考虑:压缩/解压缩速度、压缩率(压缩后存储大小)、压缩后是否可以支持切片。
Gzip 压缩
优点:压缩率比较高;
缺点:不支持 Split;压缩/解压速度一般;
Bzip2 压缩
优点:压缩率高;支持 Split;
缺点:压缩/解压速度慢。
Lzo 压缩
优点:压缩/解压速度比较快;支持 Split;
缺点:压缩率一般;想支持切片需要额外创建索引。
Snappy 压缩
优点:压缩和解压缩速度快;
缺点:不支持 Split;压缩率一般;
压缩位置选择
压缩参数配置
压缩测试
Map 输出端采用压缩
在我们之前的例子中找一个简单的——统计单词
在driver中添加如下代码
// 开启 map 端输出压缩
conf.setBoolean("mapreduce.map.output.compress", true);
// 设置 map 端输出压缩方式
conf.setClass("mapreduce.map.output.compress.codec",
BZip2Codec.class,CompressionCodec.class);
执行方法,会发现并没有什么变化,因为我们压缩的是mapper输出内容,会在reduce阶段被解压再输出,所以没变化
Reduce 输出端采用压缩
在driver中添加如下代码
// 设置 reduce 端输出压缩开启
FileOutputFormat.setCompressOutput(job, true);
// 设置压缩的方式
FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
执行方法,查看输出文件,会发现变成了bz2的压缩包
常见错误及解决方案