0
点赞
收藏
分享

微信扫一扫

Shuffle之1分区Partition


Shuffle

Shuffle

》1:Shuffle是什么?

shuffle    英[ˈʃʌfl] 美[ˈʃʌfl] 洗(牌)
将所有元素随机排序

在MR中,Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle

》2:Shuffle过程详解

Shuffle之1分区Partition_数据

Shuffle之1分区Partition_jar包_02

暂时不讲,只要求划出未知知识点

具体Shuffle过程详解,如下:
1)MapTask收集我们的map()方法输出的kv对,放到内存缓冲区中
2)从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件
3)多个溢出文件会被合并成大的溢出文件
4)在溢出过程及合并的过程中,都要调用Partitioner进行分区和针对key进行排序
5)ReduceTask根据自己的分区号,去各个MapTask机器上取相应的结果分区数据
6)ReduceTask会取到同一个分区的来自不同MapTask的结果文件,ReduceTask会将这些文件再进行合并(归并排序)
7)合并成大文件后,Shuffle的过程也就结束了,后面进入ReduceTask的逻辑运算过程(从文件中取出一个一个的键值对Group,调用用户自定义的reduce()方法)

Partition分区概述

》1:什么是Partition分区

要求将统计结果按照条件输出到不同文件中(分区)。

比如:将统计结果按照手机归属地不同省份输出到不同文件中(分区)

》》2:默认Partition分区

public class HashPartitioner<K, V> extends Partitioner<K, V> {
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K key, V value, int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}

key.hashCode() & Integer.MAX_VALUE
防止key.hashCode()溢出

% numReduceTasks
轮流发牌

默认分区是根据key的hashCode对reduceTasks个数取模得到的。
用户没法控制哪个key存储到哪个分区。

》》3 自定义Partitioner步骤
(1)自定义类继承Partitioner,重写getPartition()方法

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
@Override
public int getPartition(Text key, FlowBean value, int numPartitions) {

// 1 获取电话号码的前三位
String preNum = key.toString().substring(0, 3);

int partition = 4;

// 2 判断是哪个省
if ("136".equals(preNum)) {
partition = 0;
}else if ("137".equals(preNum)) {
partition = 1;
}else if ("138".equals(preNum)) {
partition = 2;
}else if ("139".equals(preNum)) {
partition = 3;
}
return partition;
}
}

(2)在job驱动中,设置自定义partitioner:

job.setPartitionerClass(CustomPartitioner.class);

(3)自定义partition后,要根据自定义partitioner的逻辑设置相应数量的reduce task

job.setNumReduceTasks(5);

》》4 reduceTask的数量

如果reduceTask的数量> getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;
如果1<reduceTask的数量<getPartition的结果数,则有一部分分区数据无处安放,会Exception;
如果reduceTask的数量=1,则不管mapTask端输出多少个分区文件,最终结果都交给这一个reduceTask,
最终也就只会产生一个结果文件 part-r-00000;

例如:假设自定义分区数为5,则
(1)job.setNumReduceTasks(1);会正常运行,只不过会产生一个输出文件
(2)job.setNumReduceTasks(2);会报错
(3)job.setNumReduceTasks(6);大于5,程序会正常运行,会产生空文件

Partition分区案例实操

需求

将统计结果按照手机归属地不同省份输出到不同文件中(分区)
(1)输入数据

1 13736230513 192.196.100.1 www.dev1.com  2481  24681 200
2 13836230513 192.196.100.1 www.dev1.com 2481 24681 200
3 13636230513 192.196.100.1 www.dev1.com 2481 24681 200
4 13936230513 192.196.100.1 www.dev1.com 2481 24681 200
5 13136230513 192.196.100.1 www.dev1.com 2481 24681 200

(2)期望输出数据

文件1
文件2
文件3
文件4
文件5

(3)增加一个ProvincePartitioner分区

136 分区0
137 分区1
138 分区2
139 分区3
其他 分区4

(4)Driver驱动类

// 8 指定自定义数据分区
job.setPartitionerClass(ProvincePartitioner.class);

// 9 同时指定相应数量的reduce task
job.setNumReduceTasks(5);

手机号136、137、138、139开头都分别放到一个独立的4个文件中,其他开头的放到一个文件中。

###实现
》》1 在案例FlowSum的基础上,增加一个分区类

package com.dev1.flowsum;
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

@Override
public int getPartition(Text key, FlowBean value, int numPartitions) {

// 1 获取电话号码的前三位
String preNum = key.toString().substring(0, 3);

int partition = 4;

// 2 判断是哪个省
if ("136".equals(preNum)) {
partition = 0;
}else if ("137".equals(preNum)) {
partition = 1;
}else if ("138".equals(preNum)) {
partition = 2;
}else if ("139".equals(preNum)) {
partition = 3;
}
return partition;
}
}

》》2 在驱动函数中增加自定义数据分区设置和ReduceTask设置

package com.dev1.flowsum;

public class FlowsumDriver {

public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {

// 输入输出路径需要根据自己电脑上实际的输入输出路径设置
args = new String[]{"e:/output1","e:/output2"};

// 1 获取配置信息,或者job对象实例
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);

// 2 指定本程序的jar包所在的本地路径
job.setJarByClass(FlowsumDriver.class);

// 3 指定本业务job要使用的mapper/Reducer业务类
job.setMapperClass(FlowCountMapper.class);
job.setReducerClass(FlowCountReducer.class);

// 4 指定mapper输出数据的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);

// 5 指定最终输出的数据的kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);

// 8 指定自定义数据分区
job.setPartitionerClass(ProvincePartitioner.class);

// 9 同时指定相应数量的reduce task
job.setNumReduceTasks(5);

// 6 指定job的输入原始文件所在目录
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}


举报

相关推荐

0 条评论