Multi-Table Join in MapReduce: Data Skew from a Reduce-Side Join

工程与房产肖律师 2022-02-24


Requirements:

Order table t_order:

(image: sample rows of t_order)

Product table t_product:

(image: sample rows of t_product)

Join the rows of the product table into the order table on the product id.

(image: the joined result)
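The original images are not preserved in this copy; the rows below are reconstructed from the sample values quoted in the code comments further down (order row 1001 01 1, product row 01 小米), so treat them as illustrative rather than the full dataset.

t_order (order_id, p_id, amount):
1001	01	1

t_product (p_id, pname):
01	小米

Joined result (order_id, pname, amount):
1001	小米	1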

Approach:

Processing in the map phase


  1. Determine which input file the record comes from
  2. Read the input line
  3. Process each file's records differently
  4. Wrap the fields in a bean object and emit it

The map task then partitions and sorts


Records are partitioned and sorted by product id.
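No custom code is needed for this step. Because the map output key is the product id, the default HashPartitioner already routes all records with the same id to the same reduce task, and the shuffle sorts them by that key. Purely for illustration, a partitioner equivalent to the default would look like the sketch below; the class name ProductIdPartitioner is made up, and the job in this post never registers it:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Equivalent to the default HashPartitioner: records sharing a p_id
// always land in the same reduce task.
public class ProductIdPartitioner extends Partitioner<Text, TableBean> {
    @Override
    public int getPartition(Text key, TableBean value, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}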

Reduce task

Cache the set of order records and the product record, then join them.


First, create a bean class that matches the table schema:

package com.zyd.table;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class TableBean implements Writable {
    private String order_id; // order id
    private String p_id;     // product id
    private int amount;      // quantity ordered
    private String pname;    // product name
    private String flag;     // marks which table the record came from

    public TableBean() {
    }

    public TableBean(String order_id, String p_id, int amount, String pname, String flag) {
        this.order_id = order_id;
        this.p_id = p_id;
        this.amount = amount;
        this.pname = pname;
        this.flag = flag;
    }

    public String getOrder_id() {
        return order_id;
    }

    public void setOrder_id(String order_id) {
        this.order_id = order_id;
    }

    public String getP_id() {
        return p_id;
    }

    public void setP_id(String p_id) {
        this.p_id = p_id;
    }

    public int getAmount() {
        return amount;
    }

    public void setAmount(int amount) {
        this.amount = amount;
    }

    public String getPname() {
        return pname;
    }

    public void setPname(String pname) {
        this.pname = pname;
    }

    public String getFlag() {
        return flag;
    }

    public void setFlag(String flag) {
        this.flag = flag;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(order_id);
        out.writeUTF(p_id);
        out.writeInt(amount);
        out.writeUTF(pname);
        out.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // must read the fields in exactly the order write() serialized them
        this.order_id = in.readUTF();
        this.p_id = in.readUTF();
        this.amount = in.readInt();
        this.pname = in.readUTF();
        this.flag = in.readUTF();
    }

    @Override
    public String toString() {
        return order_id + "\t" + pname + "\t" + amount;
    }
}

Fields, getters and setters, a no-arg constructor, a parameterized constructor, an overridden toString (which determines the final output line format), and the Writable serialization/deserialization methods.

Mapper class

package com.zyd.table;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

/**
 * Emits the product id as the key and a TableBean as the value.
 */
public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {
    TableBean bean = new TableBean();
    Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. Determine which table this record belongs to: order table or product table
        // get the input split this mapper is reading
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        // file name of the split's path
        String name = inputSplit.getPath().getName();

        // 2. Read the input line
        String line = value.toString();

        // 3. Process the two files differently
        if (name.startsWith("order")) {
            // order record, e.g. 1001 01 1
            String[] fields = line.split("\t");
            // fill the bean
            bean.setOrder_id(fields[0]);
            bean.setP_id(fields[1]);
            bean.setAmount(Integer.parseInt(fields[2]));

            bean.setPname("");
            bean.setFlag("0");

            // key is the product id
            k.set(fields[1]);
        } else {
            // product record, e.g. 01 小米
            String[] fields = line.split("\t");
            // fill the bean
            bean.setOrder_id("");
            bean.setP_id(fields[0]);
            bean.setAmount(0);
            bean.setPname(fields[1]);
            bean.setFlag("1");

            // key is the product id
            k.set(fields[0]);
        }

        // 4. Emit the bean
        context.write(k, bean);
    }
}
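A design note on the mapper above: it reuses one bean and one k instance across calls to map(). This is safe because context.write serializes the key and value immediately, so the same objects can be refilled for the next record.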

Reducer class

package com.zyd.table;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;


public class TableReduce extends Reducer<Text, TableBean, TableBean, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {
        // buffers for this product id
        TableBean pdBean = new TableBean();
        ArrayList<TableBean> orderBeans = new ArrayList<>();

        // split the records by the table they came from
        for (TableBean bean : values) {
            if ("0".equals(bean.getFlag())) {
                // order record: Hadoop reuses the value object while iterating,
                // so copy it into a fresh bean before caching it
                TableBean orBean = new TableBean();
                try {
                    BeanUtils.copyProperties(orBean, bean);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                orderBeans.add(orBean);
            } else {
                // product record, e.g. 01 小米
                try {
                    BeanUtils.copyProperties(pdBean, bean);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        // join: fill in the product name on every order record
        for (TableBean bean : orderBeans) {
            bean.setPname(pdBean.getPname());

            // emit the joined record
            context.write(bean, NullWritable.get());
        }
    }
}

Driver class

package com.zyd.table;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class TableDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Get the configuration and a Job instance
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2. Set the jar that contains this driver
        job.setJarByClass(TableDriver.class);

        // 3. Set the Mapper and Reducer classes for this job
        job.setMapperClass(TableMapper.class);
        job.setReducerClass(TableReduce.class);

        // 4. Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TableBean.class);

        // 5. Set the final output key/value types
        job.setOutputKeyClass(TableBean.class);
        job.setOutputValueClass(NullWritable.class);

        // 6. Set the input and output directories
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 7. Submit the job (its configuration and jar) to YARN and wait for completion
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
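To run the job, package these classes into a jar and pass the input directory (containing both the order file and the product file) and an output directory as arguments. The jar name and paths below are illustrative:

hadoop jar table-join.jar com.zyd.table.TableDriver /input/table /output/table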


Here the entire join runs in the reducers: every order for a given product id is buffered in a single reduce call, so a hot product id concentrates a large share of the data on one reduce task. That is the data skew the title refers to. The usual remedy is a map-side join that distributes the small product table to every mapper.
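As a sketch of that remedy (not part of the original post): the mapper below loads the small product file from the distributed cache in setup() and joins each order record directly in map(), so no reducer is needed at all. The class name MapJoinMapper and the cache-file layout (tab-separated p_id and pname) are assumptions for the example.

package com.zyd.table;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    private final Map<String, String> pdMap = new HashMap<>();
    private final Text k = new Text();

    @Override
    protected void setup(Context context) throws IOException {
        // load the small product table from the distributed cache into memory
        URI[] cacheFiles = context.getCacheFiles();
        Path path = new Path(cacheFiles[0]);
        FileSystem fs = FileSystem.get(context.getConfiguration());
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(fs.open(path), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] fields = line.split("\t"); // p_id \t pname
                pdMap.put(fields[0], fields[1]);
            }
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // every input record is an order row: order_id \t p_id \t amount
        String[] fields = value.toString().split("\t");
        String pname = pdMap.getOrDefault(fields[1], "");
        k.set(fields[0] + "\t" + pname + "\t" + fields[2]);
        context.write(k, NullWritable.get());
    }
}

In the driver this would be paired with job.addCacheFile(new URI("/cache/pd.txt")) (path illustrative), an input path containing only the order file, and job.setNumReduceTasks(0); with no reduce phase there is no shuffle of product data and no skew.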


