0
点赞
收藏
分享

微信扫一扫

Hadoop的MapReduce框架原理

和谐幸福的人生 2022-05-03 阅读 77

在上一篇文章中http://t.csdn.cn/m8a2D,对MapReduce框架的使用做了简要介绍,本文对框架的更多细节进行记录。

如下所示为Map Reduce框架的任务执行流程,输入Input在经过InputFormat处理之后交由Mapper进行切分,之后根据输出的key进行shuffle操作,之后将键值对交由Reducer进行汇集,输出的键值对经OutputFormat处理之后转化为想要的输出。

因此在使用MapReduce框架时,主要从InputFormat、Mapper、分区、排序、Combiner、Reducer、OutputFormat等几个方面来考虑程序的执行逻辑,本文也将重点从这几个方面对框架进行介绍。
在这里插入图片描述

1 数据切片

数据块:Block是HDFS物理上把数据分成一块一块。数据块是HDFS存储数据单位。

数据切片:在Mapper中会对任务进行切分从而提高处理的并行度,进而加快整体执行速度。为了对任务进行划分,Mapper会在逻辑上将整体数据切分为多个切片,并且为每个切片对应启动一个MapTask执行。

由于HDFS系统中的数据被切分为不同的数据块并存储在不同的节点上,因此为了方便数据的读取,数据切片大小和数据块大小是一致的。

如下所示为提交任务的调试过程中的关键代码

/*----------WordCountDriver-----------*/
waitForCompletion()

/*----------Job.java-----------*/
submit();

// 1建立连接
	connect();	
		// 1)创建提交Job的代理
		new Cluster(getConfiguration());
			// (1)判断是本地运行环境还是yarn集群运行环境
			initialize(jobTrackAddr, conf); 

// 2 提交job
submitter.submitJobInternal(Job.this, cluster)

/*----------JobSubmitter.java-----------*/
	// 1)创建给集群提交数据的Stag路径
	Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);

	// 2)获取jobid ,并创建Job路径
	JobID jobId = submitClient.getNewJobID();

	// 3)拷贝jar包到集群
copyAndConfigureFiles(job, submitJobDir);	
	rUploader.uploadFiles(job, jobSubmitDir);

	// 4)计算切片,生成切片规划文件
writeSplits(job, submitJobDir);
		maps = writeNewSplits(job, jobSubmitDir);
		input.getSplits(job);
		JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array);

	// 5)向Stag路径写XML配置文件
writeConf(conf, submitJobFile);
	conf.writeXml(out);

	// 6)提交Job,返回提交状态
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

在上面input.getSplits()方法中完成对输入文件的切片规划,并且通过createSplitFiles()方法将切片信息暂时写入hadoop/mapred/staging文件夹下。切片的数据类型InputFormat有对文件的切分FileInputFormat、数据块DbInputFormat等对多种数据源进行处理。进一步,文件切分又包含按行切分TextInputFormat、多个小文件合并切分CombineFileInputFormat等。

TextInputFormat是默认的FileInputFormat实现类。按行读取每条记录。键是存储该行在整个文件中的起始字节偏移量, LongWritable类型。值是这行的内容,不包括任何行终止符(换行符和回车符),Text类型。

如下所示为FileInputFormat类中对输入文件进行切片的关键代码

//切片最小值,参数如果调的比blockSize大,则可以让切片变得比blockSize还大
long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job));
//切片最大值,参数如果调得比blockSize小,则会让切片变小
long maxSize = getMaxSplitSize(job);

file = (FileStatus)var10.next();		//使用迭代器对文件夹中的文件进行遍历

long blockSize = file.getBlockSize();
//综合计算切片大小,不仅与blockSize有关,还要考虑上面minSize和maxSize的设置
long splitSize = this.computeSplitSize(blockSize, minSize, maxSize);		

//循环对文件进行切分,如果剩余文件/切分大小大于1.1才进行切分
for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1; bytesRemaining -= splitSize) {
	blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
	splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));
}

CombineTextInputFormat

在TextInputFormat进行切片时,不管文件多小,都会是一个单独的切片并生成对应MapTask,这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下。这时可以使用CombineTextInputFormat将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask处理。

如下所示,对小文件的大小进行设置

job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m

首先对小文件在逻辑上划分为虚拟块,如果不大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍,那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值2倍,此时将文件均分成2个虚拟存储块(防止出现太小切片)。例如setMaxInputSplitSize值为4M,输入文件大小为8.02M,则先逻辑上分成一个4M。剩余的大小为4.02M,如果按照4M逻辑划分,就会出现0.02M的小的虚拟存储文件,所以将剩余的4.02M文件切分成(2.01M和2.01M)两个文件。

之后对虚拟块进行合并切片。首先判断虚拟存储的文件大小是否大于setMaxInputSplitSize值,大于等于则单独形成一个切片;如果不大于则跟下一个虚拟块合并形成一个切片。

2 MapReduce数据流动

如下所示为Map Reduce框架中数据处理流动示意图

在这里插入图片描述

  1. 输入待处理文件
  2. 进行切片分析,获取待处理数据的信息,根据配置形成任务规划
  3. 将执行任务的相关文件从客户端提交到集群
  4. 集群首先启动MRAppMaster,根据切片信息开启相应数量的MapTask
  5. Read阶段:MapTask通过RecorederReader读取输入文件,并通过InputFormat转化为值键对
  6. Map阶段:用户在Mapper中可以按键值对执行自定义的Map操作
  7. Collect阶段:将键值对写入到内存中的环形缓冲区,其中包含键值对数据和对应的元数据索引Meta(数据分区、起始位置等信息)。
  8. Spill阶段:利用快速排序算法对缓存区内的数据进行排序。先按照分区编号Partition,后按照key进行排序。这样数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序。排序的时候并不会直接移动数据,而是通过数据索引进行的。
  9. Spill阶段:缓冲区在写到80%之后就会开始反向写入,同时将内存中排序好的数据按照分区溢写到磁盘上工作目录下output/spillN.out中(N表示当前溢写次数)。分区数据的元信息保存在内存索引数据结构SpillRecord中,如果超过1MB,会写到文件output/spillN.out.index
  10. Spill阶段:可以使用Combiner对key值相同的数据提前进行合并,由于当前数据已经有序,相同key的数据放在一起,只需对相邻数据进行比较合并即可
  11. Merge阶段:当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件
  12. 由MRAppMaster启动相应数量的ReduceTask,并告知ReduceTask处理数据范围(数据分区)
  13. Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据到内存,如果其大小超过一定阈值,则写到磁盘上
  14. Sort阶段:在拷贝同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,并对相同键的数据进行分组合并GroupingComparator(k,knext)
  15. Reduce阶段:通过Reducer读取一组键值对数据,执行用户自定义的Reduce操作
  16. 通过OutputFormat输出结果

Shuffle

其中Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle,其详细的数据处理过程如下所示
在这里插入图片描述
(1)MapTask收集map()方法输出的<key, value>对,放到内存缓冲区中
(2)缓冲区数据写到80%会发生溢写,将内存中的数据写入到磁盘,可能会溢出多个文件
(3)多个溢出文件会被合并成大的溢出文件
(4)在溢出过程及合并的过程中,都要调用Partitioner进行分区以及针对key进行排序
(5)ReduceTask根据自己的分区号,去各个MapTask机器上获取相应的分区数据
(6)ReduceTask从不同MapTask将属于同一分区的数据汇总到一起,并通过归并排序进行合并
(7)合并成大文件后,Shuffle的过程结束,后面进入ReduceTask遍历每个键值对调用用户自定义的reduce()方法完成业务操作

Partition分区

在Reducer将文件进行输出时可以按照key对数据进行分区,从而输出到不同的文件,默认的分区方法是根据key的hashCode对ReduceTasks个数取模。

public class HashPartitioner<K, V> extends Partitioner<K, V> {
  public int getPartition(K key, V value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}

用户自定义的分区类需要继承Partitioner类,重写其中的getPartition()方法来控制分区过程。如下所示为按照key值手机号开头三位数字不同返回不同的分区号,注意分区号从0开始逐一累加

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
    @Override
    public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
        //获取手机号前三位prePhone
        String phone = text.toString();
        String prePhone = phone.substring(0, 3);
        //定义一个分区号变量partition,根据prePhone设置分区号
        int partition;

        if("136".equals(prePhone)){
            partition = 0;
        }else if("137".equals(prePhone)){
            partition = 1;
        }else if("138".equals(prePhone)){
            partition = 2;
        }else if("139".equals(prePhone)){
            partition = 3;
        }else {
            partition = 4;
        }

        //最后返回分区号partition
        return partition;
    }
}

ReduceTask的并行度同样影响整个Job的执行并发度和执行效率,但与MapTask的并发数由切片数决定不同,ReduceTask数量的决定是可以直接手动设置。如果设为0代表没有Reduce,直接输出Map结果;ReduceTask默认为1,输出一个结果文件;一般情况下ReduceTask数量要和分区数一致,如果ReduceTask过多会产生空白的输出文件part-r-000xx,如果过少会导致分区数据无法处理抛出异常

//在Job驱动中,设置自定义Partitioner 
job.setPartitionerClass(CustomPartitioner.class);
//根据自定义Partitioner的逻辑设置相应数量的ReduceTask
job.setNumReduceTasks(5);

WritableComparable排序

在Map Reduce两个过程中都需要根据key值对数据进行排序。对于MapTask,在环形缓冲区进行溢写到磁盘时会进行快速排序,处理完后还会对磁盘上所有文件进行归并排序;对于ReduceTask,会从每个MapTask拷贝相应的数据文件,最后统一进行归并排序。

由于需要按照key值对数据进行排序,因此键值对中的key必须是可以比较的,除了基本数据类型之外,当我们使用自定义的数据类型作为key时,就需要实现WritableComparable接口来进行比较。

例如对手机流量的统计结果按照总流量从大到小进行排序如下所示

13509468723	7335	110349	117684
13736230513	2481	24681	27162
13956435636	132		1512	1644
13846544121	264		0		264

这时候就需要使用手机流量FlowBean作为key进行比较和排序,因此实现WritableComparable接口并实现compareTo()方法用于比较流量

import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements WritableComparable<FlowBean> {

    private long upFlow; //上行流量
    private long downFlow; //下行流量
    private long sumFlow; //总流量

		//构造函数、getter、setter、序列化反序列化方法、toString方法和原来一样

    @Override
    public int compareTo(FlowBean o) {
        //按照总流量比较,倒序排列
        if(this.sumFlow > o.sumFlow){
            return -1;
        }else if(this.sumFlow < o.sumFlow){
            return 1;
        }else {
            return 0;
        }
    }
}

由于这里使用<FlowBean, Text>作为Mapper的输出,所以需要修改Mapper和Reducer相关泛型类,并在Driver类中设置相应的键值对类型,此外,在Reducer进行输出时,为了避免相同流量的手机号进行合并,所以需要再将手机号作为key,流量作为value

/*--------FlowReducer.java-----------*/
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class FlowReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        //遍历values集合,循环写出,避免总流量相同的情况
        for (Text value : values) {
            //调换KV位置,反向写出
            context.write(value,key);
        }
    }
}

/*--------FlowDriver.java-----------*/
public class FlowDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    		...
    		
        //4 设置Map端输出数据的KV类型
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
				...
    }
}

Combiner

Combiner是Mapper和Reducer之间用于对MapTask输出进行局部汇总以减少网络传输量的组件。

例如在WordCount进行字符统计的时候,对于相同的单词就可以在Combiner中先进行一次合并,从而减少向Reducer传输的数据量。但是如果遇到求平均值的情景,在Combiner求均值后传输就会导致丢失原来数据而计算错误。

Combiner作为Reducer的子类,其实现过程和Reducer类似,如下所示使用WordCountCombiner对单词数量进行聚合

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
	private IntWritable outV = new IntWritable();
	
	@Override
	protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }

        //封装outKV
        outV.set(sum);
        //写出outKV
        context.write(key,outV);
    }
}

之后将WordcountReducer作为Combiner在WordcountDriver驱动类中指定

job.setCombinerClass(WordCountReducer.class);

可以看到Combiner对输出进行了聚合
在这里插入图片描述

OutputFormat

MapReduce通过OutputFormat对结果进行输出,他有多种实现类,不仅可以输出到文件,还可以写到MySQL、HBase等数据库。其默认输出格式为TextOutputFormat,将结果输出到文件。但是有时候我们需要自定义输出结果,这时候就需要自定义输出类。

如下所示,实现自定义输出将结果按照key值的不同分别输出到不同文件当中。首先自定义LogOutputFormat继承FileOutputFormat,该类主要用于返回自定义的文件写入对象LogRecordWriter

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class LogOutputFormat extends FileOutputFormat<Text, NullWritable> {
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        //创建一个自定义的RecordWriter返回
        LogRecordWriter logRecordWriter = new LogRecordWriter(job);
        return logRecordWriter;
    }
}

接下来实现自定义的文件写入类LogRecordWriter,在该类的构造方法中打开文件流,并在write()方法中根据key中的关键字不同分别将结果写入不同文件流,最后在close()方法中关闭文件流。

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

public class LogRecordWriter extends RecordWriter<Text, NullWritable> {
    private FSDataOutputStream warnLog;
    private FSDataOutputStream infoLog;

    public LogRecordWriter(TaskAttemptContext job) {
        try {
            //获取文件系统对象
            FileSystem fs = FileSystem.get(job.getConfiguration());
            //用文件系统对象创建两个输出流对应不同的目录
            warnLog= fs.create(new Path("d:/hadoop/info.log"));
            infoLog= fs.create(new Path("d:/hadoop/warn.log"));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        String log = key.toString();
        //根据一行的log数据是否包含warn,判断两条输出流输出的内容
        if (log.contains("warn")) {
            warnLog.writeBytes(log + "\n");
        } else {
            infoLog.writeBytes(log + "\n");
        }
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        //关闭文件流
        IOUtils.closeStream(warnLog);
        IOUtils.closeStream(infoLog);
    }
}

最后需要在驱动类中设置文件输出类。需要注意的是虽然已经在LogRecordWriter中定义了输出文件位置,但是fileoutputformat要输出一个_SUCCESS文件,所以还得通过setOutputPath()指定一个输出目录

//设置自定义的outputformat
job.setOutputFormatClass(LogOutputFormat.class);

FileInputFormat.setInputPaths(job, new Path("D:\\input"));
FileOutputFormat.setOutputPath(job, new Path("D:\\logoutput"));

3 数据压缩

在Map和Reduce之间需要数据传递,当数据量很大时通过压缩可以减少传输时间,对于IO密集型的任务使用压缩可以减少运行时间,但是对于运算密集型的任务,压缩和解压会占用大量时间反而导致变慢。另一方面,将数据压缩后保存也可以减少磁盘存储空间的占用。

3.1 压缩算法

常用的压缩算法如下所示

压缩格式Hadoop自带算法文件扩展名是否可切片是否需要修改原程序
DEFLATE是,直接使用DEFLATE.deflate和文本处理一样,不需要修改
Gzip是,直接使用DEFLATE.gz和文本处理一样,不需要修改
bzip2是,直接使用bzip2.bz2和文本处理一样,不需要修改
LZO否,需要安装LZO.lzo需要建索引,还需要指定输入格式
Snappy是,直接使用Snappy.snappy和文本处理一样,不需要修改

在选择压缩方式时需要考虑:压缩/解压缩速度、压缩率(压缩后存储大小)、压缩后是否可以支持切片

  • Gzip压缩优点:压缩率比较高; 缺点:不支持Split;压缩/解压速度一般;
  • Bzip2压缩优点:压缩率高;支持Split; 缺点:压缩/解压速度慢
  • Lzo压缩优点:压缩/解压速度比较快;支持Split;缺点:压缩率一般;想支持切片需要额外创建索引。
  • Snappy压缩优点:压缩和解压缩速度快; 缺点:不支持Split;压缩率一般;

在MapReduce中有三个位置需要用到数据压缩和解压

  1. 数据输入时,无须显示指定使用的编解码方式。Hadoop会自动检查文件扩展名,如果扩展名能够匹配,就会用恰当的编解码方式对文件进行压缩和解压。这是需要考虑数据量的大小,如果小于数据块块,就是用压缩速度比较快的LZO/Snappy;否则使用支持切片的Bzip2和LZO
  2. 在Map和Reduce之间传输时,为了减少网络IO,使用压缩和解压缩快的LZO、Snappy。
  3. Output数据输出时,如果数据永久保存,考虑压缩率比较高的Bzip2和Gzip。

3.2 使用设置

Hadoop中提供的压缩算法对应的编码解码器如下

压缩格式对应的编码/解码器
DEFLATEorg.apache.hadoop.io.compress.DefaultCodec
gziporg.apache.hadoop.io.compress.GzipCodec
bzip2org.apache.hadoop.io.compress.BZip2Codec
LZOcom.hadoop.compression.lzo.LzopCodec
Snappyorg.apache.hadoop.io.compress.SnappyCodec

首先可以使用配置文件的方式指定压缩方式,如下所示,在mapred-site.xml文件中对mapper和Reducer输出的压缩方式进行设置

<property>
	<name>mapreduce.map.output.compress</name>
	<value>true</value>
	<description>开启mapper输出压缩</description>
</property>
<property>
	<name>mapreduce.map.output.compress.codec</name>
	<value>org.apache.hadoop.io.compress.GzipCodec</value>
	<description>指定mapper压缩方式</description>
</property>

<property>
	<name>mapreduce.output.fileoutputformat.compress</name>
	<value>true</value>
	<description>开启reducer输出压缩</description>
</property>
<property>
	<name>mapreduce.output.fileoutputformat.compress.codec</name>
	<value>org.apache.hadoop.io.compress.BZip2Codec</value>
	<description>指定reducer输出压缩方式</description>
</property>

或者在驱动类中通过代码的方式设置压缩

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;	
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {

	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

		Configuration conf = new Configuration();

		// 开启map端输出压缩
		conf.setBoolean("mapreduce.map.output.compress", true);
		// 设置map端输出压缩方式
		conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class,CompressionCodec.class);

		// 设置reduce端输出压缩开启
		FileOutputFormat.setCompressOutput(job, true);
		// 设置压缩的方式
	  FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class); 
	}
}
举报

相关推荐

0 条评论