MapReduce Optimization: Descending Sort

半秋L · 2022-02-24



  1. The first job produces the normal total-flow statistics; the second job then sorts that result (a sketch of the record format the first job produces follows this list).
  2. In the second job the map output is context.write(totalFlow, phoneNumber), i.e. the flow bean becomes the key and the phone number becomes the value.
  3. SortFlowBean implements the WritableComparable interface and overrides the compareTo method:

    @Override
    public int compareTo(SortFlowBean o) {
        // Descending order, from largest to smallest
        return Long.compare(o.getSumFlow(), this.sumFlow);
    }

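As a quick illustration of step 1, here is a minimal, self-contained sketch of how one line of the first job's output might be parsed. The sample line and the field layout (phone, upFlow, downFlow, sumFlow) are assumptions for illustration, not taken from the original post:

public class InputFormatDemo {
    public static void main(String[] args) {
        // Hypothetical output line of the first (flow-summary) job:
        // phoneNumber \t upFlow \t downFlow \t sumFlow
        String line = "13736230513\t2481\t24681\t27162";
        String[] fields = line.split("\t");
        String phone = fields[0];                      // becomes the map output value
        long upFlow = Long.parseLong(fields[1]);
        long downFlow = Long.parseLong(fields[2]);     // sumFlow is recomputed by SortFlowBean.set()
        System.out.println(phone + "\t" + upFlow + "\t" + downFlow);
    }
}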
The SortFlowBean class

package com.zyd.sortflowbean;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class SortFlowBean implements WritableComparable<SortFlowBean> {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // Deserialization uses reflection, which needs the no-arg constructor
    public SortFlowBean() {
        super();
    }

    public SortFlowBean(long upFlow, long downFlow) {
        super();
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    /**
     * Deserialization method. The read order must exactly match the write order.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    /**
     * Serialization method.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    /**
     * The key point: sort descending by total flow.
     */
    @Override
    public int compareTo(SortFlowBean o) {
        // Descending order, from largest to smallest; returning 0 on ties
        // keeps the compareTo contract intact.
        return Long.compare(o.getSumFlow(), this.sumFlow);
    }

}

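A hedged sanity check for the Writable part of the class: the snippet below is my own illustration (the flow values are made up), serializing a bean with write() and reading it back with readFields() to confirm that the read order mirrors the write order:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class SerializationRoundTripDemo {
    public static void main(String[] args) throws IOException {
        SortFlowBean original = new SortFlowBean(1116, 954);   // illustrative values only

        // Serialize with write(), as Hadoop does when shuffling map output
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize with readFields(); field order must mirror write()
        SortFlowBean copy = new SortFlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(original);   // 1116    954     2070
        System.out.println(copy);       // identical after the round trip
    }
}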
In the optimized version the Mapper reuses a single SortFlowBean object rather than creating a new one per record, the Reducer simply writes the result straight out, and the driver only needs its input/output configuration adjusted.

The rule of thumb: put the field you need to sort on into the key, and put the data that does not take part in the sort (or has already been accumulated) into the value.

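To show why the bean has to be the map output key, here is a small local sketch (my own illustration, with made-up flow values and placeholder phone strings) that sorts (bean, phone) pairs with the bean's compareTo, the same way the shuffle orders map output:

import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;

public class ShuffleOrderDemo {
    public static void main(String[] args) {
        // Pairs as the mapper would emit them: (SortFlowBean key, phone value)
        List<Entry<SortFlowBean, String>> pairs = new ArrayList<>();
        pairs.add(new SimpleEntry<>(new SortFlowBean(100, 200), "phoneA"));   // sum 300
        pairs.add(new SimpleEntry<>(new SortFlowBean(700, 300), "phoneB"));   // sum 1000
        pairs.add(new SimpleEntry<>(new SortFlowBean(10, 20), "phoneC"));     // sum 30

        // The shuffle sorts map output by key; compareTo makes that descending by sumFlow
        pairs.sort((a, b) -> a.getKey().compareTo(b.getKey()));

        for (Entry<SortFlowBean, String> e : pairs) {
            // Prints phoneB (1000) first, then phoneA (300), then phoneC (30)
            System.out.println(e.getValue() + "\t" + e.getKey());
        }
    }
}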
package com.zyd.sortflowbean;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Map output: the key is the SortFlowBean (so the shuffle sorts on it),
 * the value is the phone number.
 * @author Administrator
 */
public class FlowSortMapper extends Mapper<LongWritable, Text, SortFlowBean, Text> {

    // Reused across map() calls to avoid creating a new object per record
    SortFlowBean sBean = new SortFlowBean();
    Text v = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 1. Read one line
        String line = value.toString();
        // 2. Split into fields
        String[] fields = line.split("\t");
        // 3. Populate the bean and extract the phone number
        long upFlow = Long.parseLong(fields[1]);
        long downFlow = Long.parseLong(fields[2]);

        sBean.set(upFlow, downFlow);
        v.set(fields[0]);

        // 4. Emit (bean, phone)
        context.write(sBean, v);
    }
}

The Reducer class

package com.zyd.sortflowbean;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * The reduce input key is the SortFlowBean and the input values are the phone
 * numbers (Text); on output the pair is swapped back, so the phone number
 * becomes the key and the bean the value.
 * @author Administrator
 */
public class FlowSortReducer extends Reducer<SortFlowBean, Text, Text, SortFlowBean> {

    @Override
    protected void reduce(SortFlowBean bean, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Phones with the same total flow arrive in the same group,
        // so write every phone number, not just the first one
        for (Text phone : values) {
            context.write(phone, bean);
        }
    }
}

The driver class

package com.zyd.sortflowbean;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowSorDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Get the job instance
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. Set the jar by the driver class so Hadoop can locate it
        job.setJarByClass(FlowSorDriver.class);

        // 3. Associate the Mapper and Reducer classes
        job.setMapperClass(FlowSortMapper.class);
        job.setReducerClass(FlowSortReducer.class);

        // 4. Set the map output key and value types
        job.setMapOutputKeyClass(SortFlowBean.class);
        job.setMapOutputValueClass(Text.class);

        // 5. Set the final output key and value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(SortFlowBean.class);

        // 6. Set the input path (the first job's output) and the output path
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7. Submit and wait for completion
        boolean result = job.waitForCompletion(true);

        System.exit(result ? 0 : 1);
    }
}

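For completeness, a hedged example of submitting the job. The jar name and the HDFS paths below are placeholders (args[0] should point at the first job's output directory, args[1] at a non-existent output directory), not values from the original post:

hadoop jar flowsort.jar com.zyd.sortflowbean.FlowSorDriver /flowsum/output /flowsort/output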

