0
点赞
收藏
分享

微信扫一扫

大数据课作业-HashJoin

刘员外__ 2022-04-04 阅读 68

代码

本来想改改框架的，但作业写完就不想动了，就这样吧～


```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

/**
 * Homework 1: hash join of two '|'-separated text relations stored on HDFS.
 *
 * <p>Invoked as {@code R=<file 1> S=<file 2> join:R2=S3 res:R4,S5}.
 *
 * <p>Relation R is loaded into an in-memory hash table keyed on its join column (build phase).
 * S is then streamed line by line and probed against that table (probe phase).
 *
 * <p>Every matching (R line, S line) pair is written as one row of the HBase table
 * {@code Result}, column family {@code res}. Each requested output field gets its own column,
 * named after the field (e.g. {@code R4}, {@code S5}).
 */
public class Hw1Grp0 {
    /** HBase table that receives the join result. */
    private static final String TABLE_NAME = "Result";
    /** Column family holding every output column. */
    private static final String COLUMN_FAMILY = "res";
    /** Charset for decoding the input files and encoding HBase row keys, qualifiers and values. */
    private static final String CHARSET = "UTF-8";
    /** Regex separating the fields of one input line. */
    private static final String FIELD_SEPARATOR = "\\|";

    /**
     * Parsed command-line arguments.
     *
     * <p>Column numbers are used unchanged as indexes into the array produced by splitting a
     * line on '|', so {@code R2} refers to {@code fields[2]}.
     */
    static class Join_params {
        /** Path of relation R: {@code args[0]} without its two-character prefix (e.g. "R="). */
        String r_path;
        /** Path of relation S: {@code args[1]} without its two-character prefix (e.g. "S="). */
        String s_path;
        /** Index of R's join column. */
        int r_key;
        /** Index of S's join column. */
        int s_key;
        /** Indexes of the R and S columns to output, in command-line order. */
        List<Integer> res_r, res_s;

        /**
         * Parses {@code args} of the form {@code R=<file 1> S=<file 2> join:R2=S3 res:R4,S5}.
         *
         * @param args command-line arguments; at least four elements are expected
         * @throws IllegalArgumentException if the join condition or an output column is
         *     malformed, or no output column is requested
         * @throws NumberFormatException if a column number is not an integer
         */
        Join_params(String[] args) {
            r_path = args[0].substring(2);
            s_path = args[1].substring(2);

            // "join:R2=S3".split("join:R|=S") yields ["", "2", "3"].
            String[] keys = args[2].split("join:R|=S");
            if (keys.length != 3) {
                throw new IllegalArgumentException(
                        "Join condition must look like join:R2=S3, got: " + args[2]);
            }
            r_key = Integer.parseInt(keys[1]);
            s_key = Integer.parseInt(keys[2]);

            res_r = new ArrayList<Integer>();
            res_s = new ArrayList<Integer>();
            // "res:R4,S5".split("res:|,") yields ["", "R4", "S5"].
            // The empty entry comes from the "res:" prefix.
            for (String res : args[3].split("res:|,")) {
                if (res.isEmpty()) {
                    continue;
                }
                if (res.startsWith("R")) {
                    res_r.add(Integer.parseInt(res.substring(1)));
                } else if (res.startsWith("S")) {
                    res_s.add(Integer.parseInt(res.substring(1)));
                } else {
                    throw new IllegalArgumentException(
                            "Output column must start with R or S, got: " + res);
                }
            }
            if (res_r.isEmpty() && res_s.isEmpty()) {
                // With no output columns every Put would be empty, so fail before touching HBase.
                throw new IllegalArgumentException("No output column given in: " + args[3]);
            }
        }
    }


    /**
     * Returns the fields of {@code tuple} at the positions listed in {@code res}, in that order.
     */
    private static ArrayList<String> subSequence(String[] tuple, List<Integer> res) {
        ArrayList<String> s = new ArrayList<String>(res.size());
        for (int i : res) {
            s.add(tuple[i]);
        }
        return s;
    }


    /**
     * Runs the join: hashes R, makes sure the result table exists, then probes with S.
     *
     * @param args {@code R=<file 1> S=<file 2> join:R2=S3 res:R4,S5}
     * @throws IOException if reading HDFS or writing HBase fails
     */
    public static void main(String[] args) throws IOException {
        if (args.length < 4) {
            System.out.println("Please run with params like: R=<file 1> S=<file 2> join:R2=S3 res:R4,S5");
            System.exit(1);
        }

        // Parse the request parameters and store them in the inner class object
        Join_params jps = new Join_params(args);

        Configuration conf = new Configuration();
        // One FileSystem handle is reused for both inputs and closed once at the end.
        FileSystem fs = FileSystem.get(conf);
        try {
            // Build phase: hash R on its join column.
            HashMap<String, ArrayList<ArrayList<String>>> r_hash = buildHashTable(fs, jps);

            // Open S before touching HBase, so a bad S path fails before any table is created.
            BufferedReader in_s = openLines(fs, jps.s_path);
            try {
                Logger.getRootLogger().setLevel(Level.WARN);
                Configuration configuration = HBaseConfiguration.create();
                createTableIfAbsent(configuration);

                HTable table = new HTable(configuration, TABLE_NAME);
                try {
                    // Probe phase: stream S and write every match to HBase.
                    probeAndStore(in_s, jps, r_hash, table);
                } finally {
                    table.close();
                }
            } finally {
                in_s.close();
            }
        } finally {
            fs.close();
        }
        System.out.println("join successfully");
    }

    /**
     * Build phase: reads relation R and groups the projected output columns of each line by
     * the value of R's join column.
     *
     * <p>Blank lines are ignored. Lines with too few fields are skipped and counted.
     *
     * @return map from join-key value to the projected R columns of every R line with that key
     */
    private static HashMap<String, ArrayList<ArrayList<String>>> buildHashTable(
            FileSystem fs, Join_params jps) throws IOException {
        HashMap<String, ArrayList<ArrayList<String>>> r_hash =
                new HashMap<String, ArrayList<ArrayList<String>>>();
        int needed = requiredFields(jps.r_key, jps.res_r);
        int skipped = 0;
        BufferedReader in_r = openLines(fs, jps.r_path);
        try {
            String line;
            while ((line = in_r.readLine()) != null) {
                if (line.isEmpty()) {
                    continue;
                }
                // Limit -1 keeps trailing empty fields, so an empty last column is still indexable.
                String[] tuple = line.split(FIELD_SEPARATOR, -1);
                if (tuple.length < needed) {
                    ++skipped;
                    continue;
                }
                String key = tuple[jps.r_key];
                ArrayList<ArrayList<String>> bucket = r_hash.get(key);
                if (bucket == null) {
                    bucket = new ArrayList<ArrayList<String>>();
                    r_hash.put(key, bucket);
                }
                bucket.add(subSequence(tuple, jps.res_r));
            }
        } finally {
            in_r.close();
        }
        reportSkipped(skipped, jps.r_path);
        return r_hash;
    }

    /**
     * Probe phase: streams relation S from {@code in_s} and writes one HBase row for every
     * (R line, S line) pair whose join columns are equal.
     *
     * <p>Blank lines are ignored. Lines with too few fields are skipped and counted.
     */
    private static void probeAndStore(BufferedReader in_s, Join_params jps,
            HashMap<String, ArrayList<ArrayList<String>>> r_hash, HTable table) throws IOException {
        // Rows already written per join key. This map lives for the whole S scan, so several
        // S lines with the same key get fresh row keys instead of overwriting each other's rows.
        HashMap<String, Integer> written = new HashMap<String, Integer>();
        byte[] family = toBytes(COLUMN_FAMILY);
        int needed = requiredFields(jps.s_key, jps.res_s);
        int skipped = 0;
        String line;
        while ((line = in_s.readLine()) != null) {
            if (line.isEmpty()) {
                continue;
            }
            String[] tuple = line.split(FIELD_SEPARATOR, -1);
            if (tuple.length < needed) {
                ++skipped;
                continue;
            }
            String join_key = tuple[jps.s_key];
            ArrayList<ArrayList<String>> matches = r_hash.get(join_key);
            if (matches == null) {
                continue;
            }
            ArrayList<String> res_s = subSequence(tuple, jps.res_s);
            Integer previous = written.get(join_key);
            int cnt = previous == null ? 0 : previous.intValue();
            for (ArrayList<String> res_r : matches) {
                Put put = new Put(toBytes(rowKey(join_key, cnt)));
                addColumns(put, family, "R", jps.res_r, res_r);
                addColumns(put, family, "S", jps.res_s, res_s);
                table.put(put);
                ++cnt;
            }
            written.put(join_key, cnt);
        }
        reportSkipped(skipped, jps.s_path);
    }

    /**
     * Creates the result table with a single column family, unless it already exists.
     *
     * <p>NOTE(review): an existing table is reused as-is, so rows from earlier runs remain.
     */
    private static void createTableIfAbsent(Configuration configuration) throws IOException {
        HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TABLE_NAME));
        htd.addFamily(new HColumnDescriptor(COLUMN_FAMILY));
        HBaseAdmin hAdmin = new HBaseAdmin(configuration);
        try {
            if (hAdmin.tableExists(TABLE_NAME)) {
                System.out.println("Table already exists");
            } else {
                hAdmin.createTable(htd);
                System.out.println("table " + TABLE_NAME + " created successfully");
            }
        } finally {
            hAdmin.close();
        }
    }

    /**
     * Returns the row key of the {@code n}-th joined row (0-based) for {@code joinKey}.
     *
     * <p>The first row uses the join key itself; later rows use {@code joinKey + "." + (n - 1)}.
     *
     * <p>NOTE(review): join keys that themselves contain '.' can collide with these suffixed
     * keys (e.g. key "1", n = 2 and key "1.1", n = 0).
     */
    private static String rowKey(String joinKey, int n) {
        return n == 0 ? joinKey : joinKey + "." + (n - 1);
    }

    /**
     * Adds one column per output field to {@code put}.
     *
     * <p>The qualifier is {@code side} followed by the field's column number (e.g. "R4"); the
     * cell value is the field text.
     */
    private static void addColumns(Put put, byte[] family, String side,
            List<Integer> columns, List<String> values) throws IOException {
        for (int i = 0; i < values.size(); ++i) {
            put.add(family, toBytes(side + columns.get(i)), toBytes(values.get(i)));
        }
    }

    /** Opens {@code path} on {@code fs} as a line reader decoding {@link #CHARSET}. */
    private static BufferedReader openLines(FileSystem fs, String path) throws IOException {
        FSDataInputStream in_stream = fs.open(new Path(path));
        return new BufferedReader(new InputStreamReader(in_stream, CHARSET));
    }

    /** Encodes {@code text} with {@link #CHARSET} instead of the platform default. */
    private static byte[] toBytes(String text) throws IOException {
        return text.getBytes(CHARSET);
    }

    /** Returns how many '|'-separated fields a line needs for {@code key} and all of {@code res}. */
    private static int requiredFields(int key, List<Integer> res) {
        int max = key;
        for (int column : res) {
            max = Math.max(max, column);
        }
        return max + 1;
    }

    /** Prints a one-line warning when malformed lines were skipped while reading {@code path}. */
    private static void reportSkipped(int skipped, String path) {
        if (skipped > 0) {
            System.err.println("Skipped " + skipped + " malformed line(s) in " + path);
        }
    }

}


```

举报

相关推荐

0 条评论