1. Overall approach
(1) The two CSV columns we analyze, id and price, are read in as plain Strings, because everything MapReduce consumes and emits is a (k, v) key-value pair. The Reducer here simply forwards each pair unchanged:
@Override
protected void reduce(Text key, Iterable<CsvBean> values, Context context) throws IOException, InterruptedException {
    for (CsvBean value : values) {
        context.write(key, value);
    }
}
(2) We therefore wrap the record in a bean object whose fields mirror the CSV columns, and populate it in the Mapper:
// 4. Populate the bean
outV.setId(id);
outV.setPrice(price);
outK.set(price);
(3) For the bean to travel between map and reduce, it must declare how it is read and written: it implements Hadoop's Writable interface, overriding the serialization method and the deserialization method.
Override the serialization method write(), which emits each field with writeUTF:
@Override
public void write(DataOutput dataOutput) throws IOException {
    dataOutput.writeUTF(id);
    dataOutput.writeUTF(price);
}
Override the deserialization method readFields(), which reads each field back with readUTF; the read order must match the write order exactly:
@Override
public void readFields(DataInput dataInput) throws IOException {
    this.id = dataInput.readUTF();
    this.price = dataInput.readUTF();
}
(4) To control how each record is rendered in the output file, override toString(); the leading "," is meant to separate the value from the price key:
@Override
public String toString() {
    return "," + id;
}
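With the default TextOutputFormat, the key and value on each line are joined by a tab, so a record would actually appear as price<TAB>,id. If the comma from toString() should be the only delimiter, one option (a sketch, assuming the stock TextOutputFormat) is to blank out the separator when building the Configuration in the Driver:

// Sketch: let the "," from toString() act as the only key/value delimiter
Configuration conf = new Configuration();
conf.set("mapreduce.output.textoutputformat.separator", "");
Job job = Job.getInstance(conf);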
(5) Finally, set the input and output paths for the CSV file (change these paths to wherever your own CSV file lives):
// 6. Set input and output paths
FileInputFormat.setInputPaths(job, new Path("D:\\IDEA\\mapreduce\\steam\\input\\steam.csv"));
FileOutputFormat.setOutputPath(job, new Path("D:\\IDEA\\mapreduce\\steam\\output"));
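One caveat: Hadoop aborts a job whose output directory already exists. A small guard (a sketch, assuming the local paths above; it needs an extra import of org.apache.hadoop.fs.FileSystem) deletes any stale directory before setting the path:

// Sketch: clear a leftover output directory before the job runs
Path output = new Path("D:\\IDEA\\mapreduce\\steam\\output");
FileSystem fs = FileSystem.get(conf);
if (fs.exists(output)) {
    fs.delete(output, true); // recursive delete
}
FileOutputFormat.setOutputPath(job, output);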
2. The code
(1) The wrapper bean CsvBean
package com.gis507.test.CsvSplit;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class CsvBean implements Writable {

    private String id;
    private String price;

    // Writable requires a no-argument constructor for reflection
    public CsvBean() {
    }

    public CsvBean(String id, String price) {
        this.id = id;
        this.price = price;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getPrice() {
        return price;
    }

    public void setPrice(String price) {
        this.price = price;
    }

    // Rendered by TextOutputFormat after the key; the "," separates key and id
    @Override
    public String toString() {
        return "," + id;
    }

    // Serialization: the field order here must match readFields()
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(id);
        dataOutput.writeUTF(price);
    }

    // Deserialization: read the fields back in the same order they were written
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.id = dataInput.readUTF();
        this.price = dataInput.readUTF();
    }
}
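To check that write() and readFields() agree on field order, a round trip through plain Java streams is enough. Below is a minimal sketch (a hypothetical test class in the same package; the sample id and price are invented):

package com.gis507.test.CsvSplit;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class CsvBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        CsvBean in = new CsvBean("730", "9.99"); // invented sample values
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        in.write(new DataOutputStream(buffer)); // serialize

        CsvBean out = new CsvBean();
        out.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()))); // deserialize

        System.out.println(out.getId() + " / " + out.getPrice()); // expected: 730 / 9.99
    }
}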
(2) The Mapper class, which reads in id and price
package com.gis507.test.CsvSplit;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class CsvSplitMapper extends Mapper<LongWritable, Text, Text, CsvBean> {

    private Text outK = new Text();
    private CsvBean outV = new CsvBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. Read one line and convert it to a String
        String line = value.toString();
        // 2. Split on commas
        String[] csvComments = line.split(",");
        // 3. Pick out the columns we need
        String id = csvComments[0];
        String price = csvComments[2];
        // 4. Populate the bean
        outV.setId(id);
        outV.setPrice(price);
        outK.set(price);
        // 5. Emit the pair
        context.write(outK, outV);
    }
}
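A real CSV export usually begins with a header row, and split(",") can also return too few columns on short or empty lines, which would make csvComments[2] throw. A defensive guard at the top of map() could skip such rows (a sketch; the assumption that real ids start with a digit is mine, not the original author's):

// Sketch: insert after step 2, before reading the columns
if (csvComments.length < 3 || csvComments[0].isEmpty()) {
    return; // skip empty or malformed rows
}
if (!Character.isDigit(csvComments[0].charAt(0))) {
    return; // skip the header row, assuming real ids start with a digit
}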
(3) The Reducer class
package com.gis507.test.CsvSplit;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class CsvSplitReducer extends Reducer<Text, CsvBean, Text, CsvBean> {

    @Override
    protected void reduce(Text key, Iterable<CsvBean> values, Context context) throws IOException, InterruptedException {
        // Pass every (price, bean) pair straight through; the shuffle has already sorted by key
        for (CsvBean value : values) {
            context.write(key, value);
        }
    }
}
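Note that because the map output key is the price kept as Text, the shuffle sorts it lexicographically, so "100" comes before "20". If a true numeric ordering is wanted, one option (a sketch, not part of the original project; PriceComparator is an invented name) is a custom sort comparator:

package com.gis507.test.CsvSplit;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class PriceComparator extends WritableComparator {
    public PriceComparator() {
        super(Text.class, true); // create key instances so compare() receives Text objects
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // Compare prices numerically instead of as strings
        return Double.compare(Double.parseDouble(a.toString()), Double.parseDouble(b.toString()));
    }
}

It would be registered in the Driver with job.setSortComparatorClass(PriceComparator.class).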
(4) The Driver class CsvSplitDriver, which configures the job's input and output
package com.gis507.test.CsvSplit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class CsvSplitDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 1. Create the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2. Register the Driver class
        job.setJarByClass(CsvSplitDriver.class);
        // 3. Register the Mapper and Reducer classes
        job.setMapperClass(CsvSplitMapper.class);
        job.setReducerClass(CsvSplitReducer.class);
        // 4. Declare the map output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CsvBean.class);
        // 5. Declare the final output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(CsvBean.class);
        // 6. Set input and output paths
        FileInputFormat.setInputPaths(job, new Path("D:\\IDEA\\mapreduce\\steam\\input\\steam.csv"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\IDEA\\mapreduce\\steam\\output"));
        // 7. Submit the job and wait for it to finish
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
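If you adopt the optional tweaks sketched earlier, they slot into the Driver like this (again a sketch; PriceComparator is the invented comparator from the Reducer section):

// Step 1, with a comma-only output separator
Configuration conf = new Configuration();
conf.set("mapreduce.output.textoutputformat.separator", "");
Job job = Job.getInstance(conf);
// After step 3: sort keys numerically by price
job.setSortComparatorClass(PriceComparator.class);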
3. Viewing the output
After the job succeeds, the results land in part-r-00000 inside the output directory; each line carries the price key followed by the bean's toString() rendering.
4. Project source code
MapReduce price sorting of CSV file data