MapReduce序列化和反序列化
序列化(Serialization)是指把结构化对象转化为字节流。
反序列化(Deserialization)是序列化的逆过程。即把字节流转回结构化对象。
Hadoop的序列化格式:Writable
序列化在分布式环境的两大作用:进程间通信,永久存储。
Writable接口, 有两个方法分别为write和readFields,分别根据 DataInput 和 DataOutput 实现的简单、有效的序列化对象.
 MR的任意key必须实现WritableComparable接口 

MapReduce类型
MapReduce的数据出来模型比较简单,map和reduce函数的输入和输出都是k/v键值对。
Hadoop2.x中的MapReduce中的开发job中的setMapperClass和setReducerClass设置自己的函数,其中默认的类别分别为Mapper和Reducer,整理功能就是不做任何处理,对每行记录直接输出。
Map函数
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
  /**
   * The <code>Context</code> MapContextImpl 来实现,context对象发送k/v
   */
  public abstract class Context
    implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
  }
  
  /**
   * MapTask任务开始时仅被调用一次
   */
  protected void setup(Context context
                       ) throws IOException, InterruptedException {
    // NOTHING
  }
  /**
   * 每个inputsplit中的每个k/v都被调用
      */
  @SuppressWarnings("unchecked")
  protected void map(KEYIN key, VALUEIN value, 
                     Context context) throws IOException, InterruptedException {
    context.write((KEYOUT) key, (VALUEOUT) value);
  }
  /**
   * MapTask任务结束后被调用一次
   */
  protected void cleanup(Context context
                         ) throws IOException, InterruptedException {
    // NOTHING
  }
  
  /**
   * Expert users can override this method for more complete control over the
   * execution of the Mapper.
   * @param context
   * @throws IOException
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } finally {
      cleanup(context);
    }
  }
}Reduce函数
public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
  /**
   * The <code>Context</code> passed on to the {@link Reducer} implementations.
   */
  public abstract class Context 
    implements ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
  }
  /**
   * Called once at the start of the task.
   */
  protected void setup(Context context
                       ) throws IOException, InterruptedException {
    // NOTHING
  }
  /**
   * This method is called once for each key. Most applications will define
   * their reduce class by overriding this method. The default implementation
   * is an identity function.
   */
  @SuppressWarnings("unchecked")
  protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
                        ) throws IOException, InterruptedException {
    for(VALUEIN value: values) {
      context.write((KEYOUT) key, (VALUEOUT) value);
    }
  }
  /**
   * Called once at the end of the task.
   */
  protected void cleanup(Context context
                         ) throws IOException, InterruptedException {
    // NOTHING
  }
  /**
   * Advanced application writers can use the 
   * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
   * control how the reduce task works.
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKey()) {
        reduce(context.getCurrentKey(), context.getValues(), context);
        // If a back up store is used, reset it
        Iterator<VALUEIN> iter = context.getValues().iterator();
        if(iter instanceof ReduceContext.ValueIterator) {
          ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();        
        }
      }
    } finally {
      cleanup(context);
    }
  }
}MapReduce类型配置


重点说明几个选项:
l setPartitionerClass :在map阶段定义分区,默认HashPartitioner,主要可以让多个reduce同时执行
l setCombinerClass:在分区后的数据,通过Combiner函数,对相同key进行合并,减少网络传输
l setSortComparatorClass: Mapreduce默认是按照key排序,不对value排序,若涉及两个指标的排序,可以设置二次排序来完成
l setGroupingComparatorClass: 由于二次排序,map的输出key是组合键,但有时需要按照原来的key传递到对应的reduce上,需要重写分组,可以减少reduce的调用次数
 










