0
点赞
收藏
分享

微信扫一扫

MapReduce之自定义inputFormat合并小文件


        无论hdfs还是mapreduce,对于小文件都有损效率,实践中,又难免面临处理大量小文件的场景,此时,就需要有相应解决方案。

        小文件的优化无非以下几种方式:

  1. 在数据采集的时候,就将小文件或小批数据合成大文件再上传HDFS
  2. 在业务处理之前,在HDFS上使用mapreduce程序对小文件进行合并
  3. 在mapreduce处理时,可采用combineInputFormat提高效率

        本篇博客小菌将大家实现的是上述第二种方式!

        先让我们确定程序的核心机制:


自定义一个InputFormat 改写RecordReader,实现一次读取一个完整文件封装为KV
在输出时使用SequenceFileOutPutFormat输出合并文件


        具体的代码如下:



自定义InputFromat

public class Custom_FileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {


/*
直接返回文件不可切割,保证一个文件是一个完整的一行
*/

@Override
protected boolean isSplitable(JobContext context, Path filename) {

return false;

}


@Override
public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {

Custom_RecordReader custom_recordReader = new Custom_RecordReader();


custom_recordReader.initialize(split,context);


return custom_recordReader;
}
}

自定义RecordReader

/**
*
* RecordReader的核心工作逻辑:
* 通过nextKeyValue()方法去读取数据构造将返回的key value
* 通过getCurrentKey 和 getCurrentValue来返回上面构造好的key和value
*
*
* @author
*
*/
public class Custom_RecordReader extends RecordReader<NullWritable, BytesWritable> {

private FileSplit fileSplit;

private Configuration conf;

private BytesWritable bytesWritable = new BytesWritable();

private boolean pressced = false;


//初始化

/**
*
* @param split 封装的文件的对象内容
* @param context 上下文对象
* @throws IOException
* @throws InterruptedException
*/

@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {

this.fileSplit = (FileSplit) split;
this.conf=context.getConfiguration();

}

//读取下一个文件
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {


if (!pressced){

//获取文件路径
Path path = fileSplit.getPath();

//获取FileSystem对象
FileSystem fileSystem = null;
FSDataInputStream inputStream = null;

try {
fileSystem = FileSystem.get(conf);

//读取文件
inputStream = fileSystem.open(path);

//初始化一个字节数据,大小为文件的长度
byte[] bytes = new byte[(int)fileSplit.getLength()];

//把数据流转换成字节数组
IOUtils.readFully(inputStream,bytes,0,bytes.length);

//把 字节数组 转换成 BytesWritable 对象
bytesWritable.set(bytes,0,bytes.length);


} catch (Exception e) {
e.printStackTrace();

}finally {

fileSystem.close();

if (null != inputStream ){
inputStream.close();
}
}

pressced = true;
return true;

}else{

return false;
}

}

//获取当前的key值
@Override
public NullWritable getCurrentKey() throws IOException, InterruptedException {
return NullWritable.get();

}

//获取当前的value值
@Override
public BytesWritable getCurrentValue() throws IOException, InterruptedException {
return bytesWritable;
}

//获取当前的进程
@Override
public float getProgress() throws IOException, InterruptedException {
return pressced?0:1;
}

//关流
@Override
public void close() throws IOException {


}
}

map处理

public class Custom_Mapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {


@Override
protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {

FileSplit fileSplit = (FileSplit)context.getInputSplit();

String name = fileSplit.getPath().getName();

context.write(new Text(name),value);



}
}

定义mapreduce处理流程

public class Customer_Driver {

public static void main(String[] args) throws Exception {


//1.实例化job对象
Job job = Job.getInstance(new Configuration(), "Customer_Driver");
job.setJarByClass(Customer_Driver.class);

//2.设置输入
job.setInputFormatClass(Custom_FileInputFormat.class);
Custom_FileInputFormat.addInputPath(job,new Path("E:\\2019大数据课程\\DeBug\\测试\\order\\素材\\5\\自定义inputformat_小文件合并\\input"));

//3.设置Map
job.setMapperClass(Custom_Mapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(BytesWritable.class);

//4.设置Reduce()
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(BytesWritable.class);

//5.设置输出
job.setOutputFormatClass(SequenceFileOutputFormat.class);
SequenceFileOutputFormat.setOutputPath(job,new Path("E:\\2019大数据课程\\DeBug\\测试\\order\\素材\\5\\自定义inputformat_小文件合并\\output4"));

boolean b = job.waitForCompletion(true);
System.out.println(b?0:1);

}
}

        到这里我们的程序就算完成了。在运行之前,我们先打开我们程序读取的目录,可以看到在input目录下有两个文件。

MapReduce之自定义inputFormat合并小文件_自定义inputFormat

        然后运行程序。

MapReduce之自定义inputFormat合并小文件_Hadoop_02

        伴随着成功运行,我们可以再进入到程序输出目录,查看情况。

MapReduce之自定义inputFormat合并小文件_MapReduce_03

        可以发现该文件已经把多个文件的内容合并在了一起,部分内容出现乱码属于正常现象。这是由于该过程属于序列化后的结果,如果我们想要看到文件最初的内容需要后续做反序列化处理!


        那么本次分享的内容就到这里了,下期小菌将为大家带来MapReduce之自定义outputFormat的内容,敬请期待!!!


举报

相关推荐

0 条评论