0
点赞
收藏
分享

微信扫一扫

【车载开发系列】汽车嵌入式开发常用工具介绍

以前干嘛去了 2024-06-22 阅读 30

一、环境准备

  • 创建一个 Maven 工程并引入依赖

    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>1.3.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.3.1</version>
    </dependency>
    
  • 创建 API 测试类并获取 HBase 客户端连接

    public class TestAPI {
        private static Connection conn = null;
        private static Admin admin = null;  // DDL 操作客户端
        
        static {
            try { 
                // 1.获取配置信息对象
                // 过时API
                // HBaseConfiguration conf1 = new HBaseConfiguration();
                // conf1.set("hbase.zookeeper.quorum","hadoop102,hadoop103,hadoop104");
    
                // 新API
                Configuration conf = HBaseConfiguration.create();
                conf.set("hbase.zookeeper.quorum","hadoop102,hadoop103,hadoop104");
    
                // 2.创建连接对象(新API)
                conn = ConnectionFactory.createConnection(conf);
    
                // 3.创建hbase管理员对象
                // 过时API
                // HBaseAdmin admin1 = new HBaseAdmin(conf1);
    
                // 新API
                admin = conn.getAdmin();
                
            } catch(Exception e) {
            	e.printStackTrace();    
            }
        }
        
        // 关闭资源
        public static void close() {
            try {
                if(admin != null) {
                    admin.close();
                }
                
                if(conn != null) {
                    conn.close();
                }
            } catch(Exception e) {
                e.printStackTrace();
            }
        }
    }
    

二、HBase API 实操

1. 判断表是否存在

public static boolean isTableExist(String tableName) {
    // 过时API
    // boolean exist = admin1.tableExists(tableName);
    
    // 新API
    boolean exist = admin.tableExists(TableName.valueOf(tableName));
    
    return exist;
}

2. 创建表

public static void createTable(String tableName, String... cfs) {
    // 1.判断传入列族信息
    if(cfs.length()<=0) {
        System.out.println("必须设置一个列族信息");
        return;
    }
    
    // 2.判断表是否存在
    if(isTableExist(tableName)) {
        System.out.println(tableName + "表已经存在");
        return;
    }
    
    // 3.创建表描述器
    HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
    
    // 4.循环添加列族
    for(String cfs : cfs) {
        // 5.创建列族描述器
        HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(cfs);
        // 6.添加单个列族
        hTableDescriptor.addFamily(hColumnDescriptor);
    }
    
    // 7.创建表
    admin.createTable(hTableDescriptor);
    
    System.out.println(tableName + "表创建成功!");
}

3. 删除表

public static void deleteTable(String tableName) {
    // 1.判断表是否存在
    if(!isTableExist(tableName)) {
        System.out.println(tableName + "表已经不存在!");
        return;
    }
    
    // 2.使表下线
    admin.disableTable(TableName.valueOf(tableName));
    
    // 3.删除表
    admin.deleteTable(TableName.valueOf(tableName));
    
    System.out.println(tableName + "表删除成功!");
}

4. 创建命名空间

public static void createNamespace(String ns) {
    // 1.创建命名空间描述器
    NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create(ns).build();
    
    try {
    	// 2.创建命名空间
    	admin.createNamespace(namespaceDescriptor); 
        
        System.out.println(ns + "命名空间创建成功!");
    } catch(NamespaceExistException e) { // 生产上判断 ns 是否存在
        System.out.println(ns + "命名空间已经存在!");
    } catch(Exception e) {
        e.printStackTrace();
    }
    
}

5. 插入数据

public static void putData(String tableName, String rowKey, String cf, String cn, String value) {
    // 1.创建hbase表操作对象
    Table table = conn.getTable(TableName.valueOf(tableName));
    
    // 2.创建 put 对象
    // HBase 底层存储的数据格式都是 byte[],可以通过 hbase.util 包下的 Bytes 类进行数据类型转换
    Put put = new Put(Bytes.toBytes(rowKey));
    
    // 3.给put对象赋值
    put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn), Bytes.toBytes(value));
    
    // 4.插入数据
    table.put(put);
    
    // 5.关闭资源
    table.close();
    
}

6. 获取数据

6.1 get
public static void getData(String tableName, String rowKey, String cf, String cn) {
    // 1.创建hbase表操作对象
    Table table = conn.getTable(TableName.valueOf(tableName));
    
    // 2.创建get对象
    Get get = new Get(Bytes.toBytes(rowKey));
    // 2.1.指定获取的列族
    // get.addFamily(Bytes.toBytes(cf));
    // 2.2.指定获取的列族和列
    // get.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn));
    // 2.3.指定获取数据的版本数
    // get.setMaxVersions();  // 相当于 scan 'tb',{RAW=>TRUE,VERSIONS=>10}
    
    // 3.获取一行的数据
    Result result = table.get(get);
    
    // 4.获取一行的所有 cell
    for(Cell cell : result.rawCells()) {
        // 5.打印数据信息
        System.out.println("family:" + Bytes.toString(CellUtil.cloneFamily(cell)) +
                          ",qualifier:" + Bytes.toString(CellUtil.cloneQualifier(cell)) 
                           + ",value:" + Bytes.toString(CellUtil.cloneValue(cell))
                          );
    }
    
    // 6.关闭资源
    table.close();
}
6.2 scan
public static void scanTable(String tableName) {
    // 1.创建hbase表操作对象
    Table table = conn.getTable(TableName.valueOf(tableName));
    
    // 2.构建scan对象
    Scan scan = new Scan(); // 全表
    // Scan scan = new Scan(Bytes.toBytes("1001"),Bytes.toBytes("1003")); // 限定 rowKey 范围,左闭右开
    
    // 3.扫描表
    ResultScanner resultScanner = table.getScanner(scan);
    
    // 4.解析resultScanner
    for(Result result : resultScanner) { // 按 rowKey 从小到大遍历获取
        // 5.解析result
        for(Cell cell : result.rawCells()) {
            // 6.打印数据信息
        System.out.println("rk:" + Bytes.toString(CellUtil.cloneRow(cell)) +
            ",family:" + Bytes.toString(CellUtil.cloneFamily(cell)) +
                          ",qualifier:" + Bytes.toString(CellUtil.cloneQualifier(cell)) 
                           + ",value:" + Bytes.toString(CellUtil.cloneValue(cell))
                          );
        }
    }
    
    // 7.关闭资源
    table.close();
}

7. 删除数据

public static void deleteData(String tableName, String rowKey, String cf, String cn) {
    // 1.创建hbase表操作对象
    Table table = conn.getTable(TableName.valueOf(tableName));
    
    // 2.构建delete对象
    Delete delete = new Delete(Bytes.toBytes(rowKey)); // 相当于 deleteall 命令
    // 2.1.设置删除列
    // delete.addColumn(Bytes.toBytes(cf),Bytes.toBytes(cn)); // 删除指定版本的列,不指定则为最大版本,小于该版本的列信息可以查询出来(慎用)
    // delete.addColumns(Bytes.toBytes(cf),Bytes.toBytes(cn)); // 删除列的所有版本
    
    // 2.2.设置删除的列族
    // delete.addFamily(Bytes.toBytes(cf));
    
    // 3.删除数据
    table.delete(delete);
    
    // 4.关闭资源
    table.close();
}
  • 只指定 rowKey 删除:删除指定 rowKey 的所有列族所有版本数据,标记为 deleteFamily
  • 指定 rowKey + 列族 [+ 版本]:删除指定 rowKey 的指定列族所有版本 (小于等于该版本) 数据,标记为 deleteFamily
  • 指定 rowKey + 列族 + 列 [+ 版本]:
    • addColumns():删除指定 rowKey 的指定列族的指定列的所有版本 (小于等于该版本) 数据,标记为 deleteColumn
    • addColumn():删除指定 rowKey 的指定列族的指定列的最新版本 (该指定版本) 数据,标记为 delete,生产上慎用

三、与 MapReduce 交互

1. 环境搭建

  • 查看 MR 操作 HBase 所需的 jar 包:

    cd /opt/module/hbase
    bin/hbase mapredcp
    
  • 在 Hadoop 中导入环境变量

    • 临时生效:

      # 在命令行执行
      export HBASE_HOME=/opt/module/hbase
      export HADOOP_HOME=/opt/module/hadoop-2.7.2
      export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`
      
    • 永久生效:

      # 在 /etc/profile.d/my_env.sh 配置
      export HBASE_HOME=/opt/module/hbase
      export HADOOP_HOME=/opt/module/hadoop-2.7.2
      
      # 在 hadoop-env.sh 中配置,在有关 HADOOP_CLASSPATH 的 for 循环之后添加
      export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/module/hbase/lib/*
      
  • 分发配置到其他节点

  • 启动 Hadoop 集群和 HBase 集群

2. 案例实操

2.1 官方案例
  • 统计 stu 表中有多少行数据

    /opt/module/hadoop-2.7.2/bin/yarn jar \
    /opt/module/hbase/lib/hbase-server-1.3.1.jar \ 
    rowcounter student
    
  • 使用 MapReduce 将本地数据导入到 HBase

    # 在本地创建一个 tsv 格式的文件:fruit.tsv
    vim fruit.tsv
    1001  Apple  Red
    1002  Pear  Yellow
    1003  Pineapple Yellow
    
    # 在 HDFS 中创建 input_fruit 文件夹并上传 fruit.tsv 文件
    /opt/module/hadoop-2.7.2/bin/hdfs dfs -mkdir /input_fruit/
    /opt/module/hadoop-2.7.2/bin/hdfs dfs -put fruit.tsv /input_fruit/
    
    # 创建 Hbase 表
    Hbase(main):001:0> create 'fruit','info'
    
    # 执行 MapReduce 到 HBase 的 fruit 表中(若表不存在则报错)
    /opt/module/hadoop-2.7.2/bin/yarn jar \
    /opt/module/hbase/lib/hbase-server-1.3.1.jar \
    importtsv -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:color fruit \
    hdfs://hadoop102:9000/input_fruit/fruit.tsv
    
    # 使用 scan 命令查看导入后的结果
    Hbase(main):001:0> scan 'fruit'
    
2.2 自定义案例
  • 案例 1:实现将 HDFS 中的数据写入到 Hbase 表中

    • 编码:

      // 构建 FruitMapper 用于读取 HDFS 中的文件数据
      public class FruitMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
          @override
          protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
              context.write(key, value);
          }
      }
      
      // 构建 FruitReducer 用于将 HDFS 中的文件数据写入 Hbase
      // TableReducer 默认的输出value类型是 Mutation,其子类有 put/delete 等
      public class FruitReducer extends TableReducer<LongWritable, Text, NullWritable> {
          @override
          protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
              // 1.遍历获取每行数据
              for(Text value : values) {
                  // 2.切割一行数据
                  String[] fields = value.toString().split("\t");
                  // 3.构建put对象
                  Put put = new Put(Bytes.toBytes(fields[0]));
                  // 4.为put对象赋值
                  put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("name"),Bytes.toBytes(fields[1]));
                  put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("color"),Bytes.toBytes(fields[2]));
                  // 5.数据写入hbase
                  context.write(NullWritable.get(),put);
              }
          }
      }
      
      // 构建 FruitDriver
      public class FruitDriver implements Tool {
          private Configuration conf = null;
          
          @override
          public int run(String[] args) throws Exception {
              // 1.获取job
              Job job = Job.getInstance(conf);
              // 2.设置驱动类路径
              job.setJarByClass(FruitDriver.class);
              // 3.设置Mapper及输出KV类型
              job.setMapperClass(FruitMapper.class);
              job.setMapOutputKeyClass(LongWritable.class);
              job.setMapOutputValueClass(Text.class);
              
              // 4.设置Reducer(不能使用普通的 Reducer 设置)
              TableMapReduceUtil.initTableReducerJob(args[1], FruitReducer.class, job);
              
              // 5.设置输入路径
              FileInputFormat.setInputPaths(job, new Path(args[0]));
              
              // 6.提交job
              boolean result = job.waitForCompletion(true);
              
              return result ? 0 : 1;
          }
          
          @override
          public void setConf(Configuration conf) {
              this.conf = conf;
          }
          
          @override
          public Configuration getConf() {
              return conf;
          }
          
          public static void main(String[] args) {
              try {
              	Configuration conf = new Configuration();
      			int status = ToolRunner.run(conf, new FruitDriver(), args);
      			System.exit(status);    
              } catch(Exception e) {
                  e.printStackTrace();
              }
          }
      }
      
    • 将代码打包后上传到 hbase 集群服务器并执行

      # 首先创建 hbase fruit 表
      # 执行:
      /opt/module/hadoop-2.7.2/bin/yarn jar \
      /opt/module/hbase/Hbase-0.0.1-SNAPSHOT.jar com.xxx.FruitDriver \ /input_fruit/fruit.tsv fruit
      
  • 案例 2:将 fruit 表中的一部分数据,通过 MR 迁入到 fruit2 表中

    • 编码

      // 构建 Fruit2Mapper 用于读取 Hbase 中的 Fruit 表数据
      // TableMapper 默认的KV输入类型为 ImmutableBytesWritable, Result
      public class Fruit2Mapper extends TableMapper<ImmutableBytesWritable, Put> {
          @override
          protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
              // 1.构建PUT对象
              Put put = new Put(key.get()); // key 是 rowKey 值
              // 2.解析Result
              for(Cell cell : value.rawCells()) {
                  // 3.获取列为 name 的 cell
                  if("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                      // 4.为put对象赋值
                      put.add(cell);
                  }
              }
              
              // 5.写出
              context.write(key, put);
          }
      }
      
      // 构建 Fruit2Reducer 用于将数据写入 Hbase
      // TableReducer 默认的输出value类型是 Mutation,其子类有 put/delete 等
      public class Fruit2Reducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
          @override
          protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
      		// 遍历写出
              for(Put put : values) {
                  context.write(NullWritable.get(), put);
              }
          }
      }
      
      // 构建 Fruit2Driver
      public class Fruit2Driver implements Tool {
          private Configuration conf = null;
          
          @override
          public int run(String[] args) throws Exception {
              // 1.获取job
              Job job = Job.getInstance(conf);
              // 2.设置驱动类路径
              job.setJarByClass(Fruit2Driver.class);
              
              // 3.设置Mapper及输出KV类型(不能使用普通的 Mapper 设置)
              TableMapReduceUtil.initTableMapperJob(args[0], new Scan(), Fruit2Mapper.class, ImmutableBytesWritable.class, Put.class, job);
              
              // 4.设置Reducer(不能使用普通的 Reducer 设置)
              TableMapReduceUtil.initTableReducerJob(args[1], Fruit2Reducer.class, job);
              
              // 5.提交job
              boolean result = job.waitForCompletion(true);
              
              return result ? 0 : 1;
          }
          
          @override
          public void setConf(Configuration conf) {
              this.conf = conf;
          }
          
          @override
          public Configuration getConf() {
              return conf;
          }
          
          public static void main(String[] args) {
              try {
              	// Configuration conf = new Configuration();
                  Configuration conf = HbaseConfiguration.create();
      			int status = ToolRunner.run(conf, new Fruit2Driver(), args);
      			System.exit(status);    
              } catch(Exception e) {
                  e.printStackTrace();
              }
          }
      }
      
    • 集群测试:将代码打包后上传到 hbase 集群服务器并执行

      # 首先创建 hbase fruit2 表
      # 执行:
      /opt/module/hadoop-2.7.2/bin/yarn jar \
      /opt/module/hbase/Hbase-0.0.1-SNAPSHOT.jar com.xxx.Fruit2Driver \
      fruit fruit2
      
    • 本地测试:

      • 在 Maven 工程的 resources 目录下创建 hbase-site.xml 文件
      • hbase/conf/hbase-site.xml 的内容拷贝到上面创建的文件中
      • 执行 Fruit2Driver 程序 main 方法

四、集成 Hive

1. 与 Hive 对比

  • Hive
    • 数据仓库:Hive 的本质其实就相当于将 HDFS 中已经存储的文件在 Mysql 中做了一个双射关系,以方便使用 HQL 去管理查询
    • 用于数据分析、清洗:Hive 适用于离线的数据分析和清洗,延迟较高
    • 基于 HDFS、MapReduce:Hive 存储的数据依旧在 DataNode 上,编写的 HQL 语句终将是转换为 MapReduce 代码执行
  • HBase
    • 数据库:是一种面向列族存储的非关系型数据库
    • 用于存储结构化和非结构化的数据:适用于单表非关系型数据的存储,不适合做关联查询,类似 JOIN 等操作
    • 基于 HDFS:数据持久化存储的体现形式是 HFile,存放于 DataNode 中,被 ResionServer 以 region 的形式进行管理
    • 延迟较低,接入在线业务使用:面对大量的企业数据, HBase 可以直线单表大量数据的存储,同时提供了高效的数据访问速度

2. 集成使用

2.1 环境搭建
  • /etc/profile.d/my_env.sh 中配置 Hive 和 HBase 环境变量

    vim /etc/profile.d/my_env.sh
    
    export HBASE_HOME=/opt/module/hbase
    export HIVE_HOME=/opt/module/hive
    
  • 使用软链接将操作 HBase 的 Jar 包关联到 Hive

    ln -s $HBASE_HOME/lib/hbase-common-1.3.1.jar $HIVE_HOME/lib/hbase-common-1.3.1.jar
    ln -s $HBASE_HOME/lib/hbase-server-1.3.1.jar $HIVE_HOME/lib/hbaseserver-1.3.1.jar
    ln -s $HBASE_HOME/lib/hbase-client-1.3.1.jar $HIVE_HOME/lib/hbase-client-1.3.1.jar
    ln -s $HBASE_HOME/lib/hbase-protocol-1.3.1.jar $HIVE_HOME/lib/hbase-protocol-1.3.1.jar
    ln -s $HBASE_HOME/lib/hbase-it-1.3.1.jar $HIVE_HOME/lib/hbase-it-1.3.1.jar
    ln -s $HBASE_HOME/lib/htrace-core-3.1.0-incubating.jar $HIVE_HOME/lib/htrace-core-3.1.0-incubating.jar
    ln -s $HBASE_HOME/lib/hbase-hadoop2-compat-1.3.1.jar $HIVE_HOME/lib/hbase-hadoop2-compat-1.3.1.jar
    ln -s $HBASE_HOME/lib/hbase-hadoop-compat-1.3.1.jar $HIVE_HOME/lib/hbase-hadoop-compat-1.3.1.jar
    
  • hive-site.xml 中添加 Zookeeper 连接信息

    <property>
        <name>hive.zookeeper.quorum</name>
        <value>hadoop102,hadoop103,hadoop104</value>
        <description>The  list  of  ZooKeeper  servers  to  talk  to.  This  is 
        only needed for read/write locks.</description>
    </property>
    <property>
        <name>hive.zookeeper.client.port</name>
        <value>2181</value>
        <description>The  port  of  ZooKeeper  servers  to  talk  to.  This  is 
        only needed for read/write locks.</description>
    </property>
    
  • 下载 Hive 对应版本的源码包,使用 Eclipse 工具创建一个普通 Java 工程 import 这个 hbase-handler 文件夹到工程,在工程下创建一个 lib 文件并导入 hive/lib 下所有的 .jar 文件(add build path),使用 export 重新编译 hive-hbase-handler-1.2.2.jar (取消勾选 lib 目录),将新编译的 jar 包替换掉 hive/lib 下的 jar 包

2.2 案例实操
  • 案例 1:建立 Hive 表,关联 HBase 表,插入数据到 Hive 表的同时能够影响 HBase 表

    # 在 Hive 中创建表同时关联 HBase,然后分别进入 Hive 和 HBase 查看
    CREATE TABLE hive_hbase_emp_table
    (
        empno int,
        ename string,
        job string,
        mgr int,
        hiredate string,
        sal double,
        comm double,
        deptno int
    )
    STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
    WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno")
    TBLPROPERTIES ("hbase.table.name" = "hbase_emp_table");
    
    # 在 Hive 中创建临时中间表,用于 load 文件中的数据
    # 提示:不能将数据直接 load 进 Hive 所关联 HBase 的那张表中
    CREATE TABLE emp
    (
        empno int,
        ename string,
        job string,
        mgr int, 
        hiredate string,
        sal double,
        comm double,
        deptno int
    )
    row format delimited fields terminated by '\t';
    
    # 向 Hive 中间表中 load 数据
    load data local inpath '/home/admin/softwares/data/emp.txt' into table emp;
    
    # 通过 insert 命令将中间表中的数据导入到 Hive 关联 Hbase 的那张表中
    insert into table hive_hbase_emp_table select * from emp;
    
    # 查看 Hive 以及关联的 HBase 表中是否已经成功的同步插入了数据
    select * from hive_hbase_emp_table;
    
    scan 'hbase_emp_table'
    
  • 案例 2:针对已经存在的 HBase 表 hbase_emp_table,在 Hive 中创建一个外部表来关联这张表,并使用 HQL 操作

    # 在 Hive 中创建外部表关联 HBase 表(必须创建外部表)
    CREATE EXTERNAL TABLE relevance_hbase_emp
    (
        empno int,
        ename string,
        job string,
        mgr int,
        hiredate string,
        sal double,
        comm double,
        deptno int
    )
    STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
    WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno") 
    TBLPROPERTIES ("hbase.table.name" = "hbase_emp_table");
    
    # 使用 HQL 操作
    select * from relevance_hbase_emp;
    
举报

相关推荐

0 条评论