0
点赞
收藏
分享

微信扫一扫

自定义bean对象实现序列化接口(Writable)以及实现案例

吴陆奇 2022-02-24 阅读 36


  1. 自定义bean对象序列化传输必须注意

(1) 实现Writable接口

(2)反序列化时,需要反射调用空构造函数,所以必须有空构造

    public FlowBean() {
        super();
    }

(3)重写序列化方法

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

(4)重写反序列化方法

    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

(5)反序列化的顺序和序列化的顺序完全一致

(6)要想把结果显示在文件中,需要重写toString(),可以用"\t"分开,以便后续用

(7)如果需要将自定义的bean放在key中传输,则还要实现Comparable接口,因为MapReduce框架中的shuffle过程一定会对key排序

​ @Override public int compareTo(FlowBean o) { // 倒序排列,从大到小 return this.sumFlow > o.getSumFlow() ? -1 : 1; } ​

对象

package com.zyd.flowsum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Set;

import org.apache.hadoop.io.Writable;
/**
 * Traffic figures carried through the job: upstream, downstream and total flow.
 *
 * <p>The class implements {@link Writable}; {@link #write(DataOutput)} and
 * {@link #readFields(DataInput)} must handle the three fields in the same order.
 * The bean is transported as a value (the key is the phone number), so it does not
 * define {@code compareTo}.
 *
 * @author Administrator
 */
public class FlowBean implements Writable {

    /** Upstream traffic. */
    private long upFlow;

    /** Downstream traffic. */
    private long downFlow;

    /** Total traffic, i.e. upstream plus downstream when set via the constructor or {@link #set}. */
    private long sumFlow;

    /** No-argument constructor; it must exist so the bean can be created reflectively. */
    public FlowBean() {
    }

    /**
     * Creates a bean from the two directional figures and derives the total.
     *
     * @param upFlow   upstream traffic
     * @param downFlow downstream traffic
     */
    public FlowBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    /**
     * Overwrites both directional figures and recomputes the total from them.
     *
     * @param upFlow   upstream traffic
     * @param downFlow downstream traffic
     */
    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    /** @return the upstream traffic */
    public long getUpFlow() {
        return upFlow;
    }

    /**
     * Replaces the upstream traffic only; the stored total is not recomputed.
     *
     * @param upFlow new upstream traffic
     */
    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    /** @return the downstream traffic */
    public long getDownFlow() {
        return downFlow;
    }

    /**
     * Replaces the downstream traffic only; the stored total is not recomputed.
     *
     * @param downFlow new downstream traffic
     */
    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    /** @return the stored total traffic */
    public long getSumFlow() {
        return sumFlow;
    }

    /**
     * Replaces the stored total directly.
     *
     * @param sumFlow new total traffic
     */
    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    /**
     * Serializes the bean as three longs: upstream, downstream, total.
     *
     * @param out sink to write to
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        // Field order here must mirror readFields.
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    /**
     * Restores the bean from three longs: upstream, downstream, total.
     *
     * @param in source to read from
     * @throws IOException if reading fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        // Field order here must mirror write.
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    /**
     * Renders the three figures separated by tabs so the output is easy to process later.
     *
     * @return {@code upFlow<TAB>downFlow<TAB>sumFlow}
     */
    @Override
    public String toString() {
        return String.join("\t",
                Long.toString(upFlow),
                Long.toString(downFlow),
                Long.toString(sumFlow));
    }
}

Mapper类

package com.zyd.flowsum;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Mapper that turns one tab-separated log line into a (phone number, traffic) pair.
 *
 * <p>Input key is a {@link LongWritable} (described as the line number in the original
 * notes; it is not used here) and the input value is the line text. Output key is the
 * phone number as {@link Text}; output value is a {@link FlowBean}.
 *
 * <p>Example line:
 * {@code 1363157993055 13560436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200}
 */
public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    /**
     * Fewest fields a usable record can have: the phone number sits at index 1 and the
     * upstream/downstream figures are the third- and second-to-last fields, so with fewer
     * than five fields those positions would overlap or not exist.
     */
    private static final int MIN_FIELDS = 5;

    /** Counter group and name used to report skipped records. */
    private static final String COUNTER_GROUP = "FlowMapper";
    private static final String MALFORMED_COUNTER = "MALFORMED_RECORDS";

    // Reused across map() calls to avoid allocating a new key/value per record.
    FlowBean v = new FlowBean();
    Text k = new Text();

    /**
     * Parses one line and emits the phone number with its upstream/downstream traffic.
     * Lines that are too short or whose traffic fields are not numeric are skipped and
     * counted instead of failing the task.
     *
     * @param key     input key (unused)
     * @param value   one line of input text
     * @param context job context used to emit output and update counters
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the write is interrupted
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 1. Read the line.
        String line = value.toString();

        // 2. Split on tabs.
        String[] fields = line.split("\t");
        if (fields.length < MIN_FIELDS) {
            // Blank or truncated line: the indexes below would be out of bounds.
            context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
            return;
        }

        // 3. Extract the values. Traffic fields are addressed from the end of the line.
        String phoneNum = fields[1];
        long upFlow;
        long downFlow;
        try {
            upFlow = Long.parseLong(fields[fields.length - 3]);
            downFlow = Long.parseLong(fields[fields.length - 2]);
        } catch (NumberFormatException ignored) {
            // Non-numeric traffic field: skip this record but keep it visible in the counters.
            context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
            return;
        }

        k.set(phoneNum);
        v.set(upFlow, downFlow);

        // 4. Emit the pair.
        context.write(k, v);
    }
}

Reducer

package com.zyd.flowsum;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reducer that adds up the upstream and downstream traffic of all values sharing a key
 * and emits a single {@link FlowBean} holding the totals for that key.
 */
public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    /**
     * Sums the traffic of every bean in {@code values} and writes one result for {@code key}.
     *
     * @param key     the grouping key, written back unchanged
     * @param values  all beans received for this key
     * @param context job context used to emit the result
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the write is interrupted
     */
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        // 1. Accumulate both directions over all beans of this key.
        long totalUp = 0;
        long totalDown = 0;
        for (FlowBean bean : values) {
            totalUp = totalUp + bean.getUpFlow();
            totalDown = totalDown + bean.getDownFlow();
        }

        // 2. Emit the totals; the two-argument constructor derives the overall sum.
        FlowBean result = new FlowBean(totalUp, totalDown);
        context.write(key, result);
    }
}

Driver

package com.zyd.flowsum;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Entry point that configures and submits the flow-summing job.
 *
 * <p>Usage: {@code FlowDriver <input path> <output path>}
 */
public class FlowDriver {

    /**
     * Builds the job, wires the mapper and reducer, and waits for completion.
     * Exits with status 0 when the job succeeds, 1 when it fails, and 2 when the
     * input/output paths are missing from the command line.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @throws IOException            if the job cannot be created or submitted
     * @throws ClassNotFoundException if a job class cannot be found during submission
     * @throws InterruptedException   if waiting for the job is interrupted
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
        if (args == null || args.length < 2) {
            System.err.println("Usage: FlowDriver <input path> <output path>");
            System.exit(2);
            return;
        }

        // 1. Obtain the job object.
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. Locate the jar through the driver class.
        job.setJarByClass(FlowDriver.class);

        // 3. Associate the mapper and reducer classes.
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        // 4. Declare the mapper output key/value types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 5. Declare the final output key/value types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 6. Set the input and output paths.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7. Submit and translate the outcome into the process exit status.
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

原数据

1363157993055 13565436666   C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18  15    1514  92054 200
1363157993055 13560436866 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1916 914 200
1363157993055 13568436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 11616 95 200
1363157993055 13760436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 11126 95420 200
1363157993055 13564436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 11856 954 200
1363157993055 13561536666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 11685 952 200
1363157993055 13560485666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1916 9504 200
1363157993055 13560436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1616 1954 200
1363157993055 13560463666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1216 9354 200
1363157993055 13560488666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1616 9543 200
1363157993055 13560436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1316 95420 200
1363157993055 13560436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1316 95474 200
1363157993055 13560445666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 16616 95489 200
1363157993055 13560436856 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 11136 9524 200
1363157993055 13560478666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 11162 9554 200
1363157993055 13560445666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 16616 95489 200
1363157993055 13560436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 11165 9954 200
1363157993055 13566436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 11126 9534 200
1363157993055 13460436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1162 9054 200
1363157993055 13560936666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 11201 91254 200
1363157993055 13560445666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 16616 95489 200
1363157993055 13560436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1616 1954 200
1363157993055 13560436866 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1916 952 200
1363157993055 13568436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 11616 95 200
1363157993055 13760436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 11126 9542 200
1363157993055 13560445666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 16616 95489 200
1363157993055 13564436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 11856 95452 200



举报

相关推荐

0 条评论