Hadoop Learning: Joining Two Tables with MapReduce


✌✌✌ As the ancients said, a good memory is no match for a worn pen, and a journey of a thousand miles begins with a single step. A thousand lines of code a day is a must, plus a short daily write-up; aim for the big companies, stay hopeful, and you will be unstoppable. Hahaha!!! ✌✌✌


一、✌Problem Requirements

record table:

ID    City Code    Air Quality Index
001   03           245
002   02           655
003   05           743
004   04           246
005   02           956
006   01           637
007   05           831
008   03           683
009   02           349

city table:

City Code    City Name
01           长沙
02           株洲
03           湘潭
04           怀化
05           岳阳

Target table (the desired join result):

ID    City Name    Air Quality Index
001   湘潭         245
002   株洲         655
003   岳阳         743
004   怀化         246
005   株洲         956
006   长沙         637
007   岳阳         831
008   湘潭         683
009   株洲         349
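Before diving into the code, it helps to picture the raw input. The mapper below assumes tab-separated lines with no header row, and it tells the two tables apart by file name: a file whose name starts with record is treated as the record table, anything else as the city table. Under that assumption (the file names record.txt and city.txt are illustrative, not given in the original post), the input directory would contain something like:

record.txt (ID, city code, air quality index, tab-separated):

001    03    245
002    02    655
003    05    743
004    04    246
005    02    956
006    01    637
007    05    831
008    03    683
009    02    349

city.txt (city code, city name, tab-separated):

01    长沙
02    株洲
03    湘潭
04    怀化
05    岳阳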

二、✌Implementation Approach

We use the field shared by both tables (the city code) as the Map output key, and wrap the remaining fields in a Bean object as the value.
After the Map phase, the intermediate data looks like this:

City Code    ID     City Name    Air Quality Index    Source File
01           006    -            637                  record
01           -      长沙         -                    city
02           002    -            655                  record
02           005    -            956                  record
02           009    -            349                  record
02           -      株洲         -                    city
03           001    -            245                  record
03           008    -            683                  record
03           -      湘潭         -                    city
04           004    -            246                  record
04           -      怀化         -                    city
05           003    -            743                  record
05           007    -            831                  record
05           -      岳阳         -                    city

1. Wrap all the fields in a single Bean class and implement serialization (Hadoop's Writable interface).
2. The Mapper's input types are LongWritable and Text.
3. The Mapper's output types are Text and Bean.
4. The Reducer's output types are Bean and NullWritable.
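To make the grouping idea concrete outside of Hadoop, here is a minimal, self-contained sketch (class name and hard-coded sample data are illustrative, taken from the tables above) that groups rows from both tables by the shared city code and joins them in memory. The MapReduce classes in the next section do exactly this, only distributed across the shuffle and reduce phases.

import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// A miniature, in-memory version of the reduce-side join described above:
// rows from both tables are grouped by the shared city code, and within each
// group the single city row supplies the name for every record row.
public class JoinIdeaSketch {
    public static void main(String[] args) {
        // each row: {cityCode, id, cityName, aqi, sourceFile}
        String[][] rows = {
            {"01", "006", "", "637", "record"}, {"01", "", "长沙", "", "city"},
            {"02", "002", "", "655", "record"}, {"02", "005", "", "956", "record"},
            {"02", "", "株洲", "", "city"}
        };

        // "shuffle": group rows by city code, the way the framework groups map output by key
        TreeMap<String, List<String[]>> grouped = new TreeMap<>();
        for (String[] row : rows) {
            grouped.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row);
        }

        // "reduce": within each group, the city row fills in the name for the record rows
        for (List<String[]> group : grouped.values()) {
            String cityName = "";
            for (String[] row : group) {
                if ("city".equals(row[4])) cityName = row[2];
            }
            for (String[] row : group) {
                if ("record".equals(row[4])) {
                    System.out.println(row[1] + "\t" + cityName + "\t\t" + row[3]);
                }
            }
        }
    }
}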

三、✌Code Implementation

1.✌Bean Class

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class Bean implements Writable {

    // fields: record ID, city code (pid), air quality index (amount),
    // city name (pname), and which source file the row came from (type)
    private String id;
    private String pid;
    private int amount;
    private String pname;
    private String type;

    // no-arg constructor, needed so the framework can create instances by reflection
    public Bean() {
        super();
    }

    // full constructor
    public Bean(String id, String pid, int amount, String pname, String type) {
        this.id = id;
        this.pid = pid;
        this.amount = amount;
        this.pname = pname;
        this.type = type;
    }

    // toString controls the final output line: ID, city name, air quality index
    @Override
    public String toString() {
        return id + "\t" + pname + "\t\t" + amount;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getPid() {
        return pid;
    }

    public void setPid(String pid) {
        this.pid = pid;
    }

    public int getAmount() {
        return amount;
    }

    public void setAmount(int amount) {
        this.amount = amount;
    }

    public String getPname() {
        return pname;
    }

    public void setPname(String pname) {
        this.pname = pname;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    // serialization: the field order here must match readFields below
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(id);
        out.writeUTF(pid);
        out.writeInt(amount);
        out.writeUTF(pname);
        out.writeUTF(type);
    }

    // deserialization: read the fields back in the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        id = in.readUTF();
        pid = in.readUTF();
        amount = in.readInt();
        pname = in.readUTF();
        type = in.readUTF();
    }

}
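As a quick sanity check (not part of the job, and the class name is illustrative), a Bean can be round-tripped through the same DataOutput/DataInput calls Hadoop uses during the shuffle. This only requires the Bean class above on the classpath, and it demonstrates why write and readFields must agree on field order:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class BeanRoundTrip {
    public static void main(String[] args) throws IOException {
        Bean original = new Bean("001", "03", 245, "湘潭", "record");

        // serialize exactly the way Hadoop would: via write(DataOutput)
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // deserialize into a fresh instance via readFields(DataInput)
        Bean copy = new Bean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy); // expected: 001	湘潭		245
    }
}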

2.✌Map Class

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class Map extends Mapper<LongWritable, Text, Text, Bean> {

    String fileName;

    // grab the name of the file this split came from; both files sit in the same
    // input directory, so the file name is how the map method tells them apart
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {

        FileSplit split = (FileSplit) context.getInputSplit();

        fileName = split.getPath().getName();

    }

    // map phase: wrap each line in a Bean, keyed by the city code
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String line = value.toString();

        if (fileName.startsWith("record")) {

            // record line: ID, city code, air quality index
            String[] words = line.split("\t");

            context.write(new Text(words[1]), new Bean(words[0], words[1], Integer.parseInt(words[2]), "", "record"));

        } else {

            // city line: city code, city name
            String[] words = line.split("\t");

            context.write(new Text(words[0]), new Bean("", words[0], 0, words[1], "city"));

        }

    }
}
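To see what a single line from each file turns into, here is a tiny stand-alone sketch (not part of the job, class name illustrative) that applies the same split-on-tab logic as map() and builds the key/value pair the mapper would emit; the sample lines are taken from the tables above:

import org.apache.hadoop.io.Text;

public class MapOutputSketch {
    public static void main(String[] args) {
        // one tab-separated line from each input file
        String recordLine = "001\t03\t245";
        String cityLine = "03\t湘潭";

        // same parsing as map() for a record line: key = city code
        String[] r = recordLine.split("\t");
        Text recordKey = new Text(r[1]);
        Bean recordValue = new Bean(r[0], r[1], Integer.parseInt(r[2]), "", "record");

        // same parsing for a city line: key = city code
        String[] c = cityLine.split("\t");
        Text cityKey = new Text(c[0]);
        Bean cityValue = new Bean("", c[0], 0, c[1], "city");

        System.out.println(recordKey + " -> " + recordValue);
        System.out.println(cityKey + " -> " + cityValue);
    }
}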

3.✌Reduce Class

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;

public class Reduce extends Reducer<Text, Bean, Bean, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<Bean> values, Context context) throws IOException, InterruptedException {

        // holds copies of all record-side Beans for this city code
        ArrayList<Bean> list = new ArrayList<>();

        // holds the single city-side Bean, which carries the city name (pname)
        Bean pd = new Bean();

        // handle the two source tables differently: collect the record rows,
        // and take the city name from the city row
        for (Bean value : values) {

            if ("record".equals(value.getType())) {

                Bean temp = new Bean();

                try {
                    BeanUtils.copyProperties(temp, value);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }

                list.add(temp);

            } else {
                try {
                    BeanUtils.copyProperties(pd, value);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
            }
        }

        // fill in the city name on every record row, then emit it
        for (Bean bean : list) {
            bean.setPname(pd.getPname());
            context.write(bean, NullWritable.get());
        }
    }

}
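The copies made with BeanUtils.copyProperties are not optional: Hadoop reuses the same Bean instance for every element of the values iterator, so adding value to the list directly would leave the list holding many references to one object. If you would rather avoid the commons-beanutils dependency, a plain copy constructor on Bean does the same job (a sketch, not part of the original code):

// a copy constructor that could be added to the Bean class as an alternative
// to BeanUtils.copyProperties: a plain field-by-field copy of all five fields
public Bean(Bean src) {
    this(src.getId(), src.getPid(), src.getAmount(), src.getPname(), src.getType());
}

With that in place, list.add(new Bean(value)) and pd = new Bean(value) replace the two try/catch blocks.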

4.✌Driver Class

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;

import java.io.IOException;

public class Driver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        // local input and output paths (the output directory must not already exist)
        args = new String[]{"D:/input", "D:/output"};

        BasicConfigurator.configure();

        // configuration
        Configuration conf = new Configuration();

        // get the job object
        Job job = Job.getInstance(conf);

        // wire up the driver, mapper, and reducer classes
        job.setJarByClass(Driver.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        // map output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Bean.class);

        // final output types
        job.setOutputKeyClass(Bean.class);
        job.setOutputValueClass(NullWritable.class);

        // input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // submit the job and wait for it to finish
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
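This driver hard-codes the local paths D:/input and D:/output, which is convenient for running inside an IDE against a local Hadoop installation. To run the same job on a cluster, one approach (a sketch; the jar name is hypothetical) is to remove the line that overwrites args, package Bean, Map, Reduce, and Driver into a jar, and submit it with `hadoop jar join.jar Driver /input /output`, where /input holds the record and city files on HDFS and /output does not yet exist. The joined result then appears in /output/part-r-00000, one line per record row, formatted by Bean.toString().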

