目录
数据准备:
需求描述:
需求分析:
涉及技术:
代码设计:
FlowBean(java):优化
FlowCountSortMapper:
FlowCountSortReducer:
FlowCountClient:
运行结果:
数据准备:
13480253104 180 180 360
13502468823 7335 110349 117684
13560436666 1116 954 2070
13560439658 2034 5892 7926
13602846565 1938 2910 4848
13660577991 6960 690 7650
13719199419 240 0 240
13726230503 2481 24681 27162
13726238888 2481 24681 27162
13760778710 120 120 240
13826544101 264 0 264
13922314466 3008 3720 6728
13925057413 11058 48243 59301
13926251106 240 0 240
13926435656 132 1512 1644
15013685858 3659 3538 7197
15920133257 3156 2936 6092
15989002119 1938 180 2118
18211575961 1527 2106 3633
18320173382 9531 2412 11943
84138413 4116 1432 5548
需求描述:
需求分析:
涉及技术:
代码设计:
pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.yanlong</groupId>
<artifactId>hadoop-04</artifactId>
<version>1.0-SNAPSHOT</version>
<!-- Cloudera repository: extra mirror for Hadoop artifacts -->
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<!-- Hadoop 2.7.4: common + HDFS + client + MapReduce core, all on the same version -->
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.7.4</version>
</dependency>
</dependencies>
</project>
FlowBean(java):优化
添加代码:(倒序)
package cn.itcast.mapreduce.flowcount_1;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class FlowBean implements Writable ,WritableComparable {
private long upFlow;
private long downFlow;
private long totalFlow;
public FlowBean() {
}
public FlowBean(long upFlow, long downFlow, long totalFlow) {
this.upFlow = upFlow;
this.downFlow = downFlow;
this.totalFlow = totalFlow;
}
//与map对应
public FlowBean(long upFlow, long downFlow) {
this.upFlow=upFlow;
this.downFlow=downFlow;
this.totalFlow=upFlow+downFlow;
}
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getTotalFlow() {
return totalFlow;
}
public void setTotalFlow(long totalFlow) {
this.totalFlow = totalFlow;
}
/**
*
* @param dataOutput
* @throws IOException
*/
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeLong(upFlow);
dataOutput.writeLong(downFlow);
dataOutput.writeLong(totalFlow);
}
public void readFields(DataInput dataInput) throws IOException {
this.upFlow= dataInput.readLong();
this.downFlow=dataInput.readLong();
this.totalFlow=dataInput.readLong();
}
//reduce需要使用
public void set(long upFlow, long downFlow) {
this.upFlow= upFlow;
this.downFlow=downFlow;
this.totalFlow=upFlow+downFlow;
}
/* @Override
public String toString() {
return "FlowBean{" +
"upFlow=" + upFlow +
", downFlow=" + downFlow +
", totalFlow=" + totalFlow +
'}';
}*/
@Override
public String toString() {
return upFlow+"\t" +downFlow+"\t" +totalFlow ;
}
//比较排序,倒序.(如果不重写一般默认的,是正序)
public int compareTo(Object o) {
FlowBean o1=(FlowBean) o;
return this.getTotalFlow()>o1.getTotalFlow()?-1:1;
}
}
FlowCountSortMapper:
package cn.itcast.mapreduce.flowcount_1;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Map side of the sort job. Each input line is one record of the previous
 * flow-count job's output: "phone \t upFlow \t downFlow \t totalFlow".
 * Emits (FlowBean, phone) so that the shuffle phase sorts by FlowBean.
 */
public class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

    // Reused across calls to avoid per-record allocation.
    FlowBean outKey = new FlowBean();
    Text outValue = new Text();

    @Override
    protected void map(LongWritable offset, Text record, Context context)
            throws IOException, InterruptedException {
        // columns: [0]=phone, [1]=upFlow, [2]=downFlow, [3]=totalFlow (unused; recomputed by set)
        String[] columns = record.toString().split("\t");
        outKey.set(Long.parseLong(columns[1]), Long.parseLong(columns[2]));
        outValue.set(columns[0]);
        context.write(outKey, outValue);
    }
}
FlowCountSortReducer:
package cn.itcast.mapreduce.flowcount_1;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Reduce side of the sort job: keys arrive already sorted by FlowBean, so the
 * reducer only flips the pair back to (phone, FlowBean) for output.
 */
public class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    /**
     * Emits every phone number grouped under this key.
     *
     * <p>Fix: the original wrote only {@code values.iterator().next()},
     * silently dropping any phone after the first in a group. With the current
     * FlowBean.compareTo (which never returns 0) every group holds exactly one
     * value, so output is unchanged — but iterating makes the reducer safe if
     * the comparator ever starts reporting ties as equal.
     */
    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text phoneNumber : values) {
            context.write(phoneNumber, key);
        }
    }
}
FlowCountClient:
package cn.itcast.mapreduce.flowcount_1;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Driver for the flow-count sort job: reads per-phone totals produced by the
 * first flow-count job, sorts them by total flow descending (via
 * FlowBean.compareTo on the map-output key) and writes
 * "phone \t up \t down \t total" records.
 */
public class FlowCountClient {

    /**
     * Configures and submits the MapReduce job, then exits with 0 on success
     * or 1 on failure.
     *
     * @param args unused; input/output paths are hard-coded below
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Job-wide configuration for this MR run.
        Configuration conf = new Configuration();
        // Fix: job was named "wordcount" — a copy-paste leftover that mislabels
        // it in the YARN / JobHistory UI. Name it after what it actually does.
        Job job = Job.getInstance(conf, "flowcountsort");

        // 2. Jar (and main class) that carries the job code.
        job.setJarByClass(FlowCountClient.class);

        // 3. Mapper and reducer implementations.
        job.setMapperClass(FlowCountSortMapper.class);
        job.setReducerClass(FlowCountSortReducer.class);

        // 4. Map-output kv types: FlowBean as the KEY is what drives the sort.
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        // Reduce-output kv types — the job's final result layout.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Input and output paths. NOTE(review): Hadoop fails the job if the
        // output directory already exists — delete D:\data\test_1\out_1 first.
        FileInputFormat.addInputPath(job, new Path("D:\\data\\test_1\\input_1"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\data\\test_1\\out_1"));

        // Submit, block until completion, and print progress (verbose = true).
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}