代码
本来想改改框架的,但作业写完就不想动了,就这样吧~
```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
/**
 * Performs a hash join between two '|'-delimited text files stored on HDFS and
 * writes the joined, projected rows into an HBase table named "Result" under
 * column family "res".
 *
 * Usage:   R=&lt;file 1&gt; S=&lt;file 2&gt; join:R&lt;i&gt;=S&lt;j&gt; res:R&lt;a&gt;,S&lt;b&gt;,...
 * Example: R=/data/r.tbl S=/data/s.tbl join:R2=S3 res:R4,S5
 */
public class Hw1Grp0 {

    /** Holder for the parsed command-line join request. */
    static class Join_params {
        String r_path;              // HDFS path of file R (args[0] without the leading "R=")
        String s_path;              // HDFS path of file S (args[1] without the leading "S=")
        int r_key;                  // join-key column index in R
        int s_key;                  // join-key column index in S
        List<Integer> res_r, res_s; // column indices projected from R and from S

        /**
         * Parses the four positional arguments:
         * "R=&lt;path&gt;", "S=&lt;path&gt;", "join:R&lt;i&gt;=S&lt;j&gt;", "res:R&lt;a&gt;,S&lt;b&gt;,...".
         */
        Join_params(String[] args) {
            r_path = args[0].substring(2);
            s_path = args[1].substring(2);
            // Splitting "join:R2=S3" on "join:R" and "=S" yields ["", "2", "3"].
            String[] keys = args[2].split("join:R|=S");
            r_key = Integer.parseInt(keys[1]);
            s_key = Integer.parseInt(keys[2]);
            res_r = new ArrayList<Integer>();
            res_s = new ArrayList<Integer>();
            // "res:R4,S5" splits into ["", "R4", "S5"]; the empty token is skipped.
            for (String res : args[3].split("res:|,")) {
                if (res.startsWith("R")) {
                    res_r.add(Integer.parseInt(res.substring(1)));
                } else if (!res.isEmpty()) {
                    // Any other non-empty token is an S column ("S<k>").
                    res_s.add(Integer.parseInt(res.substring(1)));
                }
            }
        }
    }

    /**
     * Projects the columns whose indices are listed in {@code res} out of
     * {@code tuple}, in the listed order.
     */
    private static ArrayList<String> subSequence(String[] tuple, List<Integer> res) {
        ArrayList<String> projected = new ArrayList<String>(res.size());
        for (int i : res) {
            projected.add(tuple[i]);
        }
        return projected;
    }

    /**
     * Build phase: reads file R from HDFS and returns a hash table mapping each
     * join-key value to the list of projected R rows carrying that key.
     */
    private static HashMap<String, ArrayList<ArrayList<String>>> buildHashTable(
            Configuration conf, Join_params jps) throws IOException {
        HashMap<String, ArrayList<ArrayList<String>>> r_hash =
                new HashMap<String, ArrayList<ArrayList<String>>>();
        FileSystem r_fs = FileSystem.get(conf);
        try {
            // Explicit UTF-8: the no-charset overloads use the platform default,
            // which makes results machine-dependent.
            BufferedReader in_r = new BufferedReader(new InputStreamReader(
                    r_fs.open(new Path(jps.r_path)), StandardCharsets.UTF_8));
            try {
                String line;
                while ((line = in_r.readLine()) != null) {
                    String[] tuple = line.split("\\|");
                    String key = tuple[jps.r_key];
                    ArrayList<ArrayList<String>> rows = r_hash.get(key);
                    if (rows == null) {
                        rows = new ArrayList<ArrayList<String>>();
                        r_hash.put(key, rows);
                    }
                    rows.add(subSequence(tuple, jps.res_r));
                }
            } finally {
                in_r.close();
            }
        } finally {
            r_fs.close();
        }
        return r_hash;
    }

    /** Creates the HBase result table (column family "res") if it does not exist yet. */
    private static void ensureResultTable(Configuration hbaseConf, String tableName)
            throws IOException {
        HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
        htd.addFamily(new HColumnDescriptor("res"));
        HBaseAdmin hAdmin = new HBaseAdmin(hbaseConf);
        try {
            if (hAdmin.tableExists(tableName)) {
                System.out.println("Table already exists");
            } else {
                hAdmin.createTable(htd);
                System.out.println("table " + tableName + " created successfully");
            }
        } finally {
            hAdmin.close();
        }
    }

    /**
     * Probe phase: streams file S, looks each S row's join key up in the R hash
     * table and writes every matching (R, S) combination into the HBase table.
     * Column qualifiers are "R&lt;idx&gt;" / "S&lt;idx&gt;" in family "res".
     */
    private static void probeAndStore(Configuration conf, Join_params jps,
            HashMap<String, ArrayList<ArrayList<String>>> r_hash) throws IOException {
        Logger.getRootLogger().setLevel(Level.WARN);
        String tableName = "Result";
        Configuration hbaseConf = HBaseConfiguration.create();
        ensureResultTable(hbaseConf, tableName);
        FileSystem s_fs = FileSystem.get(conf);
        try {
            BufferedReader in_s = new BufferedReader(new InputStreamReader(
                    s_fs.open(new Path(jps.s_path)), StandardCharsets.UTF_8));
            try {
                HTable table = new HTable(hbaseConf, tableName);
                try {
                    String line;
                    while ((line = in_s.readLine()) != null) {
                        String[] tuple = line.split("\\|");
                        String rowKey = tuple[jps.s_key];
                        ArrayList<ArrayList<String>> rRows = r_hash.get(rowKey);
                        if (rRows == null) {
                            continue; // no R row shares this join key
                        }
                        ArrayList<String> sCols = subSequence(tuple, jps.res_s);
                        // The first match is stored under the bare key; each further
                        // match gets a ".<n>" suffix (n starting at 0) so earlier
                        // results are not overwritten.
                        Put put = new Put(rowKey.getBytes(StandardCharsets.UTF_8));
                        int cnt = 0;
                        for (ArrayList<String> rCols : rRows) {
                            int i = 0;
                            for (String value : rCols) {
                                put.add("res".getBytes(StandardCharsets.UTF_8),
                                        ("R" + jps.res_r.get(i)).getBytes(StandardCharsets.UTF_8),
                                        value.getBytes(StandardCharsets.UTF_8));
                                ++i;
                            }
                            i = 0;
                            for (String value : sCols) {
                                put.add("res".getBytes(StandardCharsets.UTF_8),
                                        ("S" + jps.res_s.get(i)).getBytes(StandardCharsets.UTF_8),
                                        value.getBytes(StandardCharsets.UTF_8));
                                ++i;
                            }
                            table.put(put);
                            put = new Put((rowKey + "." + cnt).getBytes(StandardCharsets.UTF_8));
                            ++cnt;
                        }
                    }
                } finally {
                    table.close();
                }
            } finally {
                in_s.close();
            }
        } finally {
            s_fs.close();
        }
    }

    /**
     * Entry point. Expects four arguments:
     * R=&lt;file 1&gt; S=&lt;file 2&gt; join:R&lt;i&gt;=S&lt;j&gt; res:R&lt;a&gt;,S&lt;b&gt;,...
     *
     * @throws IOException if HDFS or HBase access fails
     */
    public static void main(String[] args) throws IOException {
        System.out.println(System.getProperty("java.class.path"));
        if (args.length < 4) {
            System.out.println("Please run with params like: R=<file 1> S=<file 2> join:R2=S3 res:R4,S5");
            System.exit(1);
        }
        // Parse the request parameters and store them in the inner class object.
        Join_params jps = new Join_params(args);
        Configuration conf = new Configuration();
        // Build phase: in-memory hash table over R, keyed by the join column.
        HashMap<String, ArrayList<ArrayList<String>>> r_hash = buildHashTable(conf, jps);
        // Probe phase: stream S and write matches into HBase.
        probeAndStore(conf, jps, r_hash);
        System.out.println("join successfully");
    }
}