I. Implementing the Hadoop WordCount job in Java with IDEA
1. Mapper code:
package MapReduce_wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.Locale;

public class MRMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // Reusable value object: every word is emitted with a count of 1
    private static final IntWritable ONE = new IntWritable(1);

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        System.out.println("-----------setup-----------");
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        System.out.println("-----------map-------------------");
        // Read the current line and convert it to lower case
        final String line = value.toString().toLowerCase(Locale.ROOT);
        // Split the line on the space delimiter
        final String[] splits = line.split(" ");
        // Emit a (word, 1) pair for every token
        for (String word : splits) {
            context.write(new Text(word), ONE);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        System.out.println("--------------cleanup-------------------");
    }
}
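For each input line, the mapper lowercases the text, splits it on spaces, and emits one (word, 1) pair per token. As a small worked illustration (the line "Hello World hello" is an assumption of mine, not part of the original example data), a single map() call would emit:

(hello, 1)
(world, 1)
(hello, 1)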
2. Reducer code:
package MapReduce_wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Receives the (word, 1) pairs shuffled over from the Map side and sums them per word.
 **/
public class MRReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        System.out.println("------------------setup--------------------");
    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        System.out.println("--------------------reduce----------");
        // Running total for the current key
        int count = 0;
        // Aggregate all counts that arrived for this key
        for (IntWritable value : values) {
            count += value.get(); // get() converts IntWritable to int
        }
        // Emit (word, total count)
        context.write(key, new IntWritable(count));
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        System.out.println("------------cleanup-------------------");
    }
}
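Because the reduce logic is a plain sum (associative and commutative), the same class can also be reused as a Combiner to pre-aggregate counts on the map side and reduce shuffle traffic. This is not part of the original driver below; a minimal sketch of the single extra line that would be added to MRDriver:

// Optional optimization (my addition): run MRReducer as a map-side combiner
job.setCombinerClass(MRReducer.class);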
3. Driver (test) class:
package MapReduce_wc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MRDriver {
    public static void main(String[] args) throws Exception {
        String input = "data/wc.txt";
        String output = "out_mr_wc";

        final Configuration configuration = new Configuration();
        // Create the job object
        final Job job = Job.getInstance(configuration);
        // Set the main class of the job
        job.setJarByClass(MRDriver.class);
        // Set the Mapper and Reducer classes
        job.setMapperClass(MRMapper.class);
        job.setReducerClass(MRReducer.class);
        // Set the Map output key/value types; these match the third and fourth generic parameters of MRMapper
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Set the final (Reducer) output key/value types; these match the third and fourth generic parameters of MRReducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        // Submit the job and wait for it to finish
        final boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
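Note that FileOutputFormat requires the output directory not to exist, so running the driver a second time fails unless out_mr_wc is removed first. A minimal sketch (my addition, not in the original driver) that deletes a leftover output directory inside main() before FileOutputFormat.setOutputPath(...) is called, using the Hadoop FileSystem API (assuming the default/local file system resolved from the job Configuration):

// requires: import org.apache.hadoop.fs.FileSystem;
final FileSystem fs = FileSystem.get(configuration);
final Path outputPath = new Path(output);
if (fs.exists(outputPath)) {
    fs.delete(outputPath, true); // true = delete recursively
}
FileOutputFormat.setOutputPath(job, outputPath);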
Project structure:
Run result:
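As a worked example (the actual contents of data/wc.txt are not shown in the original, so the input below is an assumption of mine): if data/wc.txt contained the two lines "Hello World" and "hello hadoop", the job would write its counts to out_mr_wc/part-r-00000 as tab-separated key/value pairs, sorted by key:

hadoop	1
hello	2
world	1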