Write the Mapper class to implement the map function
package MR_wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * LongWritable : input key type, the byte offset of the current line
 * Text         : input value type, the content of the line
 * Text         : output key type, a single word
 * IntWritable  : output value type, the constant ONE = 1
 **/
public class MRMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final IntWritable ONE = new IntWritable(1);

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        System.out.println("--------------------------Mapper.setup------------------------");
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        System.out.println("--------------------------Mapper.map------------------------");
        final String line = value.toString().toLowerCase();
        // split on runs of whitespace so consecutive spaces do not produce empty "words"
        final String[] splits = line.split("\\s+");
        for (String word : splits) {
            context.write(new Text(word), ONE);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        System.out.println("--------------------------Mapper.cleanup------------------------");
    }
}
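To make the type parameters concrete: if one input line were, hypothetically, "hello world hello", map would receive the line's byte offset as the key and the line text as the value, and would emit three pairs:

(hello, 1)
(world, 1)
(hello, 1)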
Write the MRReducer class to implement the reduce function
package MR_wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MRReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        System.out.println("--------------------------Reducer.setup------------------------");
    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        System.out.println("--------------------------Reducer.reduce------------------------");
        // initialize the count for this key
        int count = 0;
        // aggregate all values grouped under this key
        for (IntWritable value : values) {
            count += value.get(); // use get() to convert IntWritable to int
        }
        // emit the final (word, count) pair
        context.write(key, new IntWritable(count));
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        System.out.println("--------------------------Reducer.cleanup------------------------");
    }
}
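Continuing the hypothetical line above: between map and reduce, the framework's shuffle phase groups all values by key, so reduce is invoked once per distinct word:

reduce("hello", [1, 1])  ->  writes (hello, 2)
reduce("world", [1])     ->  writes (world, 1)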
Write the MRDriver class, which connects the Mapper and Reducer
The main steps:
1. Get the Job object
2. Set the jar class
3. Set the Mapper and Reducer classes
4. Set the key/value types of the Mapper output, i.e. the third and fourth type parameters of MRMapper
5. Set the key/value types of the Reducer output, i.e. the third and fourth type parameters of MRReducer
6. Set the input and output paths
7. Submit the job
The full implementation:
package MR_wc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MRDriver {
    public static void main(String[] args) throws Exception {
        String input = "data/wc.txt";
        String output = "out_mr_wc";

        final Configuration configuration = new Configuration();

        // 1. get the Job object
        final Job job = Job.getInstance(configuration);

        // 2. set the jar class
        job.setJarByClass(MRDriver.class);

        // 3. set the Mapper and Reducer classes
        job.setMapperClass(MRMapper.class);
        job.setReducerClass(MRReducer.class);

        // 4. set the key/value types of the Mapper output (third and fourth type parameters of MRMapper)
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 5. set the key/value types of the Reducer output (third and fourth type parameters of MRReducer)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 6. set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        // 7. submit the job and wait for it to finish
        final boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
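Because the input and output paths are hardcoded relative paths, the job can be launched directly from the IDE in Hadoop's local mode. To run it on a cluster instead, you would package the classes into a jar and submit it with the standard hadoop jar command, for example (jar name assumed):

hadoop jar wordcount.jar MR_wc.MRDriver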
Run the program and check the result. The output directory out_mr_wc will contain an empty _SUCCESS marker file and the result file part-r-00000.
Open part-r-00000 to verify the counts, and the simple WordCount is done.
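For instance, if data/wc.txt hypothetically contained the two lines

hello world hello
hadoop world

then part-r-00000 would hold the counts, one tab-separated pair per line, sorted by key:

hadoop	1
hello	2
world	2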
Create a FileUtils class to delete the output directory automatically
FileOutputFormat refuses to start a job whose output path already exists, so deleting the path first lets the program be run repeatedly. The code:
package MR_wc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FileUtils {
    public static void deleteTarget(Configuration configuration, String output) throws Exception {
        final FileSystem fileSystem = FileSystem.get(configuration);
        final Path path = new Path(output);
        // recursively delete the output path if it already exists,
        // so the job can be re-run without a FileAlreadyExistsException
        if (fileSystem.exists(path)) {
            fileSystem.delete(path, true);
        }
    }
}
Finally, call this method from the Driver so the deletion runs on every launch; see the sketch below.
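A minimal sketch of the change inside MRDriver.main, placed right after the Configuration is created (the exact placement is a choice, as long as it happens before the job is submitted):

final Configuration configuration = new Configuration();
// delete any previous output directory so the job can be re-run
FileUtils.deleteTarget(configuration, output);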
With that, a WordCount that can be re-run over and over is complete!