0
点赞
收藏
分享

微信扫一扫

MapReduce之RecordReader组件源码解析及实例


简述

无论以怎样的方式从分片(InputSplit)中读取数据,每一条记录都是通过RecordReader读取出来的;

系统默认的InputFormat是TextInputFormat,它所使用的RecordReader是LineRecordReader;

LineRecordReader是用每行的偏移量作为map的key,每行的内容作为map的value;

而SequenceFileInputFormat的RecordReader是SequenceFileRecordReader;

应用场景:自定义读取每一条记录的方式;自定义读入key的类型,如希望读取的key是文件的路径或名字而不是该行在文件中的偏移量。

TextInputFormat源码如下:

package org.apache.hadoop.mapreduce.lib.input;
/** An {@link InputFormat} for plain text files. Files are broken into lines.
* Either linefeed or carriage-return are used to signal end of line. Keys are
* the position in the file, and values are the line of text.. */
public class TextInputFormat extends FileInputFormat<LongWritable, Text> {

// Creates the reader for one split. If the job configuration sets
// "textinputformat.record.delimiter", that string (encoded as UTF-8) is handed
// to LineRecordReader as the record delimiter; otherwise null is passed and
// LineRecordReader uses its default line endings (CR, LF or CRLF).
@Override
public RecordReader<LongWritable, Text>
createRecordReader(InputSplit split,
TaskAttemptContext context) {
String delimiter = context.getConfiguration().get(
"textinputformat.record.delimiter");
byte[] recordDelimiterBytes = null;
if (null != delimiter)
recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
return new LineRecordReader(recordDelimiterBytes);
}

// Decides whether a file may be cut into several input splits.
// The codec is looked up from the file path: files with no matching codec
// (uncompressed) are always splittable, and compressed files are splittable
// only when their codec implements SplittableCompressionCodec.
@Override
protected boolean isSplitable(JobContext context, Path file) {
final CompressionCodec codec =
new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
if (null == codec) {
return true;
}
return codec instanceof SplittableCompressionCodec;
}
}

textinputformat.record.delimiter指定的是一条记录(一行)的终止符,即读到该配置项所指定的分隔符(作为完整的字符序列进行匹配)时,当前这一行的读取结束。

可以通过Configuration的set()方法来设置自定义的终止符;如果没有设置textinputformat.record.delimiter,那么Hadoop就以CR、LF或CRLF作为终止符,这一点可以查看LineReader的readDefaultLine方法。

LineRecordReader源码如下:

package org.apache.hadoop.mapreduce.lib.input;

/**
* Treats keys as offset in file and value as line.
*/
public class LineRecordReader extends RecordReader<LongWritable, Text> {
// Excerpt only: every "......" marks upstream code omitted by the article, so
// this snippet does not compile as shown (split, job, fileIn and in are
// declared in the omitted parts).
public void initialize(InputSplit genericSplit,
TaskAttemptContext context) throws IOException {
......
// Byte range [start, end) of the file covered by this split.
start = split.getStart();
end = start + split.getLength();
final Path file = split.getPath();

// open the file and seek to the start of the split
final FileSystem fs = file.getFileSystem(job);
fileIn = fs.open(file);
......
// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
// The byte count of the discarded line is added to start, so reading begins
// right after it; the previous split is the one that emits that line.
if (start != 0) {
start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
// pos tracks the current byte position in the file; per the class Javadoc,
// keys are offsets in the file.
this.pos = start;
......
}
......
}

自定义RecordReader

1、继承抽象类RecordReader,实现一个自定义的RecordReader子类;

2、实现自定义InputFormat类,重写InputFormat中的createRecordReader()方法,返回自定义RecordReader的实例;

3、调用job.setInputFormatClass()将自定义的InputFormat类设置为作业的输入格式;

实例

数据:


10
20
30
40
50
60
70
……


要求:读取整个文件,分别计算奇数行与偶数行数据之和

奇数行之和:10+30+50+70=160

偶数行之和:20+40+60=120

package Recordreader;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.LineReader;

/**
 * Example job that plugs a custom {@link RecordReader} into MapReduce.
 *
 * <p>{@link DefRecordReader} keys every input line by its 1-based line number instead of its byte
 * offset. {@link MyMapper} turns that line number into a parity key (0 = odd line, 1 = even line),
 * {@link DefPartitioner} routes each parity to its own reducer, and {@link MyReducer} sums the
 * numeric content of the lines it receives.
 */
public class MyRecordReader {

    private static final String INPUT_PATH = "hdfs://liguodong:8020/inputsum";
    private static final String OUTPUT_PATH = "hdfs://liguodong:8020/outputsum";

    /** Parity key emitted for odd-numbered lines (1, 3, 5, ...). */
    private static final long ODD_LINE = 0L;
    /** Parity key emitted for even-numbered lines (2, 4, 6, ...). */
    private static final long EVEN_LINE = 1L;

    /**
     * Reads an unsplit file line by line and produces (lineNumber, lineText) pairs, where
     * lineNumber starts at 1.
     *
     * <p>Reading always starts at the beginning of the file and runs to its end, so this reader is
     * only correct when the split covers the whole file. {@link MyFileInputFormat} guarantees that
     * by declaring every file non-splittable.
     */
    public static class DefRecordReader extends RecordReader<LongWritable, Text> {

        private long start; // split start offset (0, because files are never split)
        private long end; // split end offset (the file length for an unsplit file)
        private long pos; // number of the next line to be returned, starting at 1
        private long bytesRead; // bytes consumed so far; drives getProgress()
        private FSDataInputStream fin = null;
        // Reused key/value objects, as is customary for Hadoop record readers.
        private LongWritable key = null;
        private Text value = null;
        private LineReader reader = null;

        /**
         * Opens the file behind {@code split} and prepares to read it from the first line.
         *
         * @param split the input split; must be a {@link FileSplit}
         * @param context task context supplying the job configuration
         * @throws IOException if the file cannot be opened
         */
        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            FileSplit fileSplit = (FileSplit) split;
            start = fileSplit.getStart();
            end = start + fileSplit.getLength();
            Path path = fileSplit.getPath();
            Configuration conf = context.getConfiguration();
            FileSystem fs = path.getFileSystem(conf);
            fin = fs.open(path);
            // Passing the configuration lets LineReader honour io.file.buffer.size.
            reader = new LineReader(fin, conf);
            pos = 1;
            bytesRead = 0;
        }

        /**
         * Advances to the next line.
         *
         * @return {@code true} if a line was read, {@code false} at end of file
         * @throws IOException on read failure
         */
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (key == null) {
                key = new LongWritable();
            }
            if (value == null) {
                value = new Text();
            }
            // The file is read as a single unit: no split or block boundaries are crossed here.
            int consumed = reader.readLine(value);
            if (consumed == 0) {
                return false; // end of file
            }
            bytesRead += consumed;
            key.set(pos);
            pos++;
            return true;
        }

        /** Returns the 1-based number of the current line. */
        @Override
        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            return key;
        }

        /** Returns the text of the current line, without its line terminator. */
        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return value;
        }

        /**
         * Returns the fraction of the split consumed so far, in [0, 1].
         */
        @Override
        public float getProgress() throws IOException, InterruptedException {
            long length = end - start;
            if (length <= 0) {
                return 0.0f; // empty split: nothing to report
            }
            return Math.min(1.0f, (float) bytesRead / length);
        }

        /**
         * Closes the underlying file. It is safe to call even if {@link #initialize} failed
         * before opening it.
         */
        @Override
        public void close() throws IOException {
            if (reader != null) {
                reader.close(); // LineReader.close() also closes the wrapped stream
            } else if (fin != null) {
                fin.close();
            }
        }
    }

    /**
     * Input format that hands each whole file to a {@link DefRecordReader}.
     */
    public static class MyFileInputFormat extends FileInputFormat<LongWritable, Text> {

        @Override
        public RecordReader<LongWritable, Text> createRecordReader(
                InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            return new DefRecordReader();
        }

        /** Never split: line numbers are only meaningful when one reader sees the whole file. */
        @Override
        protected boolean isSplitable(JobContext context, Path filename) {
            return false;
        }
    }

    /**
     * Replaces each line number with its parity key ({@link #ODD_LINE} or {@link #EVEN_LINE})
     * and passes the line text through unchanged.
     */
    public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

        private final LongWritable parity = new LongWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            parity.set(key.get() % 2 == 0 ? EVEN_LINE : ODD_LINE);
            context.write(parity, value);
        }
    }

    /**
     * Sends each parity key to its own partition: odd lines to 0 and even lines to 1.
     *
     * <p>Unlike a partitioner that rewrites the key, this one has no side effects. It relies only
     * on the parity key produced by {@link MyMapper}.
     */
    public static class DefPartitioner extends Partitioner<LongWritable, Text> {

        @Override
        public int getPartition(LongWritable key, Text value, int numPartitions) {
            // Keys are 0 or 1; the modulo keeps the partition index valid if fewer than two
            // reducers are configured.
            return (int) (key.get() % numPartitions);
        }
    }

    /**
     * Sums the numeric lines of one parity and emits a labelled total.
     *
     * <p>Each input line is parsed as a (trimmed) long; blank lines are ignored.
     */
    public static class MyReducer extends Reducer<LongWritable, Text, Text, LongWritable> {

        private final Text writeKey = new Text();
        private final LongWritable writeValue = new LongWritable();

        @Override
        protected void reduce(LongWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            long sum = 0; // long: an int total silently overflows on large inputs
            for (Text val : values) {
                String line = val.toString().trim();
                if (line.isEmpty()) {
                    continue; // tolerate blank lines instead of failing the task
                }
                sum += Long.parseLong(line);
            }
            writeKey.set(key.get() == ODD_LINE ? "奇数行之和" : "偶数行之和");
            writeValue.set(sum);
            context.write(writeKey, writeValue);
        }
    }

    /**
     * Configures and submits the job, then exits with 0 on success and 1 on failure.
     */
    public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
        // 1. Configuration. Remove any previous output so the job does not fail on an existing
        //    directory.
        Configuration conf = new Configuration();
        Path outputPath = new Path(OUTPUT_PATH);
        final FileSystem fileSystem = FileSystem.get(new URI(OUTPUT_PATH), conf);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }
        Job job = Job.getInstance(conf, "Define RecordReader");

        // 2. Lets the cluster locate the job classes inside the submitted jar.
        job.setJarByClass(MyRecordReader.class);

        // 3. Input path plus the custom input format that supplies DefRecordReader.
        FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
        job.setInputFormatClass(MyFileInputFormat.class);

        // 4. Map phase: emits <parity, lineText>.
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        // 5. No combiner: MyReducer emits <Text, LongWritable>, which does not match the map
        //    output types <LongWritable, Text>, so it cannot double as a combiner.
        job.setPartitionerClass(DefPartitioner.class);

        // 6. Reduce phase: one reducer per parity, so each part file holds one sum.
        job.setReducerClass(MyReducer.class);
        job.setNumReduceTasks(2); // the default is 1
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 7. Output path.
        FileOutputFormat.setOutputPath(job, outputPath);

        // 8. Submit, wait for completion and report success through the exit code.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
[root@liguodong file]# vi inputsum
[root@liguodong file]# hdfs dfs -put inputsum /inputsum
[root@liguodong file]# hdfs dfs -cat /inputsum
10
20
30
40
50
60
70
[root@liguodong file]# yarn jar MyRecordReader.jar
[root@liguodong file]# hdfs dfs -ls /outputsum
Found 3 items
-rw-r--r-- 1 root supergroup 0 2015-06-14 21:19 /outputsum/_SUCCESS
-rw-r--r-- 1 root supergroup 20 2015-06-14 21:19 /outputsum/part-r-00000
-rw-r--r-- 1 root supergroup 20 2015-06-14 21:19 /outputsum/part-r-00001
[root@liguodong file]# hdfs dfs -cat /outputsum/part-r-00000
奇数行之和 160
[root@liguodong file]# hdfs dfs -cat /outputsum/part-r-00001
偶数行之和 120



举报

相关推荐

0 条评论