0
点赞
收藏
分享

微信扫一扫

《Data Algorithm》读书笔记 — 使用MapReduce实现二次排序

ivy吖 2022-01-26 阅读 54


《​​Data Algorithm​​​》读书笔记一 — 使用​​MapReduce​​实现二次排序

1. 二次排序

1.1 什么是二次排序?

什么是二次排序?二次排序就是 基于某个Key = k1排序之后的结果再进行Key=k2排序。 下面给出一个示例:

2000,12,04,10
2000,11,01,20
2012,12,23,60
2000,12,02,-20
2012,12,22,-20
2000,11,07,30
2000,11,24,-40
2012,01,01,35

上述的数据是一个每日的温度数据,如果我们要基于 年-月 排序,那么得到的结果就是如下这样:

2000,11,01,20
2000,11,07,30
2000,11,24,-40

2000,12,04,10
2000,12,02,-20

2012,01,01,35
2012,12,22,-20
2012,12,23,60

但是仅仅基于年月 排序还是不够的,比如说,我们想知道某个月的温度的一个正序排序,即得到如下这种结果:

2000,11  [-40,20,30]
2000,12 [-20,10]
2012,01 [35]
2012,12 [-20,60]

那么该怎么实现呢?

1.2 二次排序的应用场景

二次排序在日常工作的需求中经常需要被实现。接下来我就主要讲解一下如何实现这个二次排序。

2. 实现方式

简单的二次排序可以通过关系型数据库,表来实现,只需要写成 ​​order by field1,field2...fieldn​​ 即可。但是如果真的对几亿条数据这么操作的话,那么几乎所有的关系型数据库都是吃不消的。那么对于大数据量的情况下,该如何操作呢? 这里采用的方式就是使用 ​​MapReduce​​ 框架,实现这个需求。

而在​​MapReduce​​ 中又有不同的实现方式,分为如下两种:

2.1 reducer 中排序

在​​Reduce​​​中将得到​​Mapper​​​传递来的所有结果,可以使用Java的自带数据结构在 ​​Reducer​​ 中 排序,但是这样排序的场景只适合小数据,如果随着数据量的增长,这种方法不具备伸缩性。因为内存中不可能对很大的数据进行排序。

2.2 map 中排序

在​​Reducer​​中排序的劣势就是不具备可扩展性。但是如果在​​Mapper​​ 将结果 发往​​Reducer​​ 的时候就排好序呢?显然这个方式会更好。如下:

​Mapper​​ 中得到的是 ​​<key,list<value>>​​ ,普通的步骤就是将这个​​<key,list<value>>​​ 传递给 ​​Reducer​​,【但是这个​​list<value>​​ 中的value是无序的,现在的操作就是将这个每个key中​​list<value>​​ 变的有序】在​​Reducer​​中将得到的结构​​<key,list<value>>​​ 进行某种计算,得出结果。

实现二次排序的第二种方法就是将​​Mapper​​得到的数据再次使用​​MapReduce​​进行操作,不过第二次的​​MapReduce​​操作是不同于第一次​​MapReduce​​操作,第一次​​MapReduce​​操作是将​​key​​相同的数据分在一起;第二次​​MapReduce​​操作是将​​key​​ 相同的值进行排序。排序之后得到一个​​<key,list<value>>​​ ,然后再将这个​​<key,list<value>>​​ 发送给 ​​Reducer​​。

但是如何实现 第二次​​MapReduce​​操作 呢? 方法很简单,我们只需要构造一种组合键即可。例如:针对上面的​​日期+温度​​数据,我们可以将 ​​yearMonth+temperature​​ 温度数据作为 组合 ​​key​​,那么​​Mapper​​就会根据其进行分组。得到的结果就是:

year,mon,tem
2000,11, [-40,20,30]
2000,12, [-20,10]
2012,01, [35]
2012,12, [-20,60]

上述三列值的意思分别是:年,月,温度

3. 实现代码

  • ​DateTemperaturePair​​ 类
package data_algorithm.lawson.chapter_1;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class DateTemperaturePair implements Writable,WritableComparable<DateTemperaturePair>{

private final Text yearMonth = new Text();
private final Text day = new Text();
private final IntWritable temperature = new IntWritable();


public DateTemperaturePair() {
}

public DateTemperaturePair(String yearMonth, String day, int temperature) {
this.yearMonth.set(yearMonth);
this.day.set(day);
this.temperature.set(temperature);
}

public Text getYearMonthDay() {
return new Text(yearMonth.toString()+day.toString());
}

public Text getYearMonth() {
return yearMonth;
}

public Text getDay() {
return day;
}

public IntWritable getTemperature() {
return temperature;
}

public void setYearMonth(String yearMonthAsString) {
yearMonth.set(yearMonthAsString);
}

public void setDay(String dayAsString) {
day.set(dayAsString);
}

public void setTemperature(int temp) {
temperature.set(temp);
}

public void write(DataOutput out) throws IOException {
}

public void readFields(DataInput in) throws IOException {
}

public int compareTo(DateTemperaturePair pair) {
int compareValue = this.yearMonth.compareTo(pair.getYearMonth());
if (compareValue == 0) {
compareValue = temperature.compareTo(pair.getTemperature());
}
//return compareValue; // to sort ascending
return -1*compareValue; // to sort descending
}


@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

DateTemperaturePair that = (DateTemperaturePair) o;
if (temperature != null ? !temperature.equals(that.temperature) : that.temperature != null) {
return false;
}
if (yearMonth != null ? !yearMonth.equals(that.yearMonth) : that.yearMonth != null) {
return false;
}

return true;
}

@Override
public int hashCode() {
int result = yearMonth != null ? yearMonth.hashCode() : 0;
result = 31 * result + (temperature != null ? temperature.hashCode() : 0);
return result;
}

/**
* 1. 这个方法其实只是对这个DateTemperature对象进行输出
* @return
*/
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("DateTemperaturePair{yearMonth=");
builder.append(yearMonth);
builder.append(", day=");
builder.append(day);
builder.append(", temperature=");
builder.append(temperature);
builder.append("}");
return builder.toString();
}
}
  • ​SecondaryMapper​​ 类
package data_algorithm.lawson.chapter_1;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
* 1.KEYIN = LongWritable
* 2.VALUEIN = Text
* 3.KEYOUT = DateTemperaturePair
* 4.VALUEOUT = Text
*
*/
public class SecondarySortMapper extends Mapper<LongWritable,Text,DateTemperaturePair,Text>{

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String [] tokens = value.toString().split(",");
String yearMonth = tokens[0] + tokens[1];
String day = tokens[2];
int temperature = Integer.parseInt(tokens[3]);

System.out.println("yearMonth:"+yearMonth+" day"+day+" temperature"+temperature);

//准备规约器
DateTemperaturePair reducerKey = new DateTemperaturePair();

reducerKey.setYearMonth(yearMonth);
reducerKey.setDay(day);
reducerKey.setTemperature(temperature);

//如果是简单的一个mapReduce程序,那么仅仅需要实现
//context.write(yearMonth,temperature); 即可
//但是因为这样难以实现二次排序的目的,所以将其封装到了一个Entity —— DateTemperaturePair类中
//这个DateTemperaturePair 类实现了Writable 和 WritableComparable 两个接口,分别用于实现序列化 和 可比较 两个方法
context.write(reducerKey,new Text(Integer.toString(temperature)));
}
}
  • ​SecondaryReducer​​ 类
package data_algorithm.lawson.chapter_1;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
* 1.KEYIN = DateTemperaturePair
* 2.VALUEIN = Text
* 3.KEYOUT = Text
* 4.valueOUt = text
*/
public class SecondarySortReducer extends Reducer<DateTemperaturePair ,Text,Text ,Text > {

public void reduce(DateTemperaturePair key, Iterable<Text> values,Context context)
throws IOException, InterruptedException {
System.out.println("start reducer...");
StringBuilder sortedTemperatureList = new StringBuilder();
for (Text temperature : values) {
sortedTemperatureList.append(temperature);
sortedTemperatureList.append(",");
}
Text outputKey = new Text(key.getYearMonth());
System.out.println("outputKey is: "+outputKey.toString());

Text outputValue = new Text(sortedTemperatureList.toString());
System.out.println("outputValue is: "+outputValue.toString());
context.write(outputKey, outputValue);
}
}
  • ​DateTemperatureGroupingComparator​
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class DateTemperatureGroupingComparator extends WritableComparator{

public DateTemperatureGroupingComparator() {
super(DateTemperaturePair.class,true);
}

@Override
public int compare(WritableComparable wc1, WritableComparable wc2) {
System.out.println("start group comparator....");
DateTemperaturePair pair = (DateTemperaturePair)wc1;
DateTemperaturePair pair2 = (DateTemperaturePair)wc2;
//System.out.println("yearMonth: "+pair.getYearMonth()+" temperature: "+pair.getTemperature());
boolean res = pair.getYearMonth().equals(pair2.getYearMonth());
int status = res ? 0 : 1;
return status; // 1 stand for a sole group, but the 0 stand for a united group
}
}

这里只简单的列举一下主要的实现类,详细代码可见我的github的​​DayProgram​​。

4. 实现结果

[root@server4 hadoop]# hdfs dfs -cat /output/temperature/part-r-00000
35,-40,30,-20,-20,60,20,10,

可以看到上述的输出结果有将​​key​​​ 写出,而仅仅只是写出了​​value​​​ 值。出现这种情况的原因是:没有在输出类中序列化,从而导致​​Hadoop​​​ 集群并不知道这个​​DateTemperaturePair​​​类是什么,导致没有输出。修改代码,将其中的​​write()​​​ 和 ​​readFileds()​​方法

public void write(DataOutput out) throws IOException {
yearMonth.write(out);
day.write(out);
temperature.write(out);
}

public void readFields(DataInput in) throws IOException {
yearMonth.readFields(in);
day.readFields(in);
temperature.readFields(in);
}

再次运行项目得到的结果如下:

[root@server4 hadoop]# hdfs dfs -cat /output/temperature/part-r-00000
201212 60,-20,
201201 35,
200012 10,-20,
200011 30,20,-40,

如果不使用这个​​DateTemperatureGroupingComparator​​ 功能,那么将得到如下的结果:

[root@server4 hadoop]# hdfs dfs -cat /output/temperature/part-r-00000
201212 60,
201212 -20,
201201 35,
200012 10,
200012 -20,
200011 30,
200011 20,
200011 -40,

我们都知道,在​​Mapper​​​ => ​​Reducer​​​ 的这个过程中,会有一个​​Shuffle​​​ ,这个​​Shuffle​​​会将所有的相同键的值组合在一起,然后发送给​​Reducer​​​【然而这个​​Shuffle​​​过程中是黑箱的,也就是对于用户是透明的】。而这里的​​DateTemperatureGroupingComparator​​​ 功能就是将这些​​<key,value>​​ 按照 温度值进行排序 分组 。

但是可能有人​​DateTemperatureGroupingComparator​​ 这个类,下面就对这个类进行一个详细的讲解:

5.​​DateTemperatureGroupingComparator​​ 类精讲

​DateTemperatureGroupingComparator​​​ 继承​​WritableComparator​​​ 类。而关于​​WritableComparator​​​,可见我的博客:WritableComparator详解

5.1 即不使用 ​​compare​​ 方法

如果注释掉了 ​​compare​​ 方法,那么得到的结果是没有分组的,即呈如下的样子:

[root@server4 mapreduce]# hdfs dfs -cat /output/temperature/part-r-00000
201212 60,
201212 -20,
201201 35,
200012 10,
200012 -20,
200011 30,
200011 20,
200011 -40,

可以看到是没有将key相同的 数据分成一组。 因为我们在map 函数中,发送的是 ​​context.write(reducerKey,...);​​​而这个​​reducerKey​​ 是一个对象,如果要根据这个对象区分,那么每个对象都是一个不同的 key【因为每个对象都不相同】,所以就成了即使是年月一样,但是结果也不会分在一起。

5.2 如果直接在 ​​compare​​​ 方法中使用 ​​return 1​

代码修改成如下的样子:

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class DateTemperatureGroupingComparator extends WritableComparator{

public DateTemperatureGroupingComparator() {
super(DateTemperaturePair.class,true);
}

@Override
public int compare(WritableComparable wc1, WritableComparable wc2) {
return 1;
}
}

则得到如下的结果:

[root@server4 mapreduce]# hdfs dfs -cat /output/temperature/part-r-00000
201212 60,
201212 -20,
201201 35,
200012 10,
200012 -20,
200011 30,
200011 20,
200011 -40,

可以看到 这里的 ​​return 1;​​​代表的就是所有的 ​​<key-value>​​对 都是不同的组

5.3 如果直接在 ​​compare​​​ 方法中 使用​​reutrn 0​

则得到如下的结果:

[root@server4 mapreduce]# hdfs dfs -cat /output/temperature/part-r-00000
200011 60,-20,35,10,-20,30,20,-40,
5.4 扩展

如果说,想将年份相同的温度放在一起排序,那么该怎么实现呢?修改​​DateTemperatureGroupingComparator​​​类的​​compare​​方法如下:

@Override
public int compare(WritableComparable wc1, WritableComparable wc2) {
System.out.println("start group comparator....");
DateTemperaturePair pair = (DateTemperaturePair)wc1;
DateTemperaturePair pair2 = (DateTemperaturePair)wc2;
boolean res = pair.getYearMonth().toString().substring(0,4).equals(pair2.getYearMonth().toString().substring(0,4));
int status = res ? 0 : 1;
return status; // 1 stand for a sole group, but the 0 stand for a united group
}

得到的输出结果如下:

[root@server4 mapreduce]# hdfs dfs -cat /output/temperature/part-r-00000
201201 60,-20,35,
200011 10,-20,30,20,-40,

6.总结


  • ​Mapper​​​ 的功能是将读取输入,形成键值对​​<key,pair>​
  • ​Reduce​​​ 的功能是对同种​​key​​ 的值进行操作,从而输出
  • 分组比较器使得温度会按照有序的顺序到达归约器【这个是《Data Alogrithm》一书中的表述】。而我认为分组比较器的作用在于将各个相同的key 分到一组。
  • ​hadoop​​ 提供的插件式体系结构,从而我们可以在框架中注入定制的比较器

7. 思考

需要深度挖掘的地方有:


  • ​shuffle​​ 过程是如何实现的?
  • ​group​​ 具体是如何实现的?
  • 整个​​MapReduce​​的处理流程到底是什么样子?具体的描述出来?


举报

相关推荐

0 条评论