0
点赞
收藏
分享

微信扫一扫

Hadoop_数据清洗示例

J简文 2022-04-18 阅读 10
java后端

Hadoop_数据清洗

示例(去除空行、开头为空格的数据):

  1. 原始数据:D:\data\testdata.txt

    zhangsan 500 450 jan
    zhangsan 550 450 feb
     lisi 210 150 jan
     lisi 200 150 feb
    zhangsan 400 150 march

    zhangsan 600 500 april
     lisi 190 150 april
     800 100 jan
    BLU 2000 200 feb
    lisi 110 10 may

  2. DataCleanMapper

    package com.blu.dataclean;

    import java.io.IOException;

    import org.apache.commons.lang3.StringUtils;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    /**
     * Mapper for the data-cleaning example: forwards each input line unchanged
     * unless the line is empty or begins with a space character, in which case
     * the line is dropped.
     */
    public class DataCleanMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        /**
         * Emits {@code value} as the output key (paired with {@link NullWritable})
         * when the line is non-empty and does not start with a space.
         *
         * @param key     input key supplied by the framework; not used here
         * @param value   one input record, treated as a line of text
         * @param context sink for the surviving lines
         * @throws IOException          propagated from {@code context.write}
         * @throws InterruptedException propagated from {@code context.write}
         */
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            // Look at the first character directly instead of line.split(" ")[0]:
            // String.split drops trailing empty strings, so a line consisting only
            // of spaces produces a zero-length array and indexing element 0 would
            // throw ArrayIndexOutOfBoundsException and fail the task.
            if (line.isEmpty() || line.charAt(0) == ' ') {
                // An empty first field means this is not a record we want; skip it.
                return;
            }
            context.write(value, NullWritable.get());
        }
    }

  3. DataCleanJob

    package com.blu.dataclean;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    /**
     * Driver for the data-cleaning example: configures a job that runs
     * {@link DataCleanMapper} with zero reduce tasks and waits for it to finish.
     */
    public class DataCleanJob {

        /**
         * Configures and runs the job.
         *
         * @param args {@code args[0]} is the input path, {@code args[1]} is the
         *             output path; any further arguments are ignored
         * @throws Exception propagated from job setup or execution
         */
        public static void main(String[] args) throws Exception {
            // Fail with a readable message instead of an ArrayIndexOutOfBoundsException
            // when the input/output paths are missing.
            if (args.length < 2) {
                System.err.println("Usage: DataCleanJob <input path> <output path>");
                System.exit(2);
            }

            Job job = Job.getInstance();
            job.setJarByClass(DataCleanJob.class);
            job.setMapperClass(DataCleanMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(NullWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            // Set the number of reduce tasks to 0.
            job.setNumReduceTasks(0);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            boolean succeeded = job.waitForCompletion(true);
            System.exit(succeeded ? 0 : 1);
        }
    }

  4. 运行参数:

    D:\data\testdata.txt D:\data\output

  5. 运行结果:

    zhangsan 500 450 jan
    zhangsan 550 450 feb
    zhangsan 400 150 march
    zhangsan 600 500 april
    BLU 2000 200 feb
    lisi 110 10 may

举报

相关推荐

0 条评论