The Road to Big Data: Implementing the MapReduce WordCount Example

陬者 · 2022-03-30

Writing the MRMapper class to implement the map phase

package MR_wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * LongWritable: input key type, the byte offset of the line
 * Text: input value type, the content of each line
 * Text: output key type, a word
 * IntWritable: output value type, the constant ONE = 1
 **/
public class MRMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // Reused constant, so we don't allocate a new IntWritable per word
    private final IntWritable ONE = new IntWritable(1);

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        System.out.println("--------------------------Mapper.setup------------------------");
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        System.out.println("--------------------------Mapper.map------------------------");

        final String line = value.toString().toLowerCase();
        final String[] splits = line.split(" ");
        for (String word : splits) {
            context.write(new Text(word), ONE);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        System.out.println("--------------------------Mapper.cleanup------------------------");
    }
}
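To make the data flow concrete, here is a hypothetical example (the line content is made up for illustration). Given an input line "Hello world hello", the map call above lowercases it, splits on spaces, and writes these pairs to the context:

(hello, 1)
(world, 1)
(hello, 1)

The framework then shuffles these pairs so that all values for the same key arrive together at a single reduce call.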

Writing the MRReducer class to implement the reduce phase

package MR_wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;


public class MRReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        System.out.println("--------------------------Reducer.setup------------------------");
    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        System.out.println("--------------------------Reducer.reduce------------------------");
        // Initialize the count
        int count = 0;

        // Aggregate all values for this key
        for (IntWritable value : values) {
            count += value.get(); // use get() to convert IntWritable to int
        }

        // Emit the word and its total count
        context.write(key, new IntWritable(count));
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        System.out.println("--------------------------Reducer.cleanup------------------------");
    }
}
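Continuing the hypothetical example from the map phase: after the shuffle, the reducer receives hello with values [1, 1] and world with values [1]. Each reduce call sums the list for its key and emits:

(hello, 2)
(world, 1)

Note that reduce runs once per distinct key, not once per input pair.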

Writing the driver class (MRDrive), which connects the Mapper and Reducer

The main steps:

1. Get a Job object
2. Set the jar class
3. Set the Mapper and Reducer classes
4. Set the key/value types of the Mapper output, i.e. the third and fourth type parameters of MRMapper
5. Set the key/value types of the Reducer output, i.e. the third and fourth type parameters of MRReducer
6. Set the input and output paths
7. Submit the job

The full implementation:

package MR_wc;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MRDrive {
    public static void main(String[] args) throws Exception {
        String input = "data/wc.txt";
        String output = "out_mr_wc";

        final Configuration configuration = new Configuration();

        // 1. Get the Job object
        final Job job = Job.getInstance(configuration);

        // 2. Set the jar class
        job.setJarByClass(MRDrive.class);

        // 3. Set the Mapper and Reducer classes
        job.setMapperClass(MRMapper.class);
        job.setReducerClass(MRReducer.class);

        // 4. Set the key/value types of the Mapper output,
        //    matching the third and fourth type parameters of MRMapper
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 5. Set the key/value types of the Reducer output,
        //    matching the third and fourth type parameters of MRReducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);


        // 6. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        // 7. Submit the job and wait for it to finish
        final boolean result = job.waitForCompletion(true);

        System.exit(result ? 0 : 1);
    }
}

Run the program and check the results; the following files are generated:

[screenshot: output directory listing]

Open part-r-00000 to view the results. With that, a simple WordCount works:

[screenshot: contents of part-r-00000]
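Since the screenshots are not reproduced here, a sketch of what to expect, assuming the hypothetical input from earlier: the out_mr_wc directory contains an empty _SUCCESS marker plus the part-r-00000 result file, and the part file (key and count separated by a tab, the TextOutputFormat default) would read:

hello	2
world	1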

Creating a FileUtils class to delete the output directory automatically

This is mainly so the program can be run multiple times: FileOutputFormat refuses to write to an output path that already exists. The code:

package MR_wc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FileUtils {
    // Delete the output directory (recursively) if it already exists,
    // so the job can be rerun without failing on a pre-existing path
    public static void deleteTarget(Configuration configuration, String output) throws Exception {
        final FileSystem fileSystem = FileSystem.get(configuration);

        final Path path = new Path(output);

        if (fileSystem.exists(path)) {
            fileSystem.delete(path, true);
        }
    }
}

For this to take effect, call this method from the driver class above, before the job is created:

[screenshot: driver with the FileUtils.deleteTarget call added]
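In place of the screenshot, here is a minimal sketch of where the call goes in MRDrive.main, reusing the variable names from the driver above:

        final Configuration configuration = new Configuration();

        // Delete any output directory left over from a previous run
        FileUtils.deleteTarget(configuration, output);

        // 1. Get the Job object
        final Job job = Job.getInstance(configuration);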
With that, a WordCount that can be rerun freely is complete!
