Table of Contents
- Partition
- ☠ Default Partitioning --- HashPartitioner
- ▪ Example --- WordCount
- Mapper Stage
- Reducer Stage
- Driver Stage
- ☠ Custom Partitioner
- ▪ Basic Steps for a Custom Partitioner
- ▪ Example
- Requirement Analysis
- Code Implementation
- PhoneBean Wrapper Class
- ProvincePartitioner Class
- Mapper Stage
- Reducer Stage
- Driver Stage
- ★ Partitioning Summary
Partition
When processing data we are often required to output the statistics to different files according to some condition, so the data has to be divided into regions by that condition, computed per region, and then written out. This is where the concept of partitioning comes in.
- For example: to output the statistics to different files by the province a phone number belongs to, the data has to be partitioned by province.
☠ Default Partitioning — HashPartitioner
public class HashPartitioner<K, V> extends Partitioner<K, V> {
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
- The default partition is computed from the key's hashCode modulo the number of ReduceTasks, so the user cannot control which key is stored in which partition.
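As a quick illustration of that formula, here is a minimal standalone sketch that applies the same computation to a few sample keys with 2 ReduceTasks (the class and sample keys are only examples for illustration):

import org.apache.hadoop.io.Text;

public class HashPartitionDemo {
    public static void main(String[] args) {
        int numReduceTasks = 2;                          // same value as job.setNumReduceTasks(2)
        String[] keys = {"atguigu", "hadoop", "mapreduce"};
        for (String key : keys) {
            // identical formula to HashPartitioner: mask off the sign bit, then take the modulus
            int partition = (new Text(key).hashCode() & Integer.MAX_VALUE) % numReduceTasks;
            System.out.println(key + " -> partition " + partition);
        }
    }
}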
▪ Example — WordCount
Mapper Stage
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Mapper stage
 * KEYIN    type of the input key
 * VALUEIN  type of the input value
 * KEYOUT   type of the output key
 * VALUEOUT type of the output value
 */
public class wordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // reusable output objects
    Text k = new Text();
    IntWritable v = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // debug: print the byte offset of the current line
        System.out.println(key.toString());
        // 1. read one line, e.g. "atguigu atguigu"
        String line = value.toString();
        // 2. split into words
        String[] words = line.split(" ");
        // 3. write out each word
        for (String word : words) {
            // set the key, e.g. atguigu
            k.set(word);
            // set the count to 1 (the object could also be created with a default of 1)
            v.set(1);
            // emit the key-value pair (atguigu, 1)
            context.write(k, v);
        }
    }
}
Reducer Stage
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Reducer stage
 * KEYIN, VALUEIN   input types of the Reducer (the Mapper's output types)
 * KEYOUT           type of the final output key
 * VALUEOUT         type of the final output value
 */
public class wordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    IntWritable v = new IntWritable();

    // Iterable<IntWritable> values iterates over all counts of one key to compute its frequency
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // (atguigu, 1)
        // (atguigu, 1)
        // 1. sum the counts
        int sum = 0;
        for (IntWritable value : values) {
            // value is an IntWritable; convert it to int with get() before adding
            sum += value.get();
        }
        // 2. write out the result
        v.set(sum);
        context.write(key, v);
    }
}
Driver Stage
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class wordCountDriver {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        Job job = null;
        try {
            // 1. get the job instance
            job = Job.getInstance(conf);
            // 2. set the jar location
            job.setJarByClass(wordCountDriver.class);
            // 3. bind the Mapper and Reducer classes
            job.setMapperClass(wordCountMapper.class);
            job.setReducerClass(wordCountReducer.class);
            // 4. set the key/value types of the Mapper output
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            // 5. set the key/value types of the Reducer output
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            // set the number of ReduceTasks (default is 1)
            job.setNumReduceTasks(2);
            // 6. set the input and output paths
            FileInputFormat.setInputPaths(job, new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\Partition分区\\dataset\\"));
            FileOutputFormat.setOutputPath(job, new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\Partition分区\\output\\"));
            // when packaging as a jar, take the paths from the command line instead
            // FileInputFormat.setInputPaths(job, new Path(args[0]));
            // FileOutputFormat.setOutputPath(job, new Path(args[1]));
            // 7. submit the job
            job.waitForCompletion(true);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
}
The default partition is obtained by taking the key's hashCode modulo the number of ReduceTasks; here we set the number of ReduceTasks to 2.
// set the number of ReduceTasks (default is 1)
job.setNumReduceTasks(2);
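With 2 ReduceTasks the job writes one output file per partition, part-r-00000 and part-r-00001, plus a _SUCCESS marker. A minimal sketch for listing them after the run, assuming the output directory used in the Driver above:

import java.io.File;

public class ListPartitionOutput {
    public static void main(String[] args) {
        // the output directory configured in wordCountDriver; adjust to your environment
        File outputDir = new File("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\Partition分区\\output");
        File[] files = outputDir.listFiles();
        if (files == null) {
            System.out.println("output directory not found");
            return;
        }
        for (File f : files) {
            // expect _SUCCESS, part-r-00000 and part-r-00001 (one part file per ReduceTask)
            System.out.println(f.getName() + "  " + f.length() + " bytes");
        }
    }
}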
☠ Custom Partitioner
▪ Basic Steps for a Custom Partitioner
(1) Define a class that extends Partitioner<KEY, VALUE> (the Mapper's output key/value types) and override the getPartition() method.
(2) Register the custom partitioner in the Job driver with job.setPartitionerClass(...).
(3) Set the number of ReduceTasks with job.setNumReduceTasks(...) to match the number of partitions returned by getPartition().
A minimal skeleton of these steps is shown below; the case that follows is a complete implementation.
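The skeleton uses placeholder names (CustomPartitioner and the prefix rule are only illustrations; the full case below uses ProvincePartitioner):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// step 1: extend Partitioner with the Mapper's output key/value types and override getPartition()
public class CustomPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        // return a partition number between 0 and numPartitions - 1 according to your own rule
        return key.toString().startsWith("a") ? 0 : 1;
    }
}

// steps 2 and 3, in the Driver:
// job.setPartitionerClass(CustomPartitioner.class);
// job.setNumReduceTasks(2);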
▪ Example
Requirement Analysis
Output the statistics to different files according to the province the phone number belongs to (partitioning).
(1) Input data
(2) Expected output data
Phone numbers starting with 136, 137, 138 and 139 each go to their own file (4 separate files), and numbers with any other prefix go to one additional file.
Code Implementation
PhoneBean Wrapper Class
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class PhoneBean implements Writable {
private String ip;      // IP address
private long upFlow;    // upstream traffic
private long downFlow;  // downstream traffic
private long sumFlow;   // total traffic
public PhoneBean() {
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(ip);
dataOutput.writeLong(upFlow);
dataOutput.writeLong(downFlow);
dataOutput.writeLong(sumFlow);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
ip = dataInput.readUTF();
upFlow = dataInput.readLong();
downFlow = dataInput.readLong();
sumFlow = dataInput.readLong();
}
@Override
public String toString() {
// tab-separated so the output is easy to split later
return ip + "\t" +upFlow + "\t" + downFlow + "\t" + sumFlow;
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
public void set(String ip1,long upFlow1,long downFlow1){
ip = ip1;
upFlow = upFlow1 ;
downFlow = downFlow1;
sumFlow = upFlow1 + downFlow1;
}
}
ProvincePartitioner Class
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class ProvincePartitioner extends Partitioner<Text, PhoneBean> {
@Override
public int getPartition(Text text, PhoneBean phoneBean, int numPartitions) {
// key is the phone number
// value is the PhoneBean traffic record
// 1. take the first three digits of the phone number
String phoneNum = text.toString().substring(0,3);
// 2. determine the partition number (note: partition numbers must start from 0)
int partition = 4;
if ("136".equals(phoneNum)){
partition = 0;
} else if ("137".equals(phoneNum)){
partition = 1;
} else if ("138".equals(phoneNum)){
partition = 2;
}else if ("139".equals(phoneNum)){
partition = 3;
} else {
partition = 4;
}
return partition;
}
}
Mapper Stage
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class PhoneMapper extends Mapper<LongWritable, Text,Text,PhoneBean> {
Text k = new Text();
PhoneBean v = new PhoneBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1. read one line
String line = value.toString();
// 2. split on tabs
String[] words = line.split("\t");
// 3. wrap the fields into key and value
k.set(words[1]);                                        // phone number
String ip = words[2];
long upFlow = Long.parseLong(words[words.length - 3]);
long downFlow = Long.parseLong(words[words.length - 2]);
v.setIp(ip);
v.setUpFlow(upFlow);
v.setDownFlow(downFlow);
// 4. write out
context.write(k, v);
}
}
Reducer Stage
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class PhoneReducer extends Reducer<Text,PhoneBean,Text,PhoneBean> {
String ip = "";
long sum_upFlow = 0;
long sum_downFlow = 0;
PhoneBean v = new PhoneBean();
@Override
protected void reduce(Text key, Iterable<PhoneBean> values, Context context) throws IOException, InterruptedException {
// 1. sum the traffic for this phone number
for (PhoneBean phoneBean : values) {
    ip = phoneBean.getIp();
    sum_upFlow += phoneBean.getUpFlow();
    sum_downFlow += phoneBean.getDownFlow();
}
v.set(ip, sum_upFlow, sum_downFlow);
// 2. write out the result
context.write(key, v);
// reset the accumulators for the next key
ip = "";
sum_upFlow = 0;
sum_downFlow = 0;
}
}
Driver Stage
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class PhoneDriver {
public static void main(String[] args) {
Job job = null;
Configuration conf = new Configuration();
try{
// get the job instance
job = Job.getInstance(conf);
// configure: bind the Mapper/Reducer classes, the jar, and the output key/value types
job.setMapperClass(PhoneMapper.class);
job.setReducerClass(PhoneReducer.class);
job.setJarByClass(PhoneDriver.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(PhoneBean.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(PhoneBean.class);
// register the custom partitioner
job.setPartitionerClass(ProvincePartitioner.class);
// and set a matching number of ReduceTasks (4 prefix partitions + 1 for everything else)
job.setNumReduceTasks(5);
// set the input and output paths
FileInputFormat.setInputPaths(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\Partition分区\\dataset\\phone_data .txt"));
FileOutputFormat.setOutputPath(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\Partition分区\\output1\\"));
// submit the job
boolean result = job.waitForCompletion(true);
System.exit(result?0:1);
} catch (Exception e){
e.printStackTrace();
}
}
}
★ Partitioning Summary
- If the number of ReduceTasks is greater than the number of partitions returned by getPartition(), the extra ReduceTasks simply produce empty output files (part-r-000xx).
- If 1 < number of ReduceTasks < number of partitions, some records have no ReduceTask to go to and the job throws an Exception.
- If the number of ReduceTasks is 1, the partitioner is bypassed and every record ends up in a single output file, part-r-00000.
- Partition numbers must start from 0 and increase one by one (0, 1, 2, ...).
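Applied to the ProvincePartitioner above, which returns partition numbers 0 through 4, these rules play out in the Driver roughly like this (a sketch of the cases, using the same setNumReduceTasks call as in PhoneDriver):

// ProvincePartitioner returns partition numbers 0..4, so 5 ReduceTasks match exactly:
job.setNumReduceTasks(5);   // produces part-r-00000 ... part-r-00004

// job.setNumReduceTasks(6); // runs, but part-r-00005 would simply be an empty file
// job.setNumReduceTasks(3); // fails: records assigned to partitions 3 and 4 have no ReduceTask
// job.setNumReduceTasks(1); // runs, the partitioner is bypassed and everything goes to part-r-00000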