0
点赞
收藏
分享

微信扫一扫

【MapReduce】基础案例 ---- Map Join 实现数据合并(缓存表)



文章目录

  • ​​Map Join​​
  • ​​① Map Join工作原理​​
  • ​​② Map Join 案例​​
  • ​​☠ 需求分析​​
  • ​​☠ 代码实现​​
  • ​​Mapper阶段​​
  • ​​Driver阶段​​
  • ​​☠ 总结​​

Map Join

① Map Join工作原理

  • 1.使用场景
    Map Join适用于一张表十分小、一张表很大的场景
  • 2.优点
  • 思考:在Reduce端处理过多的表,非常容易产生数据倾斜。怎么办?
  • 在Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据的压力,尽可能的减少数据倾斜
  • 3.具体办法:采用DistributedCache
    (1)在Mapper的setup阶段,将文件读取到缓存集合中。
    (2)在驱动函数中加载缓存。

// 缓存普通文件到Task运行节点。
job.addCacheFile(new URI("file://e:/cache/pd.txt"));

​​返回顶部​​

② Map Join 案例

☠ 需求分析

  • Map Join适用于关联表中有小表的情形
  • 【MapReduce】基础案例 ---- Map Join 实现数据合并(缓存表)_mapreduce

​​返回顶部​​

☠ 代码实现

Mapper阶段

package 第三章_MR框架原理.多种join应用;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;

public class DistributedCacheMapper extends Mapper<LongWritable, Text,Text, NullWritable> {
Text k = new Text();
// 创建集合存储拆分的数据信息
HashMap pdMap = new HashMap<String,String>();
/**
* 重写setUp方法
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void setup(Context context) throws IOException, InterruptedException {
// 1.缓存小表
// 1.1获取要缓存的文件路径
// 1.1.1 首先获取缓存文件
URI[] cacheFiles = context.getCacheFiles();
// 1.1.2 通过缓存文件获取其路径
String path = cacheFiles[0].getPath().toString();
// 1.2 读取文件信息
BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(path),"UTF-8"));
String line;
while (StringUtils.isNotEmpty(line = reader.readLine())){
//pid pname
//01 小米
// 1.3 拆分
String[] fields = line.split("\t");
// 1.4 添加到集合中,将pid作为key,通过pid获取pname,后续写到order表中
pdMap.put(fields[0],fields[1]);
}
// 1.5 关闭资源
IOUtils.closeStream(reader);
}
/**
* 在map阶段进行join操作
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// id pid amount
// 1001 01 1
// pid pname
// 01 小米
// 1.获取一行数据
String line = value.toString();
// 2.拆分
String[] fields = line.split("\t");
// 3.获取pid,到对应的pdMap中找出pname写入
String pid = fields[1];
String pname = (String) pdMap.get(pid);
// 4.拼接
line = line + "\t" + pname;
k.set(line);
// 5.写出
context.write(k,NullWritable.get());
}
}

​​返回顶部​​

Driver阶段

package 第三章_MR框架原理.多种join应用;

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DistributedCacheDriver {
public static void main(String[] args) {
Configuration configuration = new Configuration();
Job job = null;
try {
// 1 获取job信息
job = Job.getInstance(configuration);
// 2 设置加载jar包路径
job.setJarByClass(DistributedCacheDriver.class);
// 3 关联map
job.setMapperClass(DistributedCacheMapper.class);
// 4 设置最终输出数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 5 设置输入输出路径
FileInputFormat.setInputPaths(job, new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\多种join应用\\data\\order.txt"));
FileOutputFormat.setOutputPath(job, new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\多种join应用\\cacheoutput"));
// 6 加载缓存数据表
job.addCacheFile(new URI("file:///G:/Projects/IdeaProject-C/MapReduce/src/main/java/第三章_MR框架原理/多种join应用/data/pd.txt"));
// 7 Map端Join的逻辑不需要Reduce阶段,设置reduceTask数量为0
job.setNumReduceTasks(0);
// 8 提交
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
} catch (Exception e) {
e.printStackTrace();
}
}
}

【MapReduce】基础案例 ---- Map Join 实现数据合并(缓存表)_hadoop_02

​​返回顶部​​

☠ 总结

  • 可以明显看出,在Map阶段进行join操作,不需要使用Reduce,合并的操作完全在Mapper阶段完成,充分利用了map阶段的资源,并且有效避免了Reduce阶段的数据倾斜情况。

​​返回顶部​​


举报

相关推荐

0 条评论