Hadoop的OutputFormat类介绍
-
OutputFormat是一个用于描述MapReduce作业的输出格式和规范的抽象类,位于org.apache.Hadoop.mapreduce.OutputFormat<K, V>. Mapreduce框架依靠文件输出格式完成输出规范检查(如检查目录是否存在),并为文件输出格式提供作业结果数据输出的功能,即提供RecordWriter的实现,输出文件被存储在文件系统FileSystem中。
-
如果用户要基于Hadooop内置的输出格式和内置的RecordWriter进行定制,则需要重载OutputFormat类的getRecordWriter()方法以便获取新的RecordWriter; 如果完全基于抽象的输出格式类OutputFormat和抽象的RecordWriter类进行全新的程序定制,则需要实现OutputFormat中的getRecordWriter()等抽象方法。
-
此外,OutputFormat类还包括一个getOutputCommiter()方法来负责确保输出被正确提交,以及一个检查作业输出规范有效性的方法checkOutputSecs()。
-
FileOutputFormat类
①写入到HDFS的所有OutputFormat类都继承自FileOutputFormat类,直接已知的子类有MapFileOutputFormat, MultipleOutputFormat, SequenceFileOutputFormat 和 TextOutputFormat
② FileOutputFormat类提供了若干静态方法,用户可以用它们进行输入路径设置、分块大小设置等全局设置。
setOutputPath()方法设置Mapreduce任务输出目录的路径, getRecordWriter()方法获得当前给定任务的RecordWriter类型;setOutputName()方法设置要创建的输出文件的名称
-
TextOutputFormat类 是默认的输出格式,它把每条记录写成文本行。由于TextOutputFormat调用toString()方法把键和值转换为字符串,他的键和值可以是任意的类型。
-
SequenceFileOutputFormat类
SequenceFileOutputFormat将他的输出写为一个二进制顺序文件。由于他的格式紧凑,很容易被压缩,因此如果输出需要作为后续的MapReduce任务的输入,这便是一个很好的输出格式。
7、NullOutputFormat类
NullOutputFormat是继承自OutputFormat类的一个抽象类,位于org.apache.hadoop.mapreduce.lib.output.NullOutputFormat<K, V>,它会消耗掉所有输出,并把他们赋值为null
8、RecordWriter
①对于一个文件输出格式,都需要有一个对应的数据记录输出RecordWriter,以便系统明确输出结果写入到文件中的具体格式。RecordWriter是一个抽象类,位于org.apache.hadoop.mapreduce.RecordWriter<K, V>
②编写输出格式OutputFormat扩展类,其实主要就是实现RecordWriter,重点是构建一个类来实现这个接口或抽象类。
③close()方法负责关闭操作,而write方法则实现如何写key/value键值对。这两个抽象方法按如下:
public abstract void close(TaskAttempContext context)//关闭RecordWriter,继承自Closable接口
public abstract void write(K key, V value)//写一个key/value键值对
- 常用的内置RecordWriter类
TextOutputFormat 对应默认的RecordWriter LineRecordWriter 将结果数据以“key+\t+value” 的形式输出到文本文件中
DBOutputFormat 对应默认RecordWriter DBRecordWriter 将结果写入到一个数据表中
FilerOutputFormat 对应默认FilterRecordWriter 对应于过滤输出模式的数据记录模式,只将过滤后的结果输出到文件中
代码范例
import java.io.*;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
@InterfaceAudience.Public
@InterfaceStability.Stable
public class CustomOFPW<K,V> extends FileOutputFormat<K,V>{
protected static class LineRecordWriter<K,V> extends RecordWriter<K,V> {
private static final String encod = "UTF-8";
protected PrintWriter out;
public LineRecordWriter(DataOutputStream out) throws UnsupportedEncodingException {
this.out = new PrintWriter(new OutputStreamWriter(out, encod));
}
public synchronized void write(K key, V value) {
boolean nullKey = (key == null || key instanceof NullWritable);
boolean nullvalue = (value == null || value instanceof NullWritable);
if (nullKey && nullvalue) return;
if (!nullvalue) out.println(value.toString());
}
public synchronized void close(TaskAttemptContext context) {
out.close();
}
}
public RecordWriter<K,V> getRecordWriter (TaskAttemptContext job ) throws IOException{
Configuration conf = job.getConfiguration();
boolean isCompressed = getCompressOutput(job);
CompressionCodec codec = null;
String extension="";
if (isCompressed){
Class<? extends CompressionCodec> codecClass =
getOutputCompressorClass(job, GzipCodec.class);
codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass,conf);
extension = codec.getDefaultExtension();
}
Path file = getDefaultWorkFile(job,extension);
FileSystem fs = file.getFileSystem(conf);
if(!isCompressed){
FSDataOutputStream fileOut = fs.create(file,false);
return new LineRecordWriter<K,V> (fileOut);
}else {
FSDataOutputStream fileOut = fs.create(file,false);
return new LineRecordWriter<K,V> (new DataOutputStream(codec.createOutputStream(fileOut)));
}
}
}