文章目录
- ☠ WritableComparable排序案例(全排序)
- ▪ 案例
- 需求分析
- 代码实现
- 思路一:封装Bean类,自定义排序
- Bean类
- Mapper阶段
- Reducer阶段
- Driver阶段
- 思路二:不使用Bean类,直接按照默认的key排
- Mapper阶段
- Reducer阶段
- Driver阶段
- ♦ 注意
☠ WritableComparable排序案例(全排序)
最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个 Reducetask,由于默认ReduceTask数是1,也可以不设置。
但该方法在处理大型文件时效率极低,因为一合机器处理所有文件,完全丧失了MapReduce的并行架构。
▪ 案例
对之前统计的流量总值进行排序
返回顶部
需求分析
返回顶部
代码实现
思路一:封装Bean类,自定义排序
Bean类
- 封装上下行流量以及总流量到Bean中,方便后续将Bean作为key,按照Bean重写的compareTo()方法进行排序。
在这里插入代码片package 第三章_MR框架原理.排序;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class FB implements WritableComparable<FB>{
// 变量
private long upFlow;
private long downFlow;
private long sumFlow;
/**
* 空参构造
*/
public FB() {}
/**
* 含参构造
* @param upFlow
* @param downFlow
*/
public FB(long upFlow, long downFlow) {
this.upFlow = upFlow;
this.downFlow = downFlow;
sumFlow = upFlow+ downFlow;
}
/**
* 重写compareTo方法
* @param o
* @return
*/
@Override
public int compareTo(FB bean) {
int result;
// 核心比较
if (sumFlow > bean.getSumFlow()){
result = -1;
}else if(sumFlow < bean.getSumFlow()){
result = 1;
}else {
result = 0;
}
return result;
}
/**
* 序列化
* @param dataOutput
* @throws IOException
*/
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeLong(upFlow);
dataOutput.writeLong(downFlow);
dataOutput.writeLong(sumFlow);
}
/**
* 反序列化
* @param dataInput
* @throws IOException
*/
@Override
public void readFields(DataInput dataInput) throws IOException {
upFlow = dataInput.readLong();
downFlow = dataInput.readLong();
sumFlow = dataInput.readLong();
}
/**
* 重写toString方法
* @return
*/
@Override
public String toString() {
return upFlow+"\t"+downFlow+"\t"+sumFlow;
}
/**
* 求和方法
*/
public void set(long upFlow1,long downFlow1){
upFlow = upFlow1;
downFlow = downFlow1;
sumFlow = upFlow + downFlow;
}
/**
* set、get方法
* @return
*/
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
}
与序列化统计总流量案例相比较,排序的本案例是在其结果集基础上进行的。所以输入文件就是上次的统计结果。本案例中要求对总流量进行排序,所以依然采用封装序列化并且实现compareTo方法的时候针对封装的对象中的总流量进行比较,实现排序功能
。
/**
* 重写compareTo方法
* @param o
* @return
*/
@Override
public int compareTo(FB bean) {
int result;
// 核心比较
if (sumFlow > bean.getSumFlow()){
result = -1;
}else if(sumFlow < bean.getSumFlow()){
result = 1;
}else {
result = 0;
}
return result;
}
返回顶部
Mapper阶段
这里会有所改变。因为 MapTask 和 ReduceTask 均会对数据按照 key 进行排序,而我们要排的是总流量,在Bean对象中,所以这里要将Bean对象设为key,手机号码设为value
。之前将手机号设为key是因为同一个手机号的流量记录可能有多条,按照手机号统计避免重复未合并完全。所以在写泛型的时候,输出部分应该是FB,Text
。
package 第三章_MR框架原理.排序;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class FB_M extends Mapper <LongWritable, Text,FB,Text> {
FB k = new FB();
Text v = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1.读取一整行数据
// 手机号 上行量 下行量 总流量
// 13509468723 7335 110349 117684
String line = value.toString();
// 2.拆分
String[] fields = line.split("\t");
// 3.封装对象
String phone = fields[0];
long upFlow = Long.parseLong(fields[1]);
long downFlow = Long.parseLong(fields[2]);
long sumFlow = Long.parseLong(fields[3]);
k.setUpFlow(upFlow);
k.setDownFlow(downFlow);
k.setSumFlow(sumFlow);
v.set(phone);
// 3.写入
// k -> Bean ; v -> Text
context.write(k,v);
}
}
返回顶部
Reducer阶段
Reducer阶段并没有具体的操作,排序阶段已经在读取数据的时候(shuffle过程)利用compareTo方法完成了。但是这里为了最后的输出形式符合正常需求,在最后输出的时候手机号在前,流量信息在后
。
package 第三章_MR框架原理.排序;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FB_R extends Reducer<FB, Text,Text,FB> {
@Override
protected void reduce(FB key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
for (Text value:values){
// 写入
// values是输入的不同手机号,key是对应的Bean对象,存储了流量信息
context.write(value,key);
}
}
}
返回顶部
Driver阶段
文件输入的路径就是之前流量统计值后的输出文件路径。
package 第三章_MR框架原理.排序;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FB_D {
public static void main(String[] args) {
Job job = null;
Configuration conf = new Configuration();
try {
// 获取job对象
job = Job.getInstance(conf);
// 配置
job.setMapperClass(FB_M.class);
job.setReducerClass(FB_R.class);
job.setJarByClass(FB_D.class);
job.setMapOutputKeyClass(FB.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FB.class);
// 设置输入输出路径
FileInputFormat.setInputPaths(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第二章_Hadoop序列化\\output"));
FileOutputFormat.setOutputPath(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\排序\\全排output"));
// 提交job
boolean result = job.waitForCompletion(true);
System.exit(result?0:1);
} catch (Exception e ){
e.printStackTrace();
}
}
}
返回顶部
思路二:不使用Bean类,直接按照默认的key排
Mapper阶段
- 在Mapper类中对数据进行拆分,取出
总流量作为key
,其他信息作为value
,在进入shuffle阶段的时候会对key进行排序 - 还有就是泛型要进行更改,这里将总流量作为key,并且是要按照其大小进行排序的,所以不能为Text类型了,要将其改为IntWritable或是其他数字类型。
package 第七章_MR扩展案例.排序;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class SortMapper extends Mapper<LongWritable, Text, IntWritable,Text> {
IntWritable k = new IntWritable();
Text v = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1.读取一行数据
// 13470253144 180 180 360
String line = value.toString();
// 2.拆分
String[] feilds = line.split("\t");
int sumFlow = Integer.parseInt(feilds[3]);
String info = feilds[0]+"\t"+feilds[1]+"\t"+feilds[2];
k.set(-sumFlow);
v.set(info);
// 3.写出
context.write(k,v);
}
}
返回顶部
Reducer阶段
- Reducer阶段接收到的数据是已经按照默认的排序方式排好了的。
package 第七章_MR扩展案例.排序;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class SortReducer extends Reducer<IntWritable,Text,IntWritable,Text> {
IntWritable k = new IntWritable();
@Override
protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// 1.读取数据写出
for (Text value:values){
k.set(-key.get());
context.write(k,value);
}
}
}
返回顶部
Driver阶段
package 第七章_MR扩展案例.排序;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SortDriver {
public static void main(String[] args) {
Job job;
Configuration conf = new Configuration();
try{
// 1. 获取job
job = Job.getInstance(conf);
// 2. 配置
job.setMapperClass(SortMapper.class);
job.setReducerClass(SortReducer.class);
job.setJarByClass(SortDriver.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);
// 3.输入输出数据路径
FileInputFormat.setInputPaths(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第二章_Hadoop序列化\\output\\part-r-00000"));
FileOutputFormat.setOutputPath(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第七章_MR扩展案例\\排序\\sortoutput"));
// 4.提交job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0:1);
} catch (Exception e){
e.printStackTrace();
}
}
}
返回顶部
♦ 注意
在实现第二种方法的时候,暂不考虑业务要求的输出样式。通过多次检测,其实系统默认的按照key进行排序的方式是升序
,但是案例要求是降序排列。所以在Mapper阶段最终放到key中的总流量取相反数,一个数越大其相反数越小嘛。所以中间shuffle过程排好序归并分组后的数据集第一列总流量全为负数,但是我们最终还是要将其还原,所以在写出的时候再次输出总流量的相反数,负负得正。
返回顶部