0
点赞
收藏
分享

微信扫一扫

将流量汇总统计结果按照手机归属地不同省份输出到不同的文件


目录

​​需求描述:​​

​​需求分析:​​

​​设计技术:​​

​​默认的分区规则:​​

​​如何自定义分区?​​

​​ProvincePartitioner ​​

​​代码设计:​​

​​运行结果:​​

​​关于分区:​​

​​大于影响​​

​​小于影响:​​

​​小总结:​​

需求描述:

将流量汇总统计结果按照手机归属地不同省份输出到不同的文件_hadoop

 

需求分析:

将流量汇总统计结果按照手机归属地不同省份输出到不同的文件_hadoop_02

设计技术:

默认的分区规则:

将流量汇总统计结果按照手机归属地不同省份输出到不同的文件_hadoop_03

小测试验证:

将流量汇总统计结果按照手机归属地不同省份输出到不同的文件_mapreduce_04

将流量汇总统计结果按照手机归属地不同省份输出到不同的文件_apache_05

 

如何自定义分区?

将流量汇总统计结果按照手机归属地不同省份输出到不同的文件_apache_06

将流量汇总统计结果按照手机归属地不同省份输出到不同的文件_mapreduce_07

 

ProvincePartitioner 

将流量汇总统计结果按照手机归属地不同省份输出到不同的文件_hadoop_08

将流量汇总统计结果按照手机归属地不同省份输出到不同的文件_apache_09

 

package cn.itcast.mapreduce.flowcount_2;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

import java.util.HashMap;

public class ProvincePartitioner extends Partitioner<Text,FlowBean_2> {
public static HashMap<String, Integer> province = new HashMap<String, Integer>();

static{
province.put("134",0);
province.put("135",1);
province.put("136",2);
province.put("137",3);
province.put("138",4);
}
public int getPartition(Text key, FlowBean_2 flowBean_2, int numPartitions) {

Integer code = province.get(key.toString().substring(0, 3));
if (code!=null){
return code;
}
return 5;
}
}

代码设计:

 

将流量汇总统计结果按照手机归属地不同省份输出到不同的文件_apache_10

 

将流量汇总统计结果按照手机归属地不同省份输出到不同的文件_hadoop_11

将流量汇总统计结果按照手机归属地不同省份输出到不同的文件_apache_12

package cn.itcast.mapreduce.flowcount_2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowSumProvince {


//Mapper的数据处理
public static class FlowSumProvinceMapper extends Mapper<LongWritable, Text, Text, FlowBean_2> {
//设置map输出的key value
Text k = new Text();
FlowBean_2 v = new FlowBean_2();

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//拿取一行文本转为String
String line = value.toString();
//按照分隔符\t进行分割
String[] fileds = line.split("\t");
//获取用户手机号
String phoNum = fileds[1];

/* k.set(phoNum);
v.set(Long.parseLong(fileds[fileds.length-3]),Long.parseLong(fileds[fileds.length-2]));*/
long upFlow = Long.parseLong(fileds[fileds.length-3]);
long downFlow = Long.parseLong(fileds[fileds.length-2]);

k.set(phoNum);
v.set(upFlow, downFlow);

context.write(k,v);
}
}

//reducer的数据处理
public static class FlowSumProvinceReducer extends Reducer<Text,FlowBean_2,Text,FlowBean_2>{
FlowBean_2 v = new FlowBean_2();
@Override
protected void reduce(Text key, Iterable<FlowBean_2> values, Context context) throws IOException, InterruptedException {
long upFlowCount=0;
long downFlowCount=0;

for (FlowBean_2 flowBean_2 : values) {
upFlowCount +=flowBean_2.getUpFlow();
downFlowCount +=flowBean_2.getDownFlow();
}
v.set(upFlowCount,downFlowCount);
context.write(key,v);
}
}

//主函数运行入口
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//配置参数,用于指定mr运行时相关的参数属性
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);

//指定我这个 job 所在的 jar包位置
job.setJarByClass(FlowSumProvince.class);

//指定我们使用的Mapper是那个类 reducer是哪个类
job.setMapperClass(FlowSumProvinceMapper.class);
job.setReducerClass(FlowSumProvinceReducer.class);


//4指定map输出kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean_2.class);

//指定ruduce输出的kv类型,也就是mr的最终输出结果
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean_2.class);

//这里设置运行reduceTask的个数
// todo reducetask个数 = 分区个数
// reducetask个数 > 分区个数 空文件产生 影响性能
// reducetask个数 < 分区个数 Illegal partition 非法分区 报错不执行
job.setNumReduceTasks(6);

//这里指定使用我们自定义的分区组件
job.setPartitionerClass(ProvincePartitioner.class);

//指定mr程序的输出路径
FileInputFormat.addInputPath(job,new Path("D:\\data\\test_2\\input"));
FileOutputFormat.setOutputPath(job,new Path("D:\\data\\test_2\\output3"));


boolean result = job.waitForCompletion(true);//提交mr程序,并且开始任务执行监控的功能
//如果mr程序执行成功,退出0,否则1
System.exit(result?0:1);
}
}

 

运行结果:

将流量汇总统计结果按照手机归属地不同省份输出到不同的文件_mapreduce_13

 

 

关于分区:

大于影响

将流量汇总统计结果按照手机归属地不同省份输出到不同的文件_apache_14

 

将流量汇总统计结果按照手机归属地不同省份输出到不同的文件_mapreduce_15

 

小于影响:

 

将流量汇总统计结果按照手机归属地不同省份输出到不同的文件_hadoop_16

将流量汇总统计结果按照手机归属地不同省份输出到不同的文件_mapreduce_17

 

小总结:

 

将流量汇总统计结果按照手机归属地不同省份输出到不同的文件_apache_18

举报

相关推荐

0 条评论