0
点赞
收藏
分享

微信扫一扫

分布式计算框架Map/reduce

秦瑟读书 2022-02-09 阅读 80

简介:

MapReduce是一个基于集群的高性能并行计算平台,MapReduce是一个并行计算与运行的软件框架,MapReduce是一个并行程序设计模型与方法.
特点:

分布可靠,对数据集的操作分发给集群中的多个节点实现可靠性,每个节点周期性返回它完成的任务和最新的状态

②封装了实现细节,基于框架API编程,面向业务展开分布式编码

③提供跨语言编程的能力

MapReduce的主要功能:

1.1数据划分和计算任务调度

1.2数据/代码互相定位

1.3系统优化

1.4出错检测和恢复

MapReduce的运行流程:

 

由上图可以看到MapReduce执行下来主要包含这样几个步骤:

1) 首先正式提交作业代码,并对输入数据源进行切片

2) master调度worker执行map任务

3) worker当中的map任务读取输入源切片

4) worker执行map任务,将任务输出保存在本地

5) master调度worker执行reduce任务,reduce worker读取map任务的输出文件

6) 执行reduce任务,将任务输出保存到HDFS

运行流程详解

MAP:HDFS目录的数据输入进来,然后切块,将切好的块分给不同的计算机,各计算机将块按照本地规约分区,将不同的区按照key排序,然后将任务输出保存在本地.

Reduce:将数据从远程拷贝下来,然后按照key将数据合并并处理最后输出

Map阶段由一定数量的Map Task组成,流程如下:
■ 输入数据格式解析:InputFormat
■ 输入数据处理:Mapper
■ 数据分区:Partitioner
■ 数据按照key排序
■ 本地规约:Combiner(相当于local reducer,可选)
■ 将任务输出保存在本地
Reduce阶段由一定数量的Reduce Task组成,流程如下:
■ 数据远程拷贝
■ 数据按照key排序和文件合并merge
■ 数据处理:Reducer
■ 数据输出格式:OutputFormat
通常我们把从Mapper阶段输出数据到Reduce阶段的reduce计算之间的过程称之为shuffle

MapReduce Java API应用

1、MapReduce开发流程

➢ 搭建开发环境,参考HDFS环境搭建,基本一致
➢ 基于MapReduce框架编写代码,Map、Reduce、Driver三部分组成。
➢ 编译打包,将源代码打成的包和依赖jar包打成一个包
➢ 上传至运行环境
➢ 运行hadoop jar命令,现已由yarn jar替代,建议使用新命令提交执行
具体提交命令为:
yarn jar testhdfs-jar-with-dependencies.jar com.tianliangedu.driver.WordCount /tmp/tianliangedu/input /tmp/tianliangedu/output3
➢ 通过yarn web ui查看执行过程
➢ 查看执行结果

2、WordCount代码实现

Mapper:是MapReduce计算框架中Map过程的封装

Text:Hadoop对Java String类的封装,适用于Hadoop对文本字符串的处理

IntWritable:Hadoop对Java Integer类的封装,适用于Hadoop整型的处理

Context:Hadoop环境基于上下文的操作对象,如Map中key/value的输出、分布式缓存数据、分布式参数传递等

StringTokenizer:对String对象字符串的操作类,做基于空白字符的切分操作工具类
  • 2.1 Map类编写

  • package com.tianliangedu.mapper;
    import java.io.IOException;
    import java.util.StringTokenizer;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    public class MyTokenizerMapper extends
            Mapper<Object, Text, Text, IntWritable> {
        // 暂存每个传过来的词频计数,均为1,省掉重复申请空间
        private final static IntWritable one = new IntWritable(1);
        // 暂存每个传过来的词的值,省掉重复申请空间
        private Text word = new Text();
        // 核心map方法的具体实现,逐个<key,value>对去处理
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // 用每行的字符串值初始化StringTokenizer
            StringTokenizer itr = new StringTokenizer(value.toString());
            // 循环取得每个空白符分隔出来的每个元素
            while (itr.hasMoreTokens()) {
                // 将取得出的每个元素放到word Text对象中
                word.set(itr.nextToken());
                // 通过context对象,将map的输出逐个输出
                context.write(word, one);
            }
        }
    }

    2.2 Reduce类编写

  • Reducer:是MapReduce计算框架中Reduce过程的封装

  • package com.tianliangedu.reducer;
    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    //reduce类,实现reduce函数
    public class IntSumReducer extends
                    Reducer<Text, IntWritable, Text, IntWritable> {
             private IntWritable result = new IntWritable();
             //核心reduce方法的具体实现,逐个<key,List(v1,v2)>去处理
             public void reduce(Text key, Iterable<IntWritable> values,
                          Context context) throws IOException, InterruptedException {
                    //暂存每个key组中计算总和
                    int sum = 0;
                    //加强型for,依次获取迭代器中的每个元素值,即为一个一个的词频数值
                    for (IntWritable val : values) {
                          //将key组中的每个词频数值sum到一起
                          sum += val.get();
                    }
                    //将该key组sum完成的值放到result IntWritable中,使可以序列化输出
                    result.set(sum);
                    //将计算结果逐条输出
                    context.write(key, result);
             }
       }

    2.3 Driver类编写

  • ➢ Configuration:与HDFS中的Configuration一致,负责参数的加载和传递
    ➢ Job:作业,是对一轮MapReduce任务的抽象,即一个MapReduce的执行全过程的管理类
    ➢ FileInputFormat:指定输入数据的工具类,用于指定任务的输入数据路径
    ➢ FileOutputFormat:指定输出数据的工具类,用于指定任务的输出数据路径
    package com.tianliangedu.driver;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import com.tianliangedu.mapper.MyTokenizerMapper;
    import com.tianliangedu.reducer.IntSumReducer;
    public class WordCountDriver {
       // 启动mr的driver方法
       public static void main(String[] args) throws Exception {
           // 得到集群配置参数
           Configuration conf = new Configuration();
           // 设置到本次的job实例中
           Job job = Job.getInstance(conf, "天亮WordCount");
           // 指定本次执行的主类是WordCount
          job.setJarByClass(WordCountDriver.class);
           // 指定map类
          job.setMapperClass(MyTokenizerMapper.class);
           // 指定combiner类,要么不指定,如果指定,一般与reducer类相同
          job.setCombinerClass(IntSumReducer.class);
           // 指定reducer类
          job.setReducerClass(IntSumReducer.class);
           // 指定job输出的key和value的类型,如果map和reduce输出类型不完全相同,需要重新设置map的output的key和value的class类型
          job.setOutputKeyClass(Text.class);
          job.setOutputValueClass(IntWritable.class);
           // 指定输入数据的路径
          FileInputFormat.addInputPath(job, new Path(args[0]));
           // 指定输出路径,并要求该输出路径一定是不存在的
          FileOutputFormat.setOutputPath(job, new Path(args[1]));
           // 指定job执行模式,等待任务执行完成后,提交任务的客户端才会退出!
          System.exit(job.waitForCompletion(true) ? 0 : 1);
       }
    }

    2.4本地模拟分布式计算环境运行mapreduce

        鉴于远程运行进行代码测试的复杂性,以及其它新框架均开始支持本地local环境模拟分布式计算运行, 故mapreduce从2.x开始也已经支持本地环境

       具体做法请参见辅助资料集” 06-本地local环境模拟mapreduce并行计算的操作步骤”。

    2.5 Maven打包

    使用Maven命令,基于配置的Maven插件实现代码打包。

    2.6 上传到运行环境

    使用rz命令将打好的运行包上传到集群环境中。

    2.7 运行WordCount程序

    具体提交命令为:

    具体提交命令为:
    yarn jar testhdfs-jar-with-dependencies.jar com.tianliangedu.driver.WordCount /tmp/tianliangedu/input /tmp/tianliangedu/output3

    2.8 查看执行过程

    Web访问地址为:http://cluster1.hadoop:8088/ui2/#/yarn-apps/apps

  • 2.9 查看执行结果

    3、标准代码实现

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
//启动mr的driver类
public class WordCountDriver {
       //map类,实现map函数
       public static class MyTokenizerMapper extends
                    Mapper<Object, Text, Text, IntWritable> {
             //暂存每个传过来的词频计数,均为1,省掉重复申请空间
             private final static IntWritable one = new IntWritable(1);
             //暂存每个传过来的词的值,省掉重复申请空间
             private Text word = new Text();
             //核心map方法的具体实现,逐个<key,value>对去处理
             public void map(Object key, Text value, Context context)
                          throws IOException, InterruptedException {
                    //用每行的字符串值初始化StringTokenizer
                    StringTokenizer itr = new StringTokenizer(value.toString());
                    //循环取得每个空白符分隔出来的每个元素
                    while (itr.hasMoreTokens()) {
                          //将取得出的每个元素放到word Text对象中
                          word.set(itr.nextToken());
                          //通过context对象,将map的输出逐个输出
                          context.write(word, one);
                    }
             }
       }
       //reduce类,实现reduce函数
       public static class IntSumReducer extends
                    Reducer<Text, IntWritable, Text, IntWritable> {
             private IntWritable result = new IntWritable();
             //核心reduce方法的具体实现,逐个<key,List(v1,v2)>去处理
             public void reduce(Text key, Iterable<IntWritable> values,
                          Context context) throws IOException, InterruptedException {
                    //暂存每个key组中计算总和
                    int sum = 0;
                    //加强型for,依次获取迭代器中的每个元素值,即为一个一个的词频数值
                    for (IntWritable val : values) {
                          //将key组中的每个词频数值sum到一起
                          sum += val.get();
                    }
                    //将该key组sum完成的值放到result IntWritable中,使可以序列化输出
                    result.set(sum);
                    //将计算结果逐条输出
                    context.write(key, result);
             }
       }
       //启动mr的driver方法
       public static void main(String[] args) throws Exception {
             //得到集群配置参数
             Configuration conf = new Configuration();
             //设置到本次的job实例中
             Job job = Job.getInstance(conf, "天亮WordCount");
             //通过指定相关字节码对象,找到所属的主jar包
             job.setJarByClass(WordCountDriver.class);
             //指定map类
             job.setMapperClass(MyTokenizerMapper.class);
             //指定combiner类,要么不指定,如果指定,一般与reducer类相同
             job.setCombinerClass(IntSumReducer.class);
             //指定reducer类
             job.setReducerClass(IntSumReducer.class);
             //指定job输出的key和value的类型,如果map和reduce输出类型不完全相同,需要重新设置map的output的key和value的class类型
             job.setOutputKeyClass(Text.class);
             job.setOutputValueClass(IntWritable.class);
             //指定输入数据的路径
             FileInputFormat.addInputPath(job, new Path(args[0]));
             //指定输出路径,并要求该输出路径一定是不存在的
             FileOutputFormat.setOutputPath(job, new Path(args[1]));
             //指定job执行模式,等待任务执行完成后,提交任务的客户端才会退出!
             System.exit(job.waitForCompletion(true) ? 0 : 1);
       }
}

MapReduce Shell应用

1、MapReduce的二级命令

mapred称为一级命令,直接输入mapred回车,即可查看二级命令:

2、MapReduce的三级命令

输入一级命令mapred后,再任意输入一个二级命令,即可查看三级命令:

3、MapReduce shell应用

  • 查看当前正在执行的job任务

  • 先提交一个WordCount任务,然后使用mapred job -list查看任务列表

  • 终止(kill)一个任务的执行

由于某种原因,要立即终止某任务的执行,则使用mapred job -kill job-id。

构造场景:先提交一个WordCount job,然后通过kill job-id来终止任务:

  • 查看一个job的日志

使用mapred shell命令,通过job-id可以查看job的工作日志。

命令格式为:mapred job -logs job-id:

MapReduce技术特征
1、向“外”横向扩展,而非向“上”纵向扩展
➢ 集群的构建完全选用价格便宜、易于扩展的低端商用服务器,而非价格昂贵不易扩展的商用服务
➢ 大规模数据处理和大规模数据存储的需要,讲求集群综合能力,而非单台机器处理能力,横向增加机器节点数据量
2、失效被认为是常态
➢ 使用大量普通服务器,节点硬件和软件出错是常态
➢ 具备多种有效的错误检测和恢复机制,在某个计算节点失效后会自动转移到别的计算节点。某个任务节点失败后其他节点能够无缝接管失效节点的计算任务
➢ 当失效节点恢复后自动无缝加入集群,不需要管理员人工进行系统配置
3、移动计算,把处理向数据迁移(数据本地性)
➢ 采用代码/数据互定位的功能,计算和数据在同一个机器节点或者是同一个机架中,发挥数据本地化特点
➢ 可避免跨机器节点或是机架传输数据,提高运行效率
4、顺序处理数据、避免随机访问数据
➢ 磁盘的顺序访问远比随机访问快得多,因此MapReduce设计为面向顺序式大规模数据的磁盘访问处理
➢ 利用集群中的大量数据存储节点同时访问数据,实现面向大数据集批处理的高吞吐量的并行处理
5、推测执行
➢ 一个作业由若干个Map任务和Reduce任务构成,整个作业完成的时间取决于最慢的任务的完成时间。由于节点硬件、软件问题,某些任务可能运行很慢
➢ 采用推测执行机制,发现某个任务的运行速度远低于任务平均速度,会为慢的任务启动一个备份任务,同时运行。哪个先运行完,采用哪个结果。
6、平滑无缝的可扩展性
➢ 可弹性的增加或减少集群计算节点来调节计算能力
➢ 计算的性能随着节点数的增加保持接近线性程度的增长
7、为应用开发隐藏系统底层细节
➢ 并行编程有很多困难,需要考虑多线程中复杂繁琐的细节,诸如分布式存储管理、数据分发、数据通信和同步、计算结果收集等细节问题。
➢ MapReduce提供了一种抽象机制将程序员与系统层细节隔离开,程序员只需关注业务,其他具体执行交由框架处理即可。
举报

相关推荐

0 条评论