Requirement description:
Given a log of phone traffic records, sum the upstream and downstream traffic for each phone number, and write the results to separate output files by province, where the province is determined by the first three digits of the phone number.
Requirement analysis:
In MapReduce, each reduce task writes exactly one output file, so "one file per province" really means "one reduce task per province". The default shuffle scatters keys by hash, so records for the same province would be spread across reduce tasks; we therefore need to take control of how map output keys are assigned to reduce tasks.
Design technique:
Implement a custom Partitioner that maps each phone-number prefix to a fixed partition number, register it on the job with setPartitionerClass, and set the number of reduce tasks to match the number of partitions.
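For reference, the mapper shown later assumes tab-separated records with the phone number in the second field and the upstream/downstream byte counts in the third-to-last and second-to-last fields. A made-up line in that shape (the real dataset has more columns, but only these positions matter):

1363157985066	13412340001	192.168.100.1	www.example.com	2481	24681	200

Here 13412340001 is the phone number, 2481 the upstream traffic, and 24681 the downstream traffic.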
The default partitioning rule:
Unless told otherwise, MapReduce uses HashPartitioner, which assigns a record to a reduce task by hashing its key: partition = (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks. Keys that share a prefix get no guarantee of landing in the same partition.
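A minimal sketch of that logic, paraphrased from org.apache.hadoop.mapreduce.lib.partition.HashPartitioner (the class name below is ours, for illustration only):

import org.apache.hadoop.mapreduce.Partitioner;

// Sketch of Hadoop's built-in HashPartitioner: mask the hash to keep it
// non-negative, then take it modulo the number of reduce tasks.
public class HashPartitionerSketch<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}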
A quick test to verify:
You can check the default behavior locally, before writing any MapReduce code.
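A minimal sketch, using made-up phone numbers, that prints which partition the default rule would pick for each key with 6 reduce tasks:

import org.apache.hadoop.io.Text;

// The phone numbers are hypothetical samples; the point is that keys with
// the same "134" prefix are not guaranteed to share a partition under the
// default rule, so hashing alone cannot group output by province.
public class DefaultPartitionTest {
    public static void main(String[] args) {
        String[] phones = {"13412340001", "13412340002", "13512340003"};
        int numReduceTasks = 6;
        for (String phone : phones) {
            // Same computation HashPartitioner would perform on a Text key.
            int partition = (new Text(phone).hashCode() & Integer.MAX_VALUE) % numReduceTasks;
            System.out.println(phone + " -> partition " + partition);
        }
    }
}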
How to customize partitioning?
Extend org.apache.hadoop.mapreduce.Partitioner over the map output key/value types, override getPartition to return a partition number, and register the class on the job with job.setPartitionerClass(...). The returned number should stay within [0, numReduceTasks).
ProvincePartitioner
package cn.itcast.mapreduce.flowcount_2;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

import java.util.HashMap;

public class ProvincePartitioner extends Partitioner<Text, FlowBean_2> {

    // Static lookup table: phone-number prefix -> partition number.
    public static HashMap<String, Integer> province = new HashMap<String, Integer>();

    static {
        province.put("134", 0);
        province.put("135", 1);
        province.put("136", 2);
        province.put("137", 3);
        province.put("138", 4);
    }

    @Override
    public int getPartition(Text key, FlowBean_2 flowBean_2, int numPartitions) {
        // Look up the partition by the first three digits of the phone number.
        Integer code = province.get(key.toString().substring(0, 3));
        if (code != null) {
            return code;
        }
        // Any prefix not in the table falls into the catch-all partition.
        return 5;
    }
}
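With this table, a key such as 13512340003 (a made-up number; prefix 135) is routed to partition 1, and any prefix outside 134-138 falls into the catch-all partition 5, so the partitioner can return six distinct values in total: 0 through 5.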
Code design:
package cn.itcast.mapreduce.flowcount_2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowSumProvince {
    // Mapper: parse each input line and emit (phone number, flow bean).
    public static class FlowSumProvinceMapper extends Mapper<LongWritable, Text, Text, FlowBean_2> {

        // Reused objects for the map output key and value.
        Text k = new Text();
        FlowBean_2 v = new FlowBean_2();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Take one line of input and convert it to a String.
            String line = value.toString();
            // Split on the tab delimiter.
            String[] fields = line.split("\t");
            // The phone number is the second field.
            String phoneNum = fields[1];
            // Upstream/downstream traffic sit in the third-to-last and
            // second-to-last fields.
            long upFlow = Long.parseLong(fields[fields.length - 3]);
            long downFlow = Long.parseLong(fields[fields.length - 2]);
            k.set(phoneNum);
            v.set(upFlow, downFlow);
            context.write(k, v);
        }
    }
    // Reducer: sum up the traffic for each phone number.
    public static class FlowSumProvinceReducer extends Reducer<Text, FlowBean_2, Text, FlowBean_2> {

        FlowBean_2 v = new FlowBean_2();

        @Override
        protected void reduce(Text key, Iterable<FlowBean_2> values, Context context) throws IOException, InterruptedException {
            long upFlowCount = 0;
            long downFlowCount = 0;
            // Accumulate upstream and downstream traffic for this phone number.
            for (FlowBean_2 flowBean_2 : values) {
                upFlowCount += flowBean_2.getUpFlow();
                downFlowCount += flowBean_2.getDownFlow();
            }
            v.set(upFlowCount, downFlowCount);
            context.write(key, v);
        }
    }
    // Main entry point.
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Configuration object for the MR job's runtime parameters.
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // Tell the framework which jar this job lives in.
        job.setJarByClass(FlowSumProvince.class);

        // Specify which classes to use as the Mapper and the Reducer.
        job.setMapperClass(FlowSumProvinceMapper.class);
        job.setReducerClass(FlowSumProvinceReducer.class);

        // Specify the map output key/value types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean_2.class);

        // Specify the reduce output key/value types, i.e. the job's final output.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean_2.class);

        // Set the number of reduce tasks.
        // Note: the number of reduce tasks should equal the number of partitions.
        //   reduce tasks > partitions: empty output files are produced, hurting performance
        //   reduce tasks < partitions: "Illegal partition" error, the job fails
        job.setNumReduceTasks(6);

        // Register our custom partitioner component.
        job.setPartitionerClass(ProvincePartitioner.class);

        // Specify the job's input and output paths.
        FileInputFormat.addInputPath(job, new Path("D:\\data\\test_2\\input"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\data\\test_2\\output3"));

        // Submit the job and monitor it until completion.
        boolean result = job.waitForCompletion(true);

        // Exit with 0 if the job succeeded, 1 otherwise.
        System.exit(result ? 0 : 1);
    }
}
Run result:
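With six reduce tasks, a successful run leaves six files in the output directory, part-r-00000 through part-r-00005: partitions 0 through 4 hold the per-phone totals for the 134 through 138 prefixes respectively, and part-r-00005 collects every other prefix.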
About partitioning:
The number of reduce tasks and the number of partitions the partitioner can return should match exactly; the two mismatch cases behave differently.
When reduce tasks exceed partitions:
The extra reduce tasks receive no data and simply write empty output files, which wastes task slots and hurts performance.
When reduce tasks are fewer than partitions:
getPartition can return a partition number with no reduce task behind it, and the job fails with an "Illegal partition" error. (The usual exception is a single reduce task: with numReduceTasks set to 1, the custom partitioner is bypassed and everything goes to one file.)
Summary:
To split MapReduce output across files by a business rule: extend Partitioner over the map output key/value types, override getPartition to map each key to a fixed partition number, register the class with job.setPartitionerClass, and keep job.setNumReduceTasks equal to the number of partitions (here, 6).