Reduce-Side Join and Map-Side Join in MapReduce


        In this post, 小菌 walks through how to implement the reduce-side join and the map-side join algorithms in MapReduce.

Reduce-side join implementation

        Let's start with the requirement. We have the following two tables:


Order table t_order:

id      date        pid      amount
1001    20150710    P0001    3
1002    20150710    P0002    3

Product table t_product:

id       pname     category_id    price
P0001    小米5     1000           2000
P0002    锤子T1    1000           3000

        Suppose the data volume is huge and both tables are stored as files on HDFS; we then need a MapReduce program to perform the join.

If we were to express this as a SQL query, the statement would be:

select  a.id,a.date,b.pname,b.category_id,b.price from t_order a join t_product b on a.pid = b.id

        But how can we achieve the same effect with MapReduce?

        The correct approach is: use the join condition (the product id) as the map output key, send the records from both tables that satisfy the join condition, together with a marker of which file each record came from, to the same reduce task, and stitch the data together in the reducer.

        First, let's turn the table data above into files.

orders.txt

1001,20150710,p0001,2
1002,20150710,p0001,3
1002,20150710,p0002,3

product.txt

p0001,小米5,1000,2000
p0002,锤子T1,1000,3000
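
        Before walking through the full implementation, here is a minimal, self-contained sketch of the idea described above (the class name and the "O#"/"P#" tags are illustrative only; the actual code below carries the source information in a custom Writable bean instead of a string tag):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class TaggedJoinSketch {

    // Mapper: key every record by pid and prefix the value with a tag that records its source file.
    public static class TagMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
            String[] fields = value.toString().split(",");
            if (fileName.contains("orders")) {
                // orders.txt: id,date,pid,amount -> join key is pid (index 2)
                context.write(new Text(fields[2]), new Text("O#" + value.toString()));
            } else {
                // product.txt: id,pname,category_id,price -> join key is id (index 0)
                context.write(new Text(fields[0]), new Text("P#" + value.toString()));
            }
        }
    }

    // Reducer: all records with the same pid arrive together; pair each order with the product record.
    public static class TagReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            List<String> orders = new ArrayList<>();
            String product = "";
            for (Text v : values) {
                String s = v.toString();
                if (s.startsWith("O#")) {
                    orders.add(s.substring(2));
                } else {
                    product = s.substring(2);
                }
            }
            for (String order : orders) {
                // one joined line per order record
                context.write(key, new Text(order + "," + product));
            }
        }
    }
}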


        Now let's get into the actual implementation.

Step 1: Define the JoinBean class

package demo14_join算法_reducejoin;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @Author: 传智新星
 * @Date: 2019/11/18 10:42
 * @Description: Bean that carries the fields of both tables through the shuffle.
 */
public class JoinBean implements Writable {

    private String id;
    private String date;
    private String pid;
    private String amount;
    private String pname;
    private String category_id;
    private String price;

    @Override
    public String toString() {
        return "JoinBean{" +
                "id='" + id + '\'' +
                ", date='" + date + '\'' +
                ", pid='" + pid + '\'' +
                ", amount='" + amount + '\'' +
                ", pname='" + pname + '\'' +
                ", category_id='" + category_id + '\'' +
                ", price='" + price + '\'' +
                '}';
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getDate() {
        return date;
    }

    public void setDate(String date) {
        this.date = date;
    }

    public String getPid() {
        return pid;
    }

    public void setPid(String pid) {
        this.pid = pid;
    }

    public String getAmount() {
        return amount;
    }

    public void setAmount(String amount) {
        this.amount = amount;
    }

    public String getPname() {
        return pname;
    }

    public void setPname(String pname) {
        this.pname = pname;
    }

    public String getCategory_id() {
        return category_id;
    }

    public void setCategory_id(String category_id) {
        this.category_id = category_id;
    }

    public String getPrice() {
        return price;
    }

    public void setPrice(String price) {
        this.price = price;
    }

    public JoinBean() {
    }

    public JoinBean(String id, String date, String pid, String amount, String pname, String category_id, String price) {
        this.id = id;
        this.date = date;
        this.pid = pid;
        this.amount = amount;
        this.pname = pname;
        this.category_id = category_id;
        this.price = price;
    }

    // Serialization. writeUTF cannot handle null, so appending "" turns a missing
    // field into the literal string "null"; the reducer relies on this to tell
    // order-side beans from product-side beans.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(id + "");
        out.writeUTF(date + "");
        out.writeUTF(pid + "");
        out.writeUTF(amount + "");
        out.writeUTF(pname + "");
        out.writeUTF(category_id + "");
        out.writeUTF(price + "");
    }

    // Deserialization: read the fields back in the same order they were written.
    @Override
    public void readFields(DataInput in) throws IOException {
        this.id = in.readUTF();
        this.date = in.readUTF();
        this.pid = in.readUTF();
        this.amount = in.readUTF();
        this.pname = in.readUTF();
        this.category_id = in.readUTF();
        this.price = in.readUTF();
    }
}

Step 2: Define the Mapper class

package demo14_join算法_reducejoin;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

/**
 * @Author: 传智新星
 * @Date: 2019/11/18 10:48
 * @Description: Fills a JoinBean according to the source file of each line and emits it keyed by the product id.
 */
public class JoinMap extends Mapper<LongWritable, Text, Text, JoinBean> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // Create a JoinBean for this record
        JoinBean joinBean = new JoinBean();

        // The input split tells us which file this line of text belongs to
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        String name = inputSplit.getPath().getName();

        String[] split = value.toString().split(",");

        // Decide by file name which table this record comes from
        if (name.contains("orders")) {
            // orders.txt: id,date,pid,amount -> the join key (pid) is at index 2
            joinBean.setId(split[0]);
            joinBean.setDate(split[1]);
            joinBean.setPid(split[2]);
            joinBean.setAmount(split[3]);

            context.write(new Text(split[2]), joinBean);
        } else {
            // product.txt: id,pname,category_id,price -> the join key (id) is at index 0
            joinBean.setPname(split[1]);
            joinBean.setCategory_id(split[2]);
            joinBean.setPrice(split[3]);

            context.write(new Text(split[0]), joinBean);
        }
    }
}

Step 3: Define a custom Reducer class

package demo14_join算法_reducejoin;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * @Author: 传智新星
 * @Date: 2019/11/18 11:00
 * @Description: Iterates over the half-filled JoinBeans that share the same pid and stitches them together.
 */
public class JoinReduce extends Reducer<Text, JoinBean, JoinBean, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<JoinBean> values, Context context) throws IOException, InterruptedException {

        // One pid can match several orders, so collect all order-side beans first.
        // Note: Hadoop reuses the value object while iterating, so the fields must be copied out.
        List<JoinBean> orders = new ArrayList<>();
        JoinBean product = new JoinBean();

        for (JoinBean value : values) {
            // Order-side beans carry a real id; product-side beans were serialized with id == "null"
            if (value.getId() != null && !value.getId().equals("null")) {
                JoinBean order = new JoinBean();
                order.setId(value.getId());
                order.setDate(value.getDate());
                order.setPid(value.getPid());
                order.setAmount(value.getAmount());
                orders.add(order);
            } else {
                product.setPname(value.getPname());
                product.setCategory_id(value.getCategory_id());
                product.setPrice(value.getPrice());
            }
        }

        // Emit one fully joined record per order
        for (JoinBean order : orders) {
            order.setPname(product.getPname());
            order.setCategory_id(product.getCategory_id());
            order.setPrice(product.getPrice());
            context.write(order, NullWritable.get());
        }
    }
}

Step 4: Write the driver (main method)

package demo14_join算法_reducejoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @Author: 传智新星
 * @Date: 2019/11/18 11:08
 * @Description: Driver for the reduce-side join job.
 */
public class JoinDrive extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new JoinDrive(), args);
        System.out.println("Exit status: " + run);
    }

    @Override
    public int run(String[] args) throws Exception {

        // 1. Create the Configuration and Job objects
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "MoreFile");
        job.setJarByClass(JoinDrive.class);

        // 2. Configure the input
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("E:\\2019大数据课程\\DeBug\\测试\\order\\素材\\4\\map端join\\input"));

        // 3. Configure the Mapper and its output key/value types
        job.setMapperClass(JoinMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(JoinBean.class);

        // 4. Configure the Reducer and the final output key/value types
        job.setReducerClass(JoinReduce.class);
        job.setOutputKeyClass(JoinBean.class);
        job.setOutputValueClass(NullWritable.class);

        // 5. Configure the output
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("E:\\2019大数据课程\\DeBug\\测试结果\\join1"));

        // Return the job's exit status
        return job.waitForCompletion(true) ? 0 : 1;
    }
}

        Open the output file generated under the join1 directory: each order record now carries the fields of its matching product.


        This shows that our program ran successfully!

        However, this program has an obvious drawback: the join is performed in the reduce phase, so the reducers bear most of the processing load while the map nodes stay lightly loaded. Resource utilization is poor, and the reduce phase is highly prone to data skew!

        So what is the solution? This naturally leads to the "star" of the second half of this post: the map-side join algorithm!



Map-side join implementation

        First, let's look at the principle behind the map-side join algorithm:

  • It is suited to the case where one of the joined tables is small.
  • The small table can be distributed to every map node, so each map task joins the big-table data it reads locally and outputs the final result directly, which greatly increases the parallelism of the join and speeds up processing.

        Let's prepare the data first.


pdts.txt (as the "small table", this file must reside on the Hadoop cluster)


p0001,xiaomi,1000,2
p0002,appale,1000,3
p0003,samsung,1000,4

orders.txt (under the map_join_iput folder)

1001,20150710,p0001,2
1002,20150710,p0002,3
1003,20150710,p0003,3

        Now we can finally get to the code.

Step 1: Define the map-side JoinMap class

package demo14_join算法_reducejoin.mapjoin;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;

public class JoinMap extends Mapper<LongWritable, Text, Text, Text> {

    // In-memory copy of the small table: pid -> "pname \t category_id \t price"
    HashMap<String, String> b_tab = new HashMap<String, String>();
    String line = null;

    /*
     * setup() runs once per map task: fetch the distributed-cache file and load it into the HashMap.
     */
    @Override
    public void setup(Context context) throws IOException, InterruptedException {

        // Get the URIs of all cache files registered for this job
        URI[] cacheFiles = DistributedCache.getCacheFiles(context.getConfiguration());

        // Open the (first and only) cache file
        FileSystem fileSystem = FileSystem.get(cacheFiles[0], context.getConfiguration());
        FSDataInputStream open = fileSystem.open(new Path(cacheFiles[0]));

        // Read it line by line through a buffered reader
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(open));

        while ((line = bufferedReader.readLine()) != null) {
            // pdts.txt: pid,pname,category_id,price
            String[] split = line.split(",");
            b_tab.put(split[0], split[1] + "\t" + split[2] + "\t" + split[3]);
        }

        IOUtils.closeStream(bufferedReader);
        fileSystem.close();
    }

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Here we read the split of the big table (on HDFS) that this map task is responsible for
        String[] fields = value.toString().split(",");

        String orderId = fields[0];
        String date = fields[1];
        String pdId = fields[2];
        String amount = fields[3];

        // Look up the product details of this order in the in-memory small table
        String productInfo = b_tab.get(pdId);
        context.write(new Text(orderId), new Text(date + "\t" + productInfo + "\t" + amount));
    }
}

Step 2: Write the driver (main method)

package demo14_join算法_reducejoin.mapjoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.net.URI;

/**
 * @Author: 传智新星
 * @Date: 2019/11/18 11:46
 * @Description: Driver for the map-side join job.
 */
public class MapJoinDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {

        Configuration conf = new Configuration();

        // Register the small-table file on HDFS as a distributed-cache file
        DistributedCache.addCacheFile(new URI("hdfs://192.168.100.100/tmp/pdts.txt"), conf);

        Job job = Job.getInstance(conf, "MapJoin");

        // Input: the big table (orders)
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("E:\\2019大数据课程\\DeBug\\测试\\order\\素材\\4\\map端join\\map_join_iput"));

        // The join happens entirely in the mapper, so no custom reducer is configured
        job.setMapperClass(JoinMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Output
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("E:\\2019大数据课程\\DeBug\\测试结果\\mapjoin2"));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new MapJoinDriver(), args);
        System.out.println("Exit status: " + run);
    }
}
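
        One note: DistributedCache is deprecated in newer Hadoop releases (2.x and later) in favor of the cache-file methods on Job and the task context. A minimal sketch of the same wiring with that API is shown below; it assumes Hadoop 2.x+, and everything else in the classes above stays unchanged.

// Sketch only (assumes Hadoop 2.x+): the two places that change when moving off
// the deprecated DistributedCache API.

// 1) In MapJoinDriver.run(), register the cache file on the Job instead of the Configuration:
//        job.addCacheFile(new URI("hdfs://192.168.100.100/tmp/pdts.txt"));

// 2) In JoinMap, replace setup() with a version that reads context.getCacheFiles():
@Override
public void setup(Context context) throws IOException, InterruptedException {
    // Non-deprecated lookup of the cache-file URIs
    URI[] cacheFiles = context.getCacheFiles();
    FileSystem fs = FileSystem.get(cacheFiles[0], context.getConfiguration());
    // try-with-resources closes the reader (and the underlying stream) automatically
    try (BufferedReader reader = new BufferedReader(
            new InputStreamReader(fs.open(new Path(cacheFiles[0]))))) {
        String cachedLine;
        while ((cachedLine = reader.readLine()) != null) {
            // pdts.txt: pid,pname,category_id,price
            String[] split = cachedLine.split(",");
            b_tab.put(split[0], split[1] + "\t" + split[2] + "\t" + split[3]);
        }
    }
}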

        After the job finishes, go into the output directory and open the result file.


        The results are correct here as well, which means our map-side join is implemented successfully!

    That's all for this post. 小菌 will bring more Hadoop content in future posts, so if you enjoyed it, don't forget to follow 小菌 ٩(๑>◡<๑)۶.


