0
点赞
收藏
分享

微信扫一扫

一道Hadoop面试题MapReduce编程,oh no,用了一下午来源码实现


自定义 MR 实现如下逻辑

product_no lac_id moment start_time user_id county_id staytime city_id
13429100031 22554 8 2013-03-11 08:55:19.151754088 571 571 282 571
13429100082 22540 8 2013-03-11 08:58:20.152622488 571 571 270 571
13429100082 22691 8 2013-03-11 08:56:37.149593624 571 571 103 571
13429100087 22705 8 2013-03-11 08:56:51.139539816 571 571 220 571
13429100087 22540 8 2013-03-11 08:55:45.150276800 571 571 66 571
13429100082 22540 8 2013-03-11 08:55:38.140225200 571 571 133 571
13429100140 26642 9 2013-03-11 09:02:19.151754088 571 571 18 571
13429100082 22691 8 2013-03-11 08:57:32.151754088 571 571 287 571
13429100189 22558 8 2013-03-11 08:56:24.139539816 571 571 48 571
13429100349 22503 8 2013-03-11 08:54:30.152622440 571 571 211 571

字段解释:

product_no:用户手机号;

lac_id:用户所在基站;

start_time:用户在此基站的开始时间;

staytime:用户在此基站的逗留时间。

需求描述:

根据 lac_id 和 start_time 知道用户当时的位置,根据 staytime 知道用户各个基站的逗留时长。根据轨迹合

并连续基站的 staytime。最终得到每一个用户按时间排序后在每一个基站的驻留时长。

期望输出举例:

13429100082 22540 8 2013-03-11 08:58:20.152622488 571 571 270 571
13429100082 22691 8 2013-03-11 08:56:37.149593624 571 571 390 571
13429100082 22540 8 2013-03-11 08:55:38.140225200 571 571 133 571
13429100087 22705 8 2013-03-11 08:56:51.139539816 571 571 220 571
13429100087 22540 8 2013-03-11 08:55:45.150276800 571 571 66 571

分析上面的结果:

第一列升序,第四列时间降序。因此,首先需要将这两列抽取出来,然后自定义排序。

实现如下:

package FindFriend;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class StringComp2 {

    /** Hard-coded HDFS input/output paths for this exercise. */
    final static String INPUT_PATH = "hdfs://master:8020/liguodong/test2";
    final static String OUT_PATH = "hdfs://master:8020/liguodong/test2out";

    /**
     * Driver: configures and submits the sort job.
     * Deletes a pre-existing output directory so the job can be re-run.
     */
    public static void main(String[] args) throws IOException,
            URISyntaxException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        final FileSystem fs = FileSystem.get(new URI(INPUT_PATH), conf);
        if (fs.exists(new Path(OUT_PATH))) {
            fs.delete(new Path(OUT_PATH), true);
        }

        Job job = Job.getInstance(conf, "date sort");

        job.setJarByClass(StringComp2.class);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(NewK2.class);
        job.setMapOutputValueClass(Text.class);
        // A combiner is not applicable here: the reducer only re-emits values.
        job.setReducerClass(MyReducer.class);

        // The reducer emits (Text, NullWritable); the original code declared
        // Text as the output value class, which mismatches the reducer's
        // generic signature and fails at runtime.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
        FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * Extracts the phone number (product_no) and start_time from each record
     * and emits them as a composite key so the shuffle sorts the data.
     */
    static class MyMapper extends Mapper<LongWritable, Text, NewK2, Text> {

        // Compile the regex once per JVM instead of once per record.
        // First alternative: an 11-digit phone number.
        // Second alternative: a timestamp with a 9-digit fractional part.
        // The dot before the fraction is escaped — an unescaped '.' would
        // match any character, not just a literal dot.
        private static final Pattern FIELD_PATTERN = Pattern.compile(
                "([\\d]{11})|([\\d]{4}-[\\d]{2}-[\\d]{2} [\\d]{2}:[\\d]{2}:[\\d]{2}\\.[\\d]{9})");

        @Override
        protected void map(LongWritable k1, Text v1, Context context)
                throws IOException, InterruptedException {
            Matcher matcher = FIELD_PATTERN.matcher(v1.toString());

            // Skip lines that do not contain both fields (e.g. the header
            // row) instead of throwing IllegalStateException from group()
            // after an unchecked, failed find().
            if (!matcher.find()) {
                return;
            }
            String phone = matcher.group();
            if (!matcher.find()) {
                return;
            }
            String startTime = matcher.group();

            context.write(new NewK2(phone, startTime), v1);
        }
    }

    /**
     * Identity reducer: writes each original record back out, now in the
     * order imposed by NewK2's sort (phone ascending, time descending).
     */
    static class MyReducer extends Reducer<NewK2, Text, Text, NullWritable> {

        @Override
        protected void reduce(NewK2 k2, Iterable<Text> v2s, Context context)
                throws IOException, InterruptedException {
            for (Text v2 : v2s) {
                context.write(v2, NullWritable.get());
            }
        }
    }

    /**
     * Composite key: (phone number, start time).
     * Sorts by phone ascending, then by start time descending.
     */
    static class NewK2 implements WritableComparable<NewK2> {
        String first;   // product_no (phone number)
        String second;  // start_time

        /** Required no-arg constructor for Hadoop deserialization. */
        public NewK2() {
        }

        public NewK2(String first, String second) {
            this.first = first;
            this.second = second;
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.first = in.readUTF();
            this.second = in.readUTF();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(first);
            out.writeUTF(second);
        }

        /**
         * Called by the framework when sorting map output keys.
         * First column ascending; within equal first columns, second
         * column (timestamp) descending.
         *
         * String.compareTo already provides the lexicographic order we
         * need; the original hand-rolled comparison could throw
         * ArrayIndexOutOfBoundsException when the second string was
         * shorter, and returned 0 for prefix-equal strings of different
         * lengths.
         */
        @Override
        public int compareTo(NewK2 o) {
            final int byPhone = this.first.compareTo(o.first);
            if (byPhone != 0) {
                return byPhone;
            }
            return -this.second.compareTo(o.second);
        }

        @Override
        public int hashCode() {
            // Combine both fields; must stay consistent with equals().
            return 31 * this.first.hashCode() + this.second.hashCode();
        }

        @Override
        public boolean equals(Object obj) {
            if (!(obj instanceof NewK2)) {
                return false;
            }
            NewK2 other = (NewK2) obj;
            // Compare String contents with equals(), not '==' — reference
            // identity fails for keys rebuilt via readFields().
            return this.first.equals(other.first)
                    && this.second.equals(other.second);
        }
    }
}

运行结果:

一道Hadoop面试题MapReduce编程,oh no,用了一下午来源码实现_apache



举报

相关推荐

0 条评论