一、建立Maven工程 pom.xml文件的 dependencies内加入
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.2</version>
</dependency>
</dependencies>
二、导入hadoop配置文件
从已搭建的hadoop集群中下载 hadoopx.x.x/etc/hadoop core-site.xml、hdfs-site.xml、mapred-site.xml、yarn-site.xml。将这四个文件放入到maven工程的resource目录中
三、编写代码
======================Mapper类=========================
package com.cg.family.map;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.util.StringTokenizer;
/**
* @author zhihui.liu
*
* mapper实现类
*/
public class Mapper01 extends Mapper<Object, Text,Text, IntWritable> {
// 单词统计的value类型,默认给了一个1,因为每一个单词都出现了一次,最后会汇总累计和
private final static IntWritable one = new IntWritable(1);
// 出现次数统计时,每个单词和其数量,都是k-v形式存在,key用Text
private Text word = new Text();
/**
* 重写 map方法
*
* @param key 偏移量
* @param value 输入值
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
// 字符串解析
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
// 设置key,即单词
word.set(itr.nextToken());
// 一个单词 出现1次
context.write(word,one);
}
}
}
======================Reducer类=========================
package com.cg.family.reduce;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* @author zhihui.liu
*
* reducer实现类
*/
public class Reducer01 extends Reducer<Text, IntWritable, Text, IntWritable> {
// 单词累计数
private final static IntWritable result = new IntWritable(0);
/**
* 重写reduce方法,对前一步map计算结果求和
*
* @param key
* @param values
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// 初始数 0
int sum = result.get();
// 累计计算
for (IntWritable i : values) {
sum += i.get();
}
// 将求和结果重新写入
result.set(sum);
// 返回性输出
context.write(key,result);
}
}
======================执行类=========================
package com.cg.family;
import com.cg.family.map.Mapper01;
import com.cg.family.reduce.Reducer01;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;
/**
* Hello MapReduce!
*/
public class App {
public static void main(String[] args) throws Exception {
// 配置信息
Configuration conf = new Configuration(true);
// 参数解析器 hadoop jar 后面接的参数,
// java命令参数 被解析器直接set到conf中,其他的执行参数被放到args中
GenericOptionsParser parser = new GenericOptionsParser(conf,args);
// 获取普通执行参数
String[] path = parser.getRemainingArgs();
// 构建任务
Job job = Job.getInstance(conf);
// jar报的主类
job.setJarByClass(App.class);
// 任务名
job.setJobName("wordcount");
// 设置输入
TextInputFormat.addInputPath(job, new Path(path[0]));
Path outfile = new Path(path[1]+"/" + System.currentTimeMillis()+"/1");
// 如果输出路径已经存在,则删除,hadoop有一规则,输出文件完整路径存在,则执行失败
if (outfile.getFileSystem(conf).exists(outfile)) outfile.getFileSystem(conf).delete(outfile, true);
// 设置输出
TextOutputFormat.setOutputPath(job, outfile);
// map计算的执行类
job.setMapperClass(Mapper01.class);
// reduce计算的执行类
job.setReducerClass(Reducer01.class);
// 输出信息的key类型,序列化使用
job.setMapOutputKeyClass(Text.class);
// 输出信息的value类型,序列化使用
job.setMapOutputValueClass(IntWritable.class);
// 执行任务
job.waitForCompletion(true);
}
}
四、上传jar包到服务器
通过maven install打成jar,上传到某一个台hadoop服务器,并在该服务器上执行如下命令
yarn jar HadoopMR-1.0-SNAPSHOT.jar com.cg.family.App /输入信息 /输出路径
输入路径通过 hdfs dfs -mkdir /xxx/xxx 可以去创建,
并通过hdfs dfs -put 文件信息 /xxx/xxx 把需要统计的文件上传到hdfs的对应目录下
五、结果查询
hadoop的命令控制台会有执行进度
hadoop服务器的IP:9870 可以查看到上传文件及 计算后的输出文件
yarn服务器的IP:8088 也可以查看到任务的上传和执行情况









