文章目录
- ☠ WritableComparator排序案例(辅助排序---GroupingComparator、二次排序)
- ▪ 案例
- 需求分析
- 代码实现
- • 思路一:利用二次排序、辅助排序< GroupingComparator >
- ① Bean类
- ② Mapper阶段
- ③ Reducer阶段
- ④ OrderSortGroupingComparator辅助排序
- ♦ 源码解析辅助排序类中的构造方法作用
- ♦ 流程分析
- ⑤ Driver阶段
☠ WritableComparator排序案例(辅助排序—GroupingComparator、二次排序)
GroupingComparator分组(辅助排序)
- 对Reduce阶段的数据根据某一个或几个字段进行分组。
- 时期:
在进入reduce()方法之前
- 分组排序步骤:
- (1)自定义类继承WritableComparator
- (2)重写compare()方法
@Override
public int compare(WritableComparable a, WritableComparable b) {
// 比较的业务逻辑
return result;
}
▪ (3)创建一个构造将比较对象的类传给父类
protected OrderGroupingComparator() {
super(OrderBean.class, true);
}
▪ 案例
现在需要求出每一个订单中最贵的商品。
(1)输入数据
(2)期望输出数据
1 222.8
2 722.4
3 232.8
需求分析
(1)利用“订单id和成交金额
”作为key
,可以将Map阶段读取到的所有订单数据按照id升序排序,如果id相同再按照金额降序排序
,发送到Reduce。
(2)在Reduce端利用groupingComparator将订单id相同的kv聚合成组
,然后取第一个
即是该订单中最贵商品。
返回顶部
代码实现
• 思路一:利用二次排序、辅助排序< GroupingComparator >
① Bean类
- 构建OB对象,实现序列化、反序列化
- 重写compareTo()方法,自定义排序,本质是
二次排序
,因为先按照id排了一次
,又按照价格排了一次
。
package 第三章_MR框架原理.排序;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class OB implements WritableComparable<OB> {
// 变量声明,只封装需要的信息
private int order_id;
private double price;
// 空参构造
public OB() {
}
// 重写compareTo --- 二次排序
@Override
public int compareTo(OB bean) {
int result;
// 先按照订单id升序排序
if (order_id > bean.getOrder_id()){
result = 1; // 升序1
} else if (order_id < bean.getOrder_id()){
result = -1;
} else {
// 若相同,再按照价格降序排序
if (price>bean.getPrice()){
result = -1;
}else if (price<bean.getPrice()){
result = 1;
}else {
result = 0;
}
}
return result;
}
// 序列化
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeInt(order_id);
dataOutput.writeDouble(price);
}
// 反序列化
@Override
public void readFields(DataInput dataInput) throws IOException {
order_id = dataInput.readInt();
price = dataInput.readDouble();
}
// 重写toString
@Override
public String toString() {
return order_id + "\t" + price;
}
// set、get
public int getOrder_id() {
return order_id;
}
public void setOrder_id(int order_id) {
this.order_id = order_id;
}
public double getPrice() {
return price;
}
public void setPrice(double price) {
this.price = price;
}
}
返回顶部
② Mapper阶段
- 将order_id和price都封装到OB对象中,输出
package 第三章_MR框架原理.排序;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class OB_M extends Mapper<LongWritable, Text,OB, NullWritable> {
OB k = new OB();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 0000001 Pdt_01 222.8
// 1.读取一行数据
String line = value.toString();
// 2.拆分
String[] values = line.split(" ");
// 3.封装bean对象
int order_id = Integer.parseInt(values[0]);
double price = Double.parseDouble(values[2]);
k.setOrder_id(order_id);
k.setPrice(price);
// 4.写出
context.write(k,NullWritable.get());
}
}
返回顶部
③ Reducer阶段
- 从Mapper阶段传输的数据是已经按照id和价格排好序的,这里不需要进一步操作,只需要输出即可。
package 第三章_MR框架原理.排序;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class OB_R extends Reducer<OB, NullWritable,OB,NullWritable> {
@Override
protected void reduce(OB key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.write(key,NullWritable.get());
}
}
返回顶部
④ OrderSortGroupingComparator辅助排序
- 在没有添加辅助排序类之前,直接利用Driver运行,结果就是Mapper输出的。因为Reducer端在输出数据的时候会自动进行一次分组,将key相同的放在同一组中。由于在Mapper阶段中,定义封装的Bean对象(作为之后传输的key)是全局变量,所以内存地址不会改变,按照相同的key处理就进入到了同一个reduce()方法中,最后全部输出。不能够输出每个相同商品(id)里的最贵的商品信息。
- 那么自然而然就会想到,将定义封装的Bean对象(作为之后传输的key)定义在map()里面,这样一来不就会不把所有的当成相同key处理了吗?但是,若是相同id的商品怎么办,定义为了局部变量后,Mapper每读一行就会创建一个Bean对象封装,并且每一次传入reduce()就会当做不同的key处理,这样也会使得最后输出的结果是全集。
public class OB_M extends Mapper<LongWritable, Text,OB, NullWritable> {
OB k = new OB(); // 全局变量
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
.....
// 4.写出
context.write(k,NullWritable.get());
}
}
- 我们使用Comparator辅助排序的目的就是为了,在Mapper到Reducer阶段之间的默认自动按照key排序过程后,在进行一次排序。按照本案例的需求就是我们要对key(Bean对象)中的id属性进行约束,只要id相同就认为是相同的key,那么这样一来,id为1的就会进入同一个reduce()方法进行处理。
package 第三章_MR框架原理.排序;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class OrderSortGroupingComparator extends WritableComparator {
// 构造方法
public OrderSortGroupingComparator() {
// 参数:当前比较的对象类,true
super(OB.class,true);
}
// 重写compare
@Override
public int compare(WritableComparable a, WritableComparable b) {
// 要求只要id相同,就认为是相同的key
int result;
OB aBean = (OB) a;
OB bBean = (OB) b;
if (aBean.getOrder_id() > bBean.getOrder_id()){
result = 1;
}else if (aBean.getOrder_id() < bBean.getOrder_id()){
result = -1;
}else {
result = 0;
}
return result;
}
}
♦ 源码解析辅助排序类中的构造方法作用
- 若构造方法不写,或者参数为false,最终程序运行的时候会报空指针异常。
♦ 流程分析
返回顶部
⑤ Driver阶段
- 设置关联分组比较类
job.setGroupingComparatorClass(OrderSortGroupingComparator.class);
package 第三章_MR框架原理.排序;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class OB_D {
public static void main(String[] args) {
Job job = null;
Configuration conf = new Configuration();
try {
// 1.获取job对象
job = Job.getInstance(conf);
// 2.配置
job.setMapperClass(OB_M.class);
job.setReducerClass(OB_R.class);
job.setJarByClass(OB_D.class);
job.setMapOutputKeyClass(OB.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(OB.class);
job.setOutputValueClass(NullWritable.class);
// 3.设置分组比较
job.setGroupingComparatorClass(OrderSortGroupingComparator.class);
// 4.设置输入输出路径
FileInputFormat.setInputPaths(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\排序\\GroupingComparator.txt"));
FileOutputFormat.setOutputPath(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\排序\\辅助排序output1"));
// 5.提交job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1 );
} catch (Exception e){
e.printStackTrace();
}
}
}
补充:
- 如果要输出价格排名前两名的订单,只需要空值Redeucer阶段的输出次数
package 第三章_MR框架原理.排序;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class OB_R extends Reducer<OB, NullWritable,OB,NullWritable> {
@Override
protected void reduce(OB key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
int i = 1;
for (NullWritable value:values){
if (i>2){
break;
}
context.write(key,NullWritable.get());
i++;
}
}
}
返回顶部