目录
- hadoop数据压缩
- 概述
- MR支持的压缩编码
- 压缩方式选择
- 1.Gzip压缩
- 2.Bzip2压缩
- 3.Lzo压缩
- 4.Snappy压缩
- 5.压缩位置选择
- 6.压缩实战
- wordcount程序压缩
- mapper
- combine
- partition
- reduce
- 测试类
hadoop数据压缩
你好! 这是你第一次使用 Markdown编辑器 所展示的欢迎页。如果你想学习如何使用Markdown编辑器, 可以仔细阅读这篇文章,了解一下Markdown的基本语法知识。
概述
压缩技术能够有效减少底层存储系统(HDFS)读写字节数。压缩提高了网络带宽和磁盘空间的效率。在Hadoop下,尤其是数据规模很大和工作负载密集的情况下,使用数据压缩显得非常重要。在这种情况下,I/O操作和网络数据传输要花大量的时间。还有,Shuffle与Merge过程同样也面临着巨大的I/O压力。
鉴于磁盘I/O和网络带宽是Hadoop的宝贵资源,数据压缩对于节省资源、最小化磁盘I/O和网络传输非常有帮助。不过,尽管压缩与解压操作的CPU开销不高,其性能的提升和资源的节省并非没有代价。
如果磁盘I/O和网络带宽影响了MapReduce作业性能,在任意MapReduce阶段启用压缩都可以改善端到端处理时间并减少I/O和网络流量。
MR支持的压缩编码
压缩格式 | hadoop自带? | 算法 | 文件扩展名 | 是否可切分 | 换成压缩格式后,原来的程序是否需要修改 |
DEFAULT | 是,直接使用 | DEFAULT | .deflate | 否 | 和文本处理一样,不需要修改 |
Gzip | 是,直接使用 | DEFAULT | .gz | 否 | 和文本处理一样,不需要修改 |
bzip2 | 是,直接使用 | bzip2 | .bz2 | 是 | 和文本处理一样,不需要修改 |
LZO | 否(低版本),需要安装 | LZO | .lzo | 是 | 需要建索引,还需要指定输入格式 |
Snappy | 否(低版本),需要安装 | Snappy | .snappy | 否 | 和文本处理一样,不需要修改 |
为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器,如下表所示
压缩算法 | 原始文件大小 | 压缩文件大小 | 压缩速度 | 解压速度 |
gzip | 8.3GB | 1.8GB | 17.5MB/s | 58MB/s |
bzip2 | 8.3GB | 1.1GB | 2.4MB/s | 9.5MB/s |
LZO | 8.3GB | 2.9GB | 49.3MB/s | 74.6MB/s |
http://google.github.io/snappy/ On a single core of a Core i7 processor in 64-bit mode, Snappy compresses at about 250 MB/sec or more and decompresses at about 500 MB/sec or more.
压缩方式选择
1.Gzip压缩
优点:压缩率比较高,而且压缩/解压速度也比较快;hadoop本身支持,在应用中处理gzip格式的文件就和直接处理文本一样;大部分linux系统都自带gzip命令,使用方便。
缺点:不支持split。
应用场景:当每个文件压缩之后在140M以内的(1个块大小内),都可以考虑用gzip压缩格式。例如说一天或者一个小时的日志压缩成一个gzip文件,运行mapreduce程序的时候通过多个gzip文件达到并发。hive程序,streaming程序,和java写的mapreduce程序完全和文本处理一样,压缩之后原来的程序不需要做任何修改。
2.Bzip2压缩
优点:支持split;具有很高的压缩率,比gzip压缩率都高;hadoop本身支持,但不支持native(java和c互操作的API接口);在linux系统下自带bzip2命令,使用方便。
缺点:压缩/解压速度慢;不支持native。
应用场景:适合对速度要求不高,但需要较高的压缩率的时候,可以作为mapreduce作业的输出格式;或者输出之后的数据比较大,处理之后的数据需要压缩存档减少磁盘空间并且以后数据用得比较少的情况;或者对单个很大的文本文件想压缩减少存储空间,同时又需要支持split,而且兼容之前的应用程序(即应用程序不需要修改)的情况。
3.Lzo压缩
优点:压缩/解压速度也比较快,合理的压缩率;支持split,是hadoop中最流行的压缩格式;可以在linux系统下安装lzop命令,使用方便。
缺点:压缩率比gzip要低一些;hadoop本身不支持,需要安装;在应用中对lzo格式的文件需要做一些特殊处理(为了支持split需要建索引,还需要指定inputformat为lzo格式)。
应用场景:一个很大的文本文件,压缩之后还大于200M以上的可以考虑,而且单个文件越大,lzo优点越越明显。
4.Snappy压缩
优点:高速压缩速度和合理的压缩率。
缺点:不支持split;压缩率比gzip要低;hadoop本身不支持,需要安装;
应用场景:当Mapreduce作业的Map输出的数据比较大的时候,作为Map到Reduce的中间数据的压缩格式;或者作为一个Mapreduce作业的输出和另外一个Mapreduce作业的输入。
5.压缩位置选择
要在Hadoop中启用压缩,可以配置如下参数:
参数 | 默认值 | 阶段 | 建议 |
io.compression.codecs (在core-site.xml中配置) | org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.compress.BZip2Codec | 输入压缩 | Hadoop使用文件扩展名判断是否支持某种编解码器 |
mapreduce.map.output.compress(在mapred-site.xml中配置) | false | mapper输出 | 这个参数设为true启用压缩 |
mapreduce.map.output.compress.codec(在mapred-site.xml中配置) | org.apache.hadoop.io.compress.DefaultCodec | mapper输出 | 使用LZO或snappy编解码器在此阶段压缩数据 |
mapreduce.output.fileoutputformat.compress(在mapred-site.xml中配置) | false | reducer输出 | 这个参数设为true启用压缩 |
mapreduce.output.fileoutputformat.compress.codec(在mapred-site.xml中配置) | org.apache.hadoop.io.compress. DefaultCodec | reducer输出 | 使用标准工具或者编解码器,如gzip和bzip2 |
mapreduce.output.fileoutputformat.compress.type(在mapred-site.xml中配置) | RECORD | reducer输出 | SequenceFile输出使用的压缩类型:NONE和BLOCK |
6.压缩实战
public class CompressDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException {
// compress("F:\\input\\phone.txt","org.apache.hadoop.io.compress.GzipCodec");
// decompression("F:\\input\\phone.txt.gz","txt");
}
/**
* 压缩
* @param filename 文件路径+文件名
* @param method 解码器
*
* */
public static void compress(String filename,String method) throws IOException, ClassNotFoundException {
//创建输入流
FileInputStream fis = new FileInputStream(new File(filename));
//通过反射找到解码器
Class codeClass = Class.forName(method);
//通过反射工具列找到解码器的对象,需要配置Hadoop的conf
CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codeClass,new Configuration());
//创建输出流
FileOutputStream fos= new FileOutputStream(new File(filename+codec.getDefaultExtension()));
//获取解码器的输出对象
CompressionOutputStream cos = codec.createOutputStream(fos);
//对接流
IOUtils.copyBytes(fis,cos,1024*1024*5,true);
}
//解压缩
public static void decompression(String filename,String decoded) throws IOException {
//获取factory实例
CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
CompressionCodec codec = factory.getCodec(new Path(filename));
//配置解压缩的输入
CompressionInputStream cis = codec.createInputStream(new FileInputStream(new File(filename)));
//输出流
FileOutputStream fos = new FileOutputStream(new File(filename+"."+decoded));
//流拷贝
IOUtils.copyBytes(cis,fos,1024*1024*5,true);
}
}
1:原文件
2:压缩后的文件
3:解压后的文件
wordcount程序压缩
mapper
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
//坑,自动导包Text很容易导成java
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.util.Arrays;
/**
* <KEYIN, VALUEIN, KEYOUT, VALUEOUT>
* <LongWritable, Text, Text, IntWritable>
* apache 1
*快捷键:查看父类的方法 ctrl+o
* 补全 Ctrl+Alt+v
* */
public class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable>{
@Override
protected void map(LongWritable key, Text value,
Context context) throws IOException, InterruptedException {
//context.write(key,new IntWritable(1));
System.out.println(key.toString());
//拿到数据,进行数据转换Text=》String
String line = value.toString();
//按照空格切分
String[] split = line.split(",");
//输出数据 KEYOUT, VALUEOUT
for (String s:split) {
//数据转换String=》Text int=》IntWritable
context.write(new Text(s),new IntWritable(1));
}
// Arrays.asList(split).forEach((s) -> {
// try {
// context.write(new Text(s),new IntWritable(1));
// } catch (IOException e) {
// e.printStackTrace();
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// });
}
}
combine
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordCountCombine extends Reducer<Text,IntWritable,Text,IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
//初始化一个计数器
int count=0;
//开始计数
for (IntWritable value:values) {
count=count+value.get();
}
//输出 int=》IntWritable
context.write(key,new IntWritable(count));
}
}
partition
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class WordCountPartition extends Partitioner <Text, IntWritable>{
@Override
public int getPartition(Text text, IntWritable intWritable, int i) {
//拿到数据
String t = text.toString();
//得到每个单词的长度
int length = t.length();
if(length%2==0){
return 0;
}else {
return 1;
}
}
}
reduce
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* <KEYIN, VALUEIN, KEYOUT, VALUEOUT>
*<Text,IntWritable,Text,IntWritable>
*
* */
public class WordCountReduce extends Reducer <Text,IntWritable,Text,IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
//初始化一个计数器
int count=0;
//开始计数
for (IntWritable value:values) {
count=count+value.get();
// System.out.println("aaaaa");
}
// System.out.println("===============分隔符====================");
//输出 int=》IntWritable
context.write(key,new IntWritable(count));
}
}
测试类
public class WordCountMain {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
long starttime = System.currentTimeMillis();
args=new String[]{"F:\\input\\wordcount.txt","F:\\output\\wordcountcompress"};
//获取配置文件
Configuration conf = new Configuration();
// conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR,",");
// conf.set(NLineInputFormat.LINES_PER_MAP,"2");
//开启map端的输出压缩
conf.setBoolean("mapreduce.map.output.compress", true);
//设置压缩方式
//conf.setClass("mapreduce.map.output.compress.codec", DefaultCodec.c lass, CompressionCodec.class);
conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
//创建job任务
Job job = Job.getInstance(conf);
job.setJarByClass(WordCountMain.class);
//指定Map类和map的输出类型 Text, IntWritable
job.setMapperClass(WordCountMap.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//指定Reducer类和reduce的输出数据类型 Text,IntWritable
job.setReducerClass(WordCountReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//设置分区
// job.setPartitionerClass(WordCountPartition.class);
// job.setNumReduceTasks(2);
//设置预合并combine
// job.setCombinerClass(WordCountCombine.class);
//如果不设置InputFormat,它默认用的是TextInputFormat.class
// job.setInputFormatClass(CombineTextInputFormat.class);
// CombineTextInputFormat.setMaxInputSplitSize(job, 3*1024*1024);// 4m
// CombineTextInputFormat.setMinInputSplitSize(job, 2*1024*1024);// 2m
// job.setInputFormatClass(KeyValueTextInputFormat.class);
// job.setInputFormatClass(NLineInputFormat.class);
//指定数据输入的路径和输出的路径
FileInputFormat.setInputPaths(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
//设置reduce压缩
FileOutputFormat.setCompressOutput(job,true);
//设置压缩格式
FileOutputFormat.setOutputCompressorClass(job,BZip2Codec.class);
//提交任务
job.waitForCompletion(true);
long endtime = System.currentTimeMillis();
System.out.println((endtime-starttime)/1000);
}
}
运行结果: