Hadoop Serialization: Hands-On Example

1 Requirements and Analysis

Count the total upstream traffic, downstream traffic, and total traffic consumed by each phone number.

(1) Input data

1 13736230513 192.196.100.1 www.dev1.com  2481  24681 200
2 13846544121 192.196.100.2 264 0 200
3 13956435636 192.196.100.3 132 1512 200
4 13966251146 192.168.100.1 240 0 404
5 18271575951 192.168.100.2 www.dev1.com 1527 2106 200
6 84188413 192.168.100.3 www.dev1.com 4116 1432 200

(2) Input data format:

7     13560436666    120.196.100.99        1116         954            200
id    phone number    network IP    upstream traffic    downstream traffic    status code

(3) Expected output data format

13560436666         1116              954             2070
phone number    upstream traffic    downstream traffic    total traffic

(4) Map phase

4.1 Read one line and split it into fields

7     13560436666    120.196.100.99        1116         954            200

4.2 Extract the phone number, upstream traffic, and downstream traffic (see the field-splitting sketch after this analysis)

13560436666    1116 954

4.3 Output with the phone number as the key and the bean object as the value, i.e. context.write(phoneNum, bean)

4.4 For the bean object to be transmitted, it must implement the serialization interface (Writable)
(5) Reduce phase

5.1 Accumulate the totals of upstream and downstream traffic

13560436666    1116  +  954  =  2070
phone number   upstream traffic + downstream traffic = total traffic
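
To make steps 4.1 and 4.2 concrete, here is a minimal standalone sketch, independent of Hadoop (the class name SplitSketch is just illustrative): it splits one tab-separated record and picks out the phone number and the two traffic fields. The traffic columns are counted from the end of the array because some records contain a URL column and some do not; the sample line is taken from the input data above.

// Minimal sketch of steps 4.1 and 4.2 (field splitting), not part of the MapReduce job.
public class SplitSketch {
    public static void main(String[] args) {
        // one sample record (tab-separated), taken from the input data above
        String line = "1\t13736230513\t192.196.100.1\twww.dev1.com\t2481\t24681\t200";

        String[] fields = line.split("\t");

        String phoneNum = fields[1];                                  // phone number
        long upFlow   = Long.parseLong(fields[fields.length - 3]);    // upstream traffic
        long downFlow = Long.parseLong(fields[fields.length - 2]);    // downstream traffic

        // prints: 13736230513	2481	24681
        System.out.println(phoneNum + "\t" + upFlow + "\t" + downFlow);
    }
}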

2 Hands-on implementation

(1) Write the flow-statistics bean (FlowBean)

package com.dev1.flowsum;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

// 1 Implement the Writable interface
public class FlowBean implements Writable {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // 2 Deserialization uses reflection to call the no-arg constructor, so it must exist
    public FlowBean() {
        super();
    }

    public FlowBean(long upFlow, long downFlow) {
        super();
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    // 3 Serialization method
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // 4 Deserialization method
    // 5 The read order here must match the write order in write()
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    // 6 toString(), so the bean prints nicely into the output text file
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    // setter used by the Mapper to reuse a single bean instance
    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    // getters used by the Reducer
    public long getUpFlow() {
        return upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    // remaining getters/setters omitted
}
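
As a quick sanity check on the write()/readFields() pair (not part of the tutorial code; the class name RoundTripTest and the in-memory streams are illustrative assumptions), a FlowBean can be serialized to bytes and read back, confirming that the field order round-trips correctly:

package com.dev1.flowsum;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical helper: verifies that FlowBean.write() and FlowBean.readFields()
// agree on the field order by serializing and deserializing in memory.
public class RoundTripTest {
    public static void main(String[] args) throws IOException {
        FlowBean original = new FlowBean(1116, 954);

        // serialize to an in-memory byte array
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // deserialize into a fresh bean (no-arg constructor, as the framework would do)
        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        // both lines print: 1116	954	2070
        System.out.println(original);
        System.out.println(copy);
    }
}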

(2) Write the Mapper class

package com.dev1.flowsum;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    FlowBean v = new FlowBean();
    Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // 1 Get one line
        String line = value.toString();

        // 2 Split the fields
        String[] fields = line.split("\t");

        // 3 Build the output key/value
        // Extract the phone number
        String phoneNum = fields[1];

        // Extract upstream and downstream traffic; indexing from the end of the array
        // works whether or not the record contains the optional URL column
        long upFlow = Long.parseLong(fields[fields.length - 3]);
        long downFlow = Long.parseLong(fields[fields.length - 2]);

        k.set(phoneNum);
        v.set(upFlow, downFlow);

        // 4 Write out
        context.write(k, v);
    }
}

(3) Write the Reducer class

package com.dev1.flowsum;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {

        long sum_upFlow = 0;
        long sum_downFlow = 0;

        // 1 Iterate over all beans and accumulate their upstream and downstream traffic separately
        for (FlowBean flowBean : values) {
            sum_upFlow += flowBean.getUpFlow();
            sum_downFlow += flowBean.getDownFlow();
        }

        // 2 Build the result bean
        FlowBean resultBean = new FlowBean(sum_upFlow, sum_downFlow);

        // 3 Write out
        context.write(key, resultBean);
    }
}
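
One design detail worth calling out: the loop accumulates into plain long variables instead of keeping references to the FlowBean values, because Hadoop reuses the same value instance across iterations of values; any references saved from earlier iterations would all end up pointing at the last record's data.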

(4) Write the Driver class

package com.dev1.flowsum;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowsumDriver {

    public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {

        // For local testing: set the input/output paths to the actual paths on your machine
        args = new String[] { "e:/input/inputflow", "e:/output1" };

        // 1 Get the configuration and a Job instance
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2 Specify the local path of this program's jar
        job.setJarByClass(FlowsumDriver.class);

        // 3 Specify the Mapper/Reducer classes this job uses
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        // 4 Specify the key/value types of the mapper output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 5 Specify the key/value types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 6 Specify the job's input and output directories
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 Submit the job configuration and the jar containing the job's classes to YARN
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
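
Note that the hard-coded args line is meant only for local testing; when the jar is submitted to the cluster as shown below, it would silently override the paths passed on the command line. A minimal guard (an assumption on my part, not in the original code) keeps both use cases working:

// Only fall back to the hard-coded local paths when no paths were passed on the command line,
// so the cluster invocation (hadoop jar ... <input> <output>) keeps working.
if (args.length < 2) {
    args = new String[] { "e:/input/inputflow", "e:/output1" };
}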

Submit to the cluster

hdfs dfs -mkdir -p /user/dev1/input1
hdfs dfs -put phone_data.txt /user/dev1/input1
hadoop jar ./flowsum.jar com.dev1.flowsum.FlowsumDriver /user/dev1/input1 /user/dev1/output1
hdfs dfs -cat /user/dev1/output1/*
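
Assuming phone_data.txt contains just the six sample records from section 1, the output of the cat command would look roughly like this (phone number, upstream, downstream, total, tab-separated):

13736230513	2481	24681	27162
13846544121	264	0	264
13956435636	132	1512	1644
13966251146	240	0	240
18271575951	1527	2106	3633
84188413	4116	1432	5548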

