Hadoop WordCount Example: Writing MapReduce in Java

寒羽鹿 · 2022-03-30

1. Set up the local Maven environment; see the previous article:

Local Maven Environment Configuration

2. Open IDEA and create a new Maven project.

3. Write the Mapper class

package WC;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MRMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // Reusable value object: every word is emitted with a count of 1.
    private static final IntWritable ONE = new IntWritable(1);

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        System.out.println("----------Mapper.setup-----------");
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        System.out.println("----------Mapper.map-----------");

        // Convert the input line to lower case.
        final String line = value.toString().toLowerCase();

        // Split on runs of whitespace (avoids empty tokens from repeated spaces).
        final String[] splits = line.split("\\s+");

        // Emit each word from the splits array with a count of 1.
        for (String word : splits) {
            context.write(new Text(word), ONE);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        System.out.println("----------Mapper.cleanup-----------");
    }
}
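For intuition, suppose Data/wc.txt holds the two lines below (a made-up sample, not from the original post). The map phase then emits one (word, 1) pair per token:

hadoop spark hadoop
flink spark

map output: (hadoop,1) (spark,1) (hadoop,1) (flink,1) (spark,1)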

4. Write the Reducer class

package WC;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MRReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        System.out.println("----------Reducer.setup-----------");
    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        System.out.println("----------Reducer.reduce-----------");

        // Running total for this key.
        int count = 0;

        // Sum all the 1s emitted for this word (IntWritable -> int via get()).
        for (IntWritable value : values) {
            count += value.get();
        }

        // Emit the word and its total count.
        context.write(key, new IntWritable(count));
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        System.out.println("----------Reducer.cleanup-----------");
    }
}
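Continuing the same made-up sample: the framework sorts and groups the map output by key before calling reduce, so each invocation sees one word together with all of its 1s:

reduce("flink",  [1])    -> (flink, 1)
reduce("hadoop", [1, 1]) -> (hadoop, 2)
reduce("spark",  [1, 1]) -> (spark, 2)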

5. Write a helper class that deletes the output directory, so it does not have to be removed by hand before each run (Hadoop refuses to start a job whose output path already exists).

package WC;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FileUtils {

    // Recursively delete the output path if it already exists,
    // so the job can be re-run without manual cleanup.
    public static void deleteTarget(Configuration configuration, String output) throws Exception {
        final FileSystem fileSystem = FileSystem.get(configuration);
        final Path path = new Path(output);

        if (fileSystem.exists(path)) {
            fileSystem.delete(path, true);
        }
    }
}
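As a quick sanity check, the helper can be exercised on its own. A minimal sketch (DeleteCheck is a hypothetical class; with an empty Configuration it targets the default local filesystem):

package WC;

import org.apache.hadoop.conf.Configuration;

public class DeleteCheck {
    public static void main(String[] args) throws Exception {
        // Deletes out_mr_mc recursively if it exists; a no-op otherwise.
        FileUtils.deleteTarget(new Configuration(), "out_mr_mc");
    }
}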

6. Write the Driver class

package WC;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MRDriver {
    public static void main(String[] args) throws Exception {

        String input = "Data/wc.txt";
        String output = "out_mr_mc";

        // 1. Load the configuration.
        final Configuration configuration = new Configuration();

        // 2. Get the MR Job instance.
        final Job job = Job.getInstance(configuration);

        // 3. Delete the output directory left over from a previous run, if any.
        FileUtils.deleteTarget(configuration, output);

        // 4. Set the jar class.
        job.setJarByClass(MRDriver.class);

        // 5. Set the Mapper and Reducer classes.
        job.setMapperClass(MRMapper.class);
        job.setReducerClass(MRReducer.class);

        // 6. Set the Mapper output key/value types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 7. Set the final (Reducer) output key/value types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 8. Set the input and output paths.
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        // 9. Submit the job and wait for it to finish.
        final boolean result = job.waitForCompletion(true);

        System.exit(result ? 0 : 1);
    }
}
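One optional refinement, not part of the original walkthrough: because summing counts is associative and commutative, the same reducer class can also serve as a combiner, pre-aggregating on the map side to shrink shuffle traffic. It takes one extra line in the driver, before submitting the job:

// Optional: pre-aggregate map output locally using the reducer logic.
job.setCombinerClass(MRReducer.class);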

7. Configure a log4j.properties file for more detailed logging

# Root logger at DEBUG level, writing to a single console appender.
log4j.rootLogger=DEBUG,A1

log4j.appender.A1=org.apache.log4j.ConsoleAppender
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
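With this pattern, each log line carries the elapsed milliseconds, thread name, level, logger name, and message. An illustrative line of output might look like:

123  [main] INFO  org.apache.hadoop.mapreduce.Job  - Running job: job_local1234567890_0001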

8. Run the Driver.

9. The output directory appears at the expected path, and the job is complete.
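With the made-up sample input from step 3, out_mr_mc/part-r-00000 would contain one tab-separated word and count per line, sorted by key:

flink	1
hadoop	2
spark	2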
