0
点赞
收藏
分享

微信扫一扫

InputFormat&OutputFormat


本文的主要目的是从源码级别讲解Hadoop中InputFormat和OutputFormat部分,首先简介InputFormat和OutputFormat,然后介绍两个重要的组件,RecordWriter和RecordReader,再以FileInputFormat和FileOutputFormat为例,介绍一组InputFormat和OutputFormat的实现细节,最后以SqoopInputFormat和SqoopOutputFormat为例,体会一下InputFormat和OutputFormat的灵活应用。

InputFormat和OutputFormat简介

InputFormat规定了MR框架,如何解析输入文件,比如说TextOutputFormat内部实现了一个LineRecordReader,从LineRecordReader#nextKeyValue方法可以看出,它的key是将要读的这一行数据的起始字节位置,value是这一行的内容。

OutputFormat规定了MR框架,最后输出的文件的内容格式,比如说SequenceFileOutputFormat,其getRecordWriter方法返回一个RecordWriter的内部类,具体的操作在SequenceFile类里write相关的方法中。SequenceFile的具体实现只从类来看比价复杂,其一条记录可以简单理解成如下的格式[record length][key length][key][value]。

RecordReader和RecordWriter简介

上边提到了RecordReader和RecordWriter,这里简单介绍一下,RecordWriter和RecordReader在MR框架每一个输入输出的地方用到,读写操作调用的都是RecordReader和RecordWriter提供的接口,比如说我们在Mapper#map和Reducer#reduce的结束通常都会写这样一行代码context.write(key,value),这行代码实际调用的就是RecordWriter#write方法,RecordReader是MR框架调用的。

一个InputFormat对应一个RecordReader,一个OutputFormat对应一个RecordWriter。这样在后边的DIY的时候,也知道应该写些什么了。

FileInputFormat和FileOutputFormat

这里以FileInputFormat,FileOutputFormat和其对应的子类TextInputFormat和TextOutputFormat为例,分析一套InputFormat和OutputFormat的具体实现。

1.      InputFormat

InputFormat抽象类中有两个方法:

InputFormat#getSplits(JobContext context)

对输入文件进行逻辑上的分片

InputFormat#createRecordReader(InputSplitsplit,TaskAttemptContext context)

返回一个RecordReader对象,该对象可将输入的InputSplit解析成若干个key/value对。

2.      FileInputFormat

所有需要处理文件的InputFormat的基类,从其实现可以看出,FileInputFormat主要是实现InputFormat#getSplit(JobContext context)方法,由于每个子类对应不同的输入格式,所以解析InputSplit的方法InputFormat#createRecordReader(InputSplit split,TaskAttemptContextcontext)由各个子类自己实现。

这里我们先分析FileInputFormat#getSplit(JobContext context)方法。

/**
* Generate the list of files and make them into FileSplits.
* @param job the job context
* @throws IOException
*/
public List<InputSplit> getSplits(JobContext job) throwsIOException {
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);

// generatesplits
List<InputSplit> splits = new ArrayList<InputSplit>();
List<FileStatus> files = listStatus(job);//获取输入的所有文件
for (FileStatus file: files) {//一次循环处理一个文件
Path path = file.getPath();
long length = file.getLen();
if (length != 0) {//文件不是空的
FileSystem fs =path.getFileSystem(job.getConfiguration());
BlockLocation[] blkLocations =fs.getFileBlockLocations(file, 0, length);//获取一个文件的所有的Block的带位置的信息
if (isSplitable(job, path)) {
long blockSize = file.getBlockSize();
long splitSize =computeSplitSize(blockSize, minSize, maxSize);//计算一个FileSplit的大小,计算过程如下,Math.max(minSize,Math.min(maxSize, blockSize))

long bytesRemaining =length;
//这个while循环就是根据上边的准备的信息,不停的读文件,产生FileSplit,一直到文件末尾。
while (((double)bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex =getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path,length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts()));
bytesRemaining -= splitSize;
}
//处理最后不足splitSize大小的数据
if (bytesRemaining != 0) {
int blkIndex =getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path,length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts()));
}
} else { // not splitable不能分割的话,文件只有一块blkLocations[0]就是这一块
splits.add(makeSplit(path, 0, length,blkLocations[0].getHosts()));
}
} else {
//Create empty hosts array for zerolength files
splits.add(makeSplit(path, 0, length,new String[0]));
}
}
// Save the number of input files for metrics/loadgen
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
LOG.debug("Total # of splits: " + splits.size());
return splits;
}

3.      TextInputFormat

TextInputFormat直接使用了FileInputFormat的getSplits方法,自己实现了createRecordReader方法。

TextInputFormat#createRecordReader方法逻辑比较简单,最后返回一个LineRecordReader对象,下面主要看这个方法LineRecordReader#nextKeyValue。

public boolean nextKeyValue() throws IOException {
if (key == null) {
key = newLongWritable();
}
key.set(pos);
if (value == null) {
value = new Text();
}
int newSize = 0;
// We always read oneextra line, which lies outside the upper
// split limit i.e. (end -1)
while (getFilePosition() <= end) {
//这一句就是读数据的地方了
newSize =in.readLine(value, maxLineLength,
Math.max(maxBytesToConsume(pos), maxLineLength));
if (newSize == 0) {
break;
}
pos += newSize;
if (newSize <maxLineLength) {
break;
}

// line too long. tryagain
LOG.info("Skippedline of size " + newSize + " at pos " +
(pos -newSize));
}
if (newSize == 0) {
key = null;
value = null;
return false;
} else {
return true;
}
}

4.      OutputFormat

OutputFormat# getRecordWriter (TaskAttemptContext context)

返回一个RecordWriter对象

OutputFormat# checkOutputSpecs (JobContext context)

判断输出文件存不存在,写过MR程序的人应该很熟悉了,如果输出路径已经存在话,会抛出一个Output directory " + outDir + " already exists"错误,就是这个方法抛出的。

OutputFormat# getOutputCommitter (TaskAttemptContext context)

MR运行过程中,会产生很多中间文件,比如Mapper的输出,推测式执行Task时产生的文件等等,这个方法负责在任务执行完成后,处理这些中间文件。顺便说下,OutputCommitter对象里的方法都是回调方法,MR自动调用。

5.      FileOutputFormat

FileOutputFormat对上边提到的3个方法中的后两个提供了通用的实现,OutputFormat# getRecordWriter (TaskAttemptContext context)方法需要子类自己实现。

public void checkOutputSpecs(JobContext job
) throws FileAlreadyExistsException, IOException{
// Ensure that the outputdirectory is set and not already there
Path outDir =getOutputPath(job);
if (outDir == null) {
throw newInvalidJobConfException("Output directory not set.");
}

// get delegation tokenfor outDir's file system
TokenCache.obtainTokensForNamenodes(job.getCredentials(),
new Path[] { outDir },job.getConfiguration());

if(outDir.getFileSystem(job.getConfiguration()).exists(outDir)) {
throw newFileAlreadyExistsException("Output directory " + outDir +
" already exists");
}
}

public synchronized
OutputCommittergetOutputCommitter(TaskAttemptContext context
)throws IOException {
if (committer == null) {
Path output =getOutputPath(context);
committer = newFileOutputCommitter(output, context);
}
return committer;
}

6.      TextOutputFormat

TextOutputFormat直接使用了FileInputFormat的getSplits方法,自己实现了createRecordReader方法。

TextOutputFormat#createRecordWriter方法逻辑比较简单,最后返回一个LineRecordWriter对象,下面主要看这个方法LineRecordReader#write(K key,V Value)。

public synchronized void write(K key, V value)
throws IOException {

boolean nullKey = key ==null || key instanceof NullWritable;
boolean nullValue =value == null || value instanceof NullWritable;
if (nullKey &&nullValue) {
return;
}
//这个地方是最主要的逻辑
if (!nullKey) {
writeObject(key);//写出key
}
if (!(nullKey ||nullValue)) {
out.write(keyValueSeparator);//写出key/value的分隔符
}
if (!nullValue) {
writeObject(value);//写出value
}
out.write(newline);//写出行结束符
}

SqoopInputFormat和SqoopOutputFormat

下面的分析基于Sqoop1.99.3

在Sqoop中,InputFormat和OutputFormat的子类有3个,分别是:

public class SqoopInputFormat extendsInputFormat<SqoopSplit, NullWritable>
public class SqoopFileOutputFormat extendsFileOutputFormat<Data, NullWritable>
public class SqoopNullOutputFormat extendsOutputFormat<Data, NullWritable>

下面一个个分析:

1.      SqoopInputFormat

从泛型传入的类型可以看出,SqoopInputFormat的key是SqoopSplit,value是NullWritable。直接把整个InputSplit作为key,传到Mapper中,SqoopInputFormat不对InputSplit做任何的解析操作。

2.      SqoopFileOutputFormat

从泛型传入的类型来看,跟SqoopInputFormat类似,只是用KEY部分保存信息。SqoopFileOutputFormat只重写了父类FileOutputFormat的getRecordWriter方法和getOutputCommitter方法,checkOutputSpec方法使用的父类的。

getRecordWriter方法调用SqoopOutputFormatLoadExecutor#getRecordWriter,这个方法在返回一个SqoopRecordWriter的同时,开启一个消费者线程,SqoopRecordWriter是生产者线程。[后面接着分析Sqoop源码时细说]

3.      SqoopNullOutputFormat

SqoopNullOutputFormat将OutputFormat的3个方法都重写了。SqoopNullOutputFormat#getRecordWriter方法同样是调用SqoopOutputFormatLoadExecutor#getRecordWriter。

举报

相关推荐

0 条评论