0
点赞
收藏
分享

微信扫一扫

2.MapReduce的使用——分析气象数据集


文章目录

  • ​​简介​​
  • ​​定义​​
  • ​​使用​​
  • ​​实现​​
  • ​​运行​​
  • ​​结果​​

简介

MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。概念"Map(映射)“和"Reduce(归约)”,是它们的主要思想,都是从函数式编程语言里借来的,还有从矢量编程语言里借来的特性。它极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。 当前的软件实现是指定一个Map(映射)函数,用来把一组键值对映射成一组新的键值对,指定并发的Reduce(归约)函数,用来保证所有映射的键值对中的每一个共享相同的键组

定义

MapReduce是面向大数据并行处理的计算模型、框架和平台,它隐含了以下三层含义:

  1. MapReduce是一个基于集群的高性能并行计算平台(Cluster Infrastructure)。它允许用市场上普通的商用服务器构成一个包含数十、数百至数千个节点的分布和并行计算集群。
  2. MapReduce是一个并行计算与运行软件框架(Software Framework)。它提供了一个庞大但设计精良的并行计算软件框架,能自动完成计算任务的并行化处理,自动划分计算数据和计算任务,在集群节点上自动分配和执行任务以及收集计算结果,将数据分布存储、数据通信、容错处理等并行计算涉及到的很多系统底层的复杂细节交由系统负责处理,大大减少了软件开发人员的负担。
  3. MapReduce是一个并行程序设计模型与方法(Programming Model & Methodology)。它借助于函数式程序设计语言Lisp的设计思想,提供了一种简便的并行程序设计方法,用Map和Reduce两个函数编程实现基本的并行计算任务,提供了抽象的操作和并行编程接口,以简单方便地完成大规模数据的编程和计算处理

使用

(参考了Hadoop权威指南的说明)
MapReduce任务过程分为两个处理阶段:map阶段和reduce阶段,每个阶段都以键值对作为输入和输出,类型由程序员来选择。对应其两个阶段,程序员需要编写两个函数:map函数和reduce函数。
这里,以书上的Hadoop权威指南上的案例“分析气象数据集”来说明这两个阶段该如何实现。

  • map阶段
  • map阶段的输入是NCDC(美国国家气候数据中心)的原始数据,这里选择文本格式作为输入格式,将数据集的每一行作为文本输入。这里的键是某一行起始位置相对于文件起始位置的偏移量,可以忽略。
  • 在本例中,我们只对年份和气温属性感兴趣,所以只需要取出这两个字段数据。
  • map函数的功能仅限于提取年份和气温信息,并将它们作为输出。
  • reduce阶段
  • map函数的输出经由MapReduce框架处理后,最后发送到reduce函数,这个处理过程基于键来对键值对进行排序和分组。
  • reduce函数要做的是遍历整个列表,并从中找出最大的读数。

实现

  1. 导入Hadoop的Maven依赖

<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.3.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.3.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.3.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.3.0</version>
</dependency>

【建议】最好在resources目录下导入一个日志文件的配置文件log4j.properties,以便在控制台观察到运行结果

# Set root logger level to DEBUG and its only appender to A1.
log4j.rootLogger=DEBUG, A1

# A1 is set to be a ConsoleAppender.
log4j.appender.A1=org.apache.log4j.ConsoleAppender

# A1 uses PatternLayout.
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%-4r [%t]

  1. 在项目目录中新建一个input文件夹,然后导入NCDC原始数据sample.txt

0067011990999991950051507004+68750+023550FM-12+038299999V0203301N00671220001CN9999999N9+00001+99999999999
0043011990999991950051512004+68750+023550FM-12+038299999V0203201N00671220001CN9999999N9+00221+99999999999
0043011990999991950051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9-00111+99999999999
0043012650999991949032412004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+01111+99999999999
0043012650999991949032418004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+00781+99999999999

数据说明:
第15-19个字符是year
第45-50位是温度表示,+表示零上 -表示零下,且温度的值不能是9999,9999表示异常数据
第50位值只能是0、1、4、5、9几个数字

  1. MaxTemperatureMapper类

public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

private static final int MISSING = 9999;

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String year = line.substring(15,19);
int airTemperature;
if (line.charAt(87) == '+'){
airTemperature = Integer.parseInt(line.substring(88,92));
}else {
airTemperature = Integer.parseInt(line.substring(87,92));
}
String quality = line.substring(92,93);
if (airTemperature != MISSING && quality.matches("[01459]")){
context.write(new Text(year), new IntWritable(airTemperature));
}
}


}

  1. MaxTemperatureReduce类

public class MaxTemperatureReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int maxValue = Integer.MIN_VALUE;
for (IntWritable value : values) {
maxValue = Math.max(maxValue,value.get());
}
context.write(key,new IntWritable(maxValue));
}
}

  1. MaxTemperature类

public class MaxTempature {

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
if (args.length != 2){
System.err.println("Usage:MaxTemperature <input path> <output>");
System.exit(-1);
}

Job job = new Job();
job.setJarByClass(MaxTempature.class);
job.setJobName("Max temperature");

FileInputFormat.addInputPath(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));

job.setMapperClass(MaxTemperatureMapper.class);
job.setReducerClass(MaxTemperatureReducer.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

环境搭建完成后,项目目录如图所示(需要提前在out目录下build该项目的jar包,可以参考下一节的博客):

2.MapReduce的使用——分析气象数据集_apache

运行

在Windows环境下,IDEA不能直接运行MapReduce程序,需要配置一些信息。

结果

调试结果如下:

2.MapReduce的使用——分析气象数据集_hadoop_02


输出结果可以通过output目录下的输出文件查看,如图所示:

2.MapReduce的使用——分析气象数据集_apache_03


最后,该MapReduce程序的输出结果如图所示:

2.MapReduce的使用——分析气象数据集_apache_04


举报

相关推荐

0 条评论