0
点赞
收藏
分享

微信扫一扫

需求:排序,倒序

JakietYu 2022-10-31 阅读 67


目录

 

​​数据准备:​​

​​需求描述:​​

​​需求分析:​​

​​涉及技术:​​

​​代码设计:​​

​​FlowBean(java):优化​​

​​FlowCountSortMapper: ​​

​​FlowCountSortReducer:​​

​​FlowCountClient :​​

​​运行结果:​​

数据准备:

需求:排序,倒序_mapreduce

 

13480253104 180 180 360
13502468823 7335 110349 117684
13560436666 1116 954 2070
13560439658 2034 5892 7926
13602846565 1938 2910 4848
13660577991 6960 690 7650
13719199419 240 0 240
13726230503 2481 24681 27162
13726238888 2481 24681 27162
13760778710 120 120 240
13826544101 264 0 264
13922314466 3008 3720 6728
13925057413 11058 48243 59301
13926251106 240 0 240
13926435656 132 1512 1644
15013685858 3659 3538 7197
15920133257 3156 2936 6092
15989002119 1938 180 2118
18211575961 1527 2106 3633
18320173382 9531 2412 11943
84138413 4116 1432 5548

需求描述:

需求:排序,倒序_apache_02

需求分析:

需求:排序,倒序_mapreduce_03

 

需求:排序,倒序_mapreduce_04

 

 

需求:排序,倒序_mapreduce_05

涉及技术:

需求:排序,倒序_hadoop_06

 

需求:排序,倒序_hadoop_07

代码设计:

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the flow-count sorting MapReduce example. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<!-- Project coordinates. -->
<groupId>com.yanlong</groupId>
<artifactId>hadoop-04</artifactId>
<version>1.0-SNAPSHOT</version>

<!-- Extra artifact repository in addition to the default one. -->
<!-- NOTE(review): every dependency below is a plain org.apache.hadoop 2.7.4 artifact; -->
<!-- this repository is presumably not required to resolve them - confirm before removing. -->
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>

<!-- Hadoop libraries; all four are pinned to the same version (2.7.4) and must stay in sync. -->
<!-- NOTE(review): hadoop-client presumably pulls in the other three transitively - verify -->
<!-- with "mvn dependency:tree" before trimming the list. -->
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.7.4</version>
</dependency>
</dependencies>

</project>

FlowBean(java):优化

需求:排序,倒序_apache_08

 

 

需求:排序,倒序_hadoop_09

添加代码:(倒序)

 

需求:排序,倒序_hadoop_10

需求:排序,倒序_mapreduce_11

package cn.itcast.mapreduce.flowcount_1;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements Writable ,WritableComparable {
private long upFlow;
private long downFlow;
private long totalFlow;

public FlowBean() {
}

public FlowBean(long upFlow, long downFlow, long totalFlow) {
this.upFlow = upFlow;
this.downFlow = downFlow;
this.totalFlow = totalFlow;
}

//与map对应
public FlowBean(long upFlow, long downFlow) {
this.upFlow=upFlow;
this.downFlow=downFlow;
this.totalFlow=upFlow+downFlow;
}

public long getUpFlow() {
return upFlow;
}

public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}

public long getDownFlow() {
return downFlow;
}

public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}

public long getTotalFlow() {
return totalFlow;
}

public void setTotalFlow(long totalFlow) {
this.totalFlow = totalFlow;
}

/**
*
* @param dataOutput
* @throws IOException
*/
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeLong(upFlow);
dataOutput.writeLong(downFlow);
dataOutput.writeLong(totalFlow);
}

public void readFields(DataInput dataInput) throws IOException {
this.upFlow= dataInput.readLong();
this.downFlow=dataInput.readLong();
this.totalFlow=dataInput.readLong();
}

//reduce需要使用
public void set(long upFlow, long downFlow) {
this.upFlow= upFlow;
this.downFlow=downFlow;
this.totalFlow=upFlow+downFlow;
}


/* @Override
public String toString() {
return "FlowBean{" +
"upFlow=" + upFlow +
", downFlow=" + downFlow +
", totalFlow=" + totalFlow +
'}';
}*/
@Override
public String toString() {
return upFlow+"\t" +downFlow+"\t" +totalFlow ;
}

//比较排序,倒序.(如果不重写一般默认的,是正序)
public int compareTo(Object o) {
FlowBean o1=(FlowBean) o;
return this.getTotalFlow()>o1.getTotalFlow()?-1:1;
}


}

FlowCountSortMapper: 

需求:排序,倒序_apache_12

package cn.itcast.mapreduce.flowcount_1;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Mapper for the sort job: turns each tab-separated input line into a
 * ({@link FlowBean}, phone number) pair so that records are ordered by the
 * bean's comparison during the shuffle.
 *
 * <p>Expected line layout: {@code phone<TAB>upFlow<TAB>downFlow[<TAB>...]}.
 * Any fields after the third are ignored; the total is recomputed from the
 * up and down values by {@link FlowBean#set(long, long)}.
 */
public class FlowCountSortMapper extends Mapper<LongWritable,Text,FlowBean,Text> {

    // Output key/value instances are reused across map() calls.
    FlowBean k = new FlowBean();
    Text v = new Text();

    /**
     * Parses one input line and emits (flow bean, phone number).
     *
     * @param key     input key supplied by the input format; only used in error messages
     * @param value   one line of input text
     * @param context job context used to emit the output pair
     * @throws IOException          if the line has fewer than three tab-separated fields,
     *                              or if writing the output fails
     * @throws InterruptedException if the write is interrupted
     * @throws NumberFormatException if the up/down fields are not valid longs
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();

        // Blank lines carry no record; skip them instead of failing the task.
        if (line.trim().isEmpty()) {
            return;
        }

        String[] fields = line.split("\t");
        if (fields.length < 3) {
            // Fail with context rather than a bare ArrayIndexOutOfBoundsException.
            throw new IOException("Malformed record at input key " + key.get()
                    + ": expected at least 3 tab-separated fields but found " + fields.length);
        }

        k.set(Long.parseLong(fields[1]), Long.parseLong(fields[2]));
        v.set(fields[0]);

        context.write(k, v);
    }
}

 

 

FlowCountSortReducer:

 

需求:排序,倒序_hadoop_13

需求:排序,倒序_hadoop_14

需求:排序,倒序_mapreduce_15

 

package cn.itcast.mapreduce.flowcount_1;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Reducer for the sort job: swaps the pair back to (phone number, flow bean)
 * so the final output lists each phone number followed by its flow figures,
 * in the order the keys arrive.
 */
public class FlowCountSortReducer extends Reducer<FlowBean,Text,Text,FlowBean> {

    /**
     * Emits one output record per phone number in the group.
     *
     * <p>Every value is written, not just the first one, so that no phone
     * number is dropped if several records end up in the same key group.
     *
     * @param key     flow bean the group was sorted by
     * @param values  phone numbers belonging to this group
     * @param context job context used to emit the output pairs
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the write is interrupted
     */
    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for (Text phoneNumber : values) {
            context.write(phoneNumber, key);
        }
    }
}

FlowCountClient :

需求:排序,倒序_apache_16

 

需求:排序,倒序_mapreduce_17

package cn.itcast.mapreduce.flowcount_1;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Driver that configures and submits the flow-count sorting job.
 *
 * <p>Usage: {@code FlowCountClient [inputPath [outputPath]]}. When an argument
 * is omitted the corresponding built-in default path is used.
 */
public class FlowCountClient {

    /** Input directory used when no first argument is given. */
    private static final String DEFAULT_INPUT_PATH = "D:\\data\\test_1\\input_1";

    /** Output directory used when no second argument is given. */
    private static final String DEFAULT_OUTPUT_PATH = "D:\\data\\test_1\\out_1";

    /**
     * Builds the job, waits for it to finish and exits with status 0 on
     * success or 1 on failure.
     *
     * @param args optional {@code [inputPath, outputPath]}
     * @throws IOException            if the job cannot be created or submitted
     * @throws ClassNotFoundException if a configured job class cannot be loaded
     * @throws InterruptedException   if waiting for completion is interrupted
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;
        String outputPath = args.length > 1 ? args[1] : DEFAULT_OUTPUT_PATH;

        // 1. Configuration holding the runtime properties of the job.
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "flowcount-sort");

        // 2. Class used to locate the jar that contains the job.
        job.setJarByClass(FlowCountClient.class);

        // 3. Mapper and reducer implementations.
        job.setMapperClass(FlowCountSortMapper.class);
        job.setReducerClass(FlowCountSortReducer.class);

        // 4. Key/value types emitted by the mapper.
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        // 5. Key/value types emitted by the reducer, i.e. the final job output.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 6. Input and output locations of the job.
        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        // Submit the job and monitor it until it completes.
        boolean result = job.waitForCompletion(true);

        // Exit code 0 when the job succeeded, 1 otherwise.
        System.exit(result ? 0 : 1);
    }
}

运行结果:

需求:排序,倒序_apache_18

举报

相关推荐

0 条评论