0
点赞
收藏
分享

微信扫一扫

partition分区将统计的结果按照不同的要求输出到不同的文件中


源代码作用默认的partition分区

public class HashPartitioner<K, V> extends Partitioner<K, V> {
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K key, V value, int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}

默认分区是根据key的hashCode对reduceTasks个数取模得到的,用户没发控制哪个key存储到哪个分区

自定义Patitioner步骤

(1)自定义类继承Partitioner,重新getPartitioner()方法

package com.zyd.flowsum;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
* K2 V2 对应是map输出的K,V类型
* @author Administrator
*
*/
public class ProvincePartitioner extends Partitioner<Text, FlowBean>{

@Override
public int getPartition(Text key, FlowBean value, int numPartitions) {
//1 获取电话号码的前三位
String preNum = key.toString().substring(0,3);

int partition =4 ;
// 2 判断是哪个省的
if ("136".equals(preNum)) {
partition = 0;
}else if("137".equals(preNum)){
partition = 1;
}else if("138".equals(preNum)){
partition = 2;
}else if("139".equals(preNum)){
partition = 3;
}


return partition;
}

}

(2)在job驱动中,设置自定义的Partitioner

(3)自定义partition后,要根据自定义partitioner的逻辑设置相应数量的reduce Task

//2 设置jar包路径
job.setJarByClass(FlowDriver.class);

// 指定自定义数据分区
job.setPartitionerClass(ProvincePartitioner.class);
//指定响应数量的reduceTask 注意task的数目等于分区数 分区数 0 到设置的数
job.setNumReduceTasks(5);

//3 管理类mapper和Reducer
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);

注意:

如果reduceTask的数量> getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;

如果1<reduceTask的数量<getPartition的结果数,则有一部分分区数据无处安放,会Exception;

如果reduceTask的数量=1,则不管mapTask端输出多少个分区文件,最终结果都交给这一个reduceTask,最终也就只会产生一个结果文件 part-r-00000;

假定自定义分区数为5,则

(1)job.setNumReduceTasks(1);会正常运行,只不过会产生一个输出文件
(2)job.setNumReduceTasks(2);会报错
(3)job.setNumReduceTasks(6);大于5,程序会正常运行,会产生空文件

所有设置分区数=mapTask数



举报

相关推荐

0 条评论