0
点赞
收藏
分享

微信扫一扫

mapreduce 去重的问题怎么解决

mapreduce 去重的问题怎么解决?


 john 89


 tom 100


 mary 100


 mary 200


 tom 20


———–


我刚学mapreduce,正在练习,上面这个我计算了很久也不对,就是对第一列去重,去重后应该是3


如果用mapreduce计算成功后,part-00000 的文件内容 是:


请问下,这个mapreduce怎么写啊?



map按第一列为key,value无所谓         
reduce class中初始化一个计数器         
每个reduce方法中计数器每次加一         
reduce 的cleanup方法中commit计数器就可以了

map 知道怎么写了,那reduce的具体怎么写啊?

直接一个Map,在Map里面定义一个全局的HashSet,map方法里面把key加入进去,cleanup方法里面把结果写入就行了。

直接一个Map,在Map里面定义一个全局的HashSet,map方法里面把key加入进去,cleanup方法里面把结果写入就行了。         


只用map不可能解决这个问题 

如果在不同的map中都用同一个key,怎么解决? 


必须用reduce去group后的key才能得到去重效果

直接一个Map,在Map里面定义一个全局的HashSet,map方法里面把key加入进去,cleanup方法里面把结果写入就行了。                    
只用map不可能解决这个问题          
如果在不同的map中都用同一个key,怎么解决?          

必须用reduce去group后的key才能得到去重效果         

嗯,对,没想那么仔细,谢谢指正。

public static class Map extends Mapper<LongWritable, Text, Text, Text> { 

                public void map(LongWritable key, Text value, Context context) 

                                throws IOException, InterruptedException { 

                        String line = value.toString(); 

                        try { 

                                String[] lineSplit = line.split("\t"); 

                                context.write(new Text(lineSplit[0]), new Text("")); 

                            } catch (java.lang.ArrayIndexOutOfBoundsException e) { 

                                context.getCounter(Counter.LINESKIP).increment(1); 

                                return; 

                        } 

                } 

        } 


        public static class Reduce extends Reducer<Text, Text, Text, Text> { 

                private Set<String> count = new HashSet<String>(); 


                public void reduce(Text key, Iterable<Text> values, Context context) 

                                throws IOException, InterruptedException { 

                      for(Text value:values){ 

                             count.add(value.toString()); 

                     } 

                        context.write(key, new Text("")); 

                } 

        } 

-------------------------  这个问题纠结我2周了,这个方面的学习资料太少了,我的map和reduce是这样写的,但是数据量大一些,就会内存溢出,我想我这个思路是错误的 

        你说的  “必须用reduce去group后的key才能得到去重效果 ”,这个 map和reduce是具体怎么写的啊?

public static class Map extends Mapper<LongWritable, Text, Text, Text> {          

                public void map(LongWritable key, Text value, Context context)          

                                throws IOException, InterruptedException {          

                        String line = value.toString();          

                        try {          

                                String[] lineSplit = line.split("\t");          

                                context.write(new Text(lineSplit[0]), new Text(""));          

                             context.write(new Text("uniq") ,new Text(lineSplit[0]) );          


                            } catch (java.lang.ArrayIndexOutOfBoundsException e) {          

                                context.getCounter(Counter.LINESKIP).increment(1);          

                                return;          

                        }          

                }          

        }          


        public static class Reduce extends Reducer<Text, Text, Text, Text> {          

                private Set<String> count = new HashSet<String>();          


                public void reduce(Text key, Iterable<Text> values, Context context)          

                                throws IOException, InterruptedException {          

                      for(Text value:values){          

                             count.add(value.toString());          

                     }          

                        context.write("uniq", new Text(count.size()+""));          

                }          

        }          

-------------------------  这个问题纠结我2周了,这个方面的学习资料太少了,我的map和reduce是这样写的,但是数据量大一些,就会内存溢出,我想我这个思路是错误的          

        你说的  “必须用reduce去group后的key才能得到去重效果 ”,这个 map和reduce是具体怎么写的啊?         
    -------------刚才写的mapreduce错了,以这个为准

map按第一列为key,value无所谓          
reduce class中初始化一个计数器          
每个reduce方法中计数器每次加一          
reduce 的cleanup方法中commit计数器就可以了         

  谢谢了,请教下,你说的这个map我知道怎么写了,但是这个reduce怎么写啊?

reduce阶段只用一个计数器就行了

import                  java.io.IOException;                

                                  

                 import                  org.apache.hadoop.conf.Configuration;                

                 import                  org.apache.hadoop.fs.Path;                

                 import                  org.apache.hadoop.io.LongWritable;                

                 import                  org.apache.hadoop.io.NullWritable;                

                 import                  org.apache.hadoop.io.Text;                

                 import                  org.apache.hadoop.mapreduce.Job;                

                 import                  org.apache.hadoop.mapreduce.Mapper;                

                 import                  org.apache.hadoop.mapreduce.Reducer;                

                 import                  org.apache.hadoop.mapreduce.lib.input.FileInputFormat;                

                 import                  org.apache.hadoop.mapreduce.lib.input.TextInputFormat;                

                 import                  org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;                

                                  

                 public                  class                  wzl189_distinct {                

                                  public                  static                  class                  MyMapper                  extends                

                                  Mapper<Object, Text, Text, NullWritable> {                

                                  

                                  Text outKey =                  new                  Text();                

                                  

                                  @Override                

                                  public                  void                  map(Object key, Text value, Context context)                

                                  throws                  IOException, InterruptedException {                

                                  

                                  String tmp[] = value.toString().split(                 " "                 );                

                                  if                  (tmp.length !=                  2                 )                

                                  return                 ;                

                                  outKey.set(tmp[                 0                 ]);                

                                  context.write(outKey, NullWritable.get());                

                                  

                                  }                

                                  }                

                                  

                                  public                  static                  class                  MyReducer                  extends                

                                  Reducer<Text, NullWritable, LongWritable, NullWritable> {                

                                  

                                  long                  myCount = 0l;                

                                  

                                  @Override                

                                  public                  void                  reduce(Text key, Iterable<NullWritable> values,                

                                  Context context)                  throws                  IOException, InterruptedException {                

                                  ++myCount;                

                                  }                

                                  

                                  @Override                

                                  public                  void                  cleanup(Context context)                  throws                  IOException,                

                                  InterruptedException {                

                                  context.write(                 new                  LongWritable(myCount), NullWritable.get());                

                                  };                

                                  }                

                                  

                                  public                  static                  void                  main(String[] args)                  throws                  Exception {                

                                  Configuration conf =                  new                  Configuration();                

                                  if                  (args.length !=                  2                 ) {                

                                  System.err.println(                 "Usage: <in> <out>"                 );                

                                  System.exit(                 2                 );                

                                  }                

                                  

                                  conf.set(                 "mapred.child.java.opts"                 ,                  "-Xmx350m -Xmx1024m"                 );                

                                  

                                  @SuppressWarnings                 (                 "deprecation"                 )                

                                  Job job =                  new                  Job(conf,                  "wzl189_distinct"                 );                

                                  job.setNumReduceTasks(                 1                 );                

                                  job.setInputFormatClass(TextInputFormat.                 class                 );                

                                  job.setJarByClass(wzl189_distinct.                 class                 );                

                                  job.setMapperClass(MyMapper.                 class                 );                

                                  

                                  job.setMapOutputKeyClass(Text.                 class                 );                

                                  job.setMapOutputValueClass(NullWritable.                 class                 );                

                                  

                                  job.setReducerClass(MyReducer.                 class                 );                

                                  job.setOutputKeyClass(Text.                 class                 );                

                                  job.setOutputValueClass(NullWritable.                 class                 );                

                                  

                                  FileInputFormat.addInputPath(job,                  new                  Path(args[                 0                 ]));                

                                  FileOutputFormat.setOutputPath(job,                  new                  Path(args[                 1                 ]));                

                                  System.exit(job.waitForCompletion(                 true                 ) ?                  0                  :                  1                 );                

                                  }                

                 }

import                   java.io.IOException;                 

                                    

                  import                   org.apache.hadoop.conf.Configuration;                 

                  import                   org.apache.hadoop.fs.Path;                 

                  import                   org.apache.hadoop.io.LongWritable;                 

                  import                   org.apache.hadoop.io.NullWritable;                 

                  import                   org.apache.hadoop.io.Text;                 

                  import                   org.apache.hadoop.mapreduce.Job;                 

                  import                   org.apache.hadoop.mapreduce.Mapper;                 

                  import                   org.apache.hadoop.mapreduce.Reducer;                 

                  import                   org.apache.hadoop.mapreduce.lib.input.FileInputFormat;                 

                  import                   org.apache.hadoop.mapreduce.lib.input.TextInputFormat;                 

                  import                   org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;                 

                                    

                  public                   class                   wzl189_distinct {                 

                                        public                   static                   class                   MyMapper                   extends                 

                                                Mapper<Object, Text, Text, NullWritable> {                 

                                    

                                            Text outKey =                   new                   Text();                 

                                    

                                            @Override                 

                                            public                   void                   map(Object key, Text value, Context context)                 

                                                    throws                   IOException, InterruptedException {                 

                                    

                                                String tmp[] = value.toString().split(                  " "                  );                 

                                                if                   (tmp.length !=                   2                  )                 

                                                    return                  ;                 

                                                outKey.set(tmp[                  0                  ]);                 

                                                context.write(outKey, NullWritable.get());                 

                                    

                                            }                 

                                        }                 

                                    

                                        public                   static                   class                   MyReducer                   extends                 

                                                Reducer<Text, NullWritable, LongWritable, NullWritable> {                 

                                    

                                            long                   myCount = 0l;                 

                                    

                                            @Override                 

                                            public                   void                   reduce(Text key, Iterable<NullWritable> values,                 

                                                    Context context)                   throws                   IOException, InterruptedException {                 

                                                ++myCount;                 

                                            }                 

                                    

                                            @Override                 

                                            public                   void                   cleanup(Context context)                   throws                   IOException,                 

                                                    InterruptedException {                 

                                                context.write(                  new                   LongWritable(myCount), NullWritable.get());                 

                                            };                 

                                        }                 

                                    

                                        public                   static                   void                   main(String[] args)                   throws                   Exception {                 

                                            Configuration conf =                   new                   Configuration();                 

                                            if                   (args.length !=                   2                  ) {                 

                                                System.err.println(                  "Usage: <in> <out>"                  );                 

                                                System.exit(                  2                  );                 

                                            }                 

                                    

                                            conf.set(                  "mapred.child.java.opts"                  ,                   "-Xmx350m -Xmx1024m"                  );                 

                                    

                                            @SuppressWarnings                  (                  "deprecation"                  )                 

                                            Job job =                   new                   Job(conf,                   "wzl189_distinct"                  );                 

                                            job.setNumReduceTasks(                  1                  );                 

                                            job.setInputFormatClass(TextInputFormat.                  class                  );                 

                                            job.setJarByClass(wzl189_distinct.                  class                  );                 

                                            job.setMapperClass(MyMapper.                  class                  );                 

                                    

                                            job.setMapOutputKeyClass(Text.                  class                  );                 

                                            job.setMapOutputValueClass(NullWritable.                  class                  );                 

                                    

                                            job.setReducerClass(MyReducer.                  class                  );                 

                                            job.setOutputKeyClass(Text.                  class                  );                 

                                            job.setOutputValueClass(NullWritable.                  class                  );                 

                                    

                                            FileInputFormat.addInputPath(job,                   new                   Path(args[                  0                  ]));                 

                                            FileOutputFormat.setOutputPath(job,                   new                   Path(args[                  1                  ]));                 

                                            System.exit(job.waitForCompletion(                  true                  ) ?                   0                   :                   1                  );                 

                                        }                 

                  }                 





reduce阶段只用一个计数器就行了         

太感谢了,你了解这么多啊,我都搞了2周,没有结果,想再请教最后一个问题: 

假如 第一列是姓名,第二列是班级(先不管我这个需求是否合理) 

 john 100 

 john 100 

 mary 100 

 mary 200 

 tom  200 


想统计处如下结果,就是按班级人数去重 

100 2 

200 2 


这个mapreduce怎么写啊?  望高手最后再解答下,万分感谢了。

map 输出key 用 班级 + 分隔符 + 姓名         
重写 grouping 实现二次排序,如果reduce num > 1 还需要重写 partition         
reduce略作修改,增个姓名变量 ,比较当前姓名是否和前一个姓名是否一致,如果不一致 计数器+=1         

代码就不贴了,LZ多思考一下,这种简单的MR不难解决


举报

相关推荐

0 条评论