Big Data Learning Tutorial (SD Edition), Part 16: HBase

烟中雯城 · 2022-01-07

1. HBase Overview

  • Logical structure

Column -- Column Family (a table is split vertically by column family; within a Region each column family is a Store) -- RowKey -- Region (a table is split horizontally by row key; each such slice is a Region)

  • Physical storage structure

RowKey (row key) -- Column Family -- Column Qualifier (column) -- TimeStamp -- Type (operation type) -- Value

  • Data model
  1. Name Space --> Database
  2. Region --> Table
  3. Row --> one RowKey + multiple columns
  4. Column --> Column (Column Family + Column Qualifier)
  5. Time Stamp --> Version
  6. Cell --> {rowkey, column family, column qualifier, timestamp}, the smallest storage unit

2. HBase Simplified Architecture

  1. RegionServer: DML
    1. Data: get, put, delete
    2. Region: split, compact
  2. Master: DDL
    1. Table: create, delete, alter
    2. RegionServer: assigns Regions to each RegionServer

3. HBase Installation

  1. Download and extract the installation package
  2. Modify the configuration files

hbase-env.sh

export JAVA_HOME=/opt/module/jdk1.8.0_144
#export HBASE_MASTER_OPTS="$HBASE_MASTER_OPTS -XX:PermSize=128m -XX:MaxPermSize=128m"
#export HBASE_REGIONSERVER_OPTS="$HBASE_REGIONSERVER_OPTS -XX:PermSize=128m -XX:MaxPermSize=128m"
export HBASE_MANAGES_ZK=false

hbase-site.xml

  <property>
    <name>hbase.rootdir</name>
    <value>hdfs://hadoop102:8020/hbase/</value>
  </property>
  <property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
  </property>
  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>hadoop102,hadoop103,hadoop104</value>
  </property>
  <property>
    <name>hbase.zookeeper.property.dataDir</name>
    <value>/opt/module/zookeeper-3.5.7/zkData</value>
  </property>

regionservers

hadoop102
hadoop103
hadoop104
  3. Symlink Hadoop's core-site.xml and hdfs-site.xml into the HBase conf directory
ln -s /opt/module/hadoop-2.7.1/etc/hadoop/core-site.xml /opt/module/hbase-1.3.1/conf/
ln -s /opt/module/hadoop-2.7.1/etc/hadoop/hdfs-site.xml /opt/module/hbase-1.3.1/conf/
  4. Distribute the configuration to all nodes

  5. Start HBase

bin/start-hbase.sh

4. HBase Shell

bin/hbase shell
# DDL
## 1.query
list
describe 'test1'
## 2.create
create 'test1','info'
## 3.update
alter 'test1', {NAME => 'info', VERSIONS => '3'}
## 4.delete
disable 'test1'
drop 'test1'
## namespaces: to drop a namespace, all tables under it must be dropped first
list_namespace
list_namespace_tables 'default'

# DML
## 1.put
put 'test1','1001','info:id','001'
## 2.get, scan: return the data with the latest timestamp (version)
get 'test1','1001'
get 'test1','1001','info'
get 'test1','1001','info:id'
scan 'test1'
scan 'test1',{STARTROW => '1001',STOPROW => '1003'}
## scan raw cells, up to 10 versions per column (including cells not yet compacted away)
scan 'test1',{ RAW => true  ,VERSIONS => 10 }
## 3.update: put writes a new cell with the latest timestamp; earlier versions are not deleted
put 'test1','1003','info:name','wangwuwu'
## 4.delete: marks cells for deletion (tombstone)
delete 'test1','1001','info:age'
deleteall 'test1','1001'
truncate 'test1'

## multi-version storage and query
alter 'test2',{ NAME => 'info', VERSIONS => 3 }
get 'test2', '1001', {COLUMN => 'info:name',VERSIONS => 3}

COLUMN                                    CELL
 info:name                                timestamp=1640829889032, value=ccc
 info:name                                timestamp=1640829885058, value=bbb
 info:name                                timestamp=1640829878870, value=aaa
 
 # flush MemStore data to HDFS
 flush 'test2'

5. HBase Detailed Architecture


  • Three main components
  1. HDFS
    • HDFS Client: stores StoreFiles on DataNodes
    • HDFS DataNode: actually stores the data
  2. HBase
    • HMaster: metadata management, RegionServer cluster management
    • HRegionServer
      • HLog: write-ahead log, records data operations
      • HRegion: table-level data, a slice of a table
        • Store: column-family-level data
          • MemStore: in-memory data
          • StoreFile: on-disk data
            • HFile: the K-V storage format
  3. ZooKeeper
    • interacts with the Client to serve data operations
    • interacts with HMaster and stores the location of the metadata

6. HBase Read/Write Flow

6.1 HBase Write Path

  1. The Client asks ZK for the location of the meta table
  2. The Client requests the meta info from the node holding it, finds out which node owns the target data, and caches the result
  3. The Client sends the put request to the node that owns the data
  4. That node writes the data to the WAL and the MemStore first, then returns an ack

6.2 HBase Flush

  1. RegionServer level: by default 40% of the heap, with a lower limit of 95% of that
        <name>hbase.regionserver.global.memstore.size</name>
        <value>0.4</value>
        <name>hbase.regionserver.global.memstore.size.lower.limit</name>
        <value>0.95</value>
  2. Region level: flush when a MemStore reaches 128 MB (default)
        <name>hbase.hregion.memstore.flush.size</name>
        <value>134217728</value>
  3. Time based: automatic flush every 1 hour by default
        <name>hbase.regionserver.optionalcacheflushinterval</name>
        <value>3600000</value>

6.3 HBase Read Path

  1. The Client asks ZK for the location of the meta table
  2. The Client asks the node holding meta which node owns the target data
  3. The Client sends the read request to that node
  4. The read merges MemStore and StoreFile data: StoreFiles are loaded into the Block Cache, their cells are compared with the MemStore cells by timestamp, and the cell with the largest timestamp is returned

6.4 HBase Compact

  1. Minor Compaction: merges many small files; does not delete old-version data
  2. Major Compaction: merges into large files and removes deleted/expired data; runs every 7 days by default (see the manual-trigger sketch below)
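
Compactions run automatically, but both kinds can also be triggered by hand, from the shell (compact / major_compact) or through the Java Admin API. A minimal sketch, assuming the same ZooKeeper quorum and the test1 table used earlier in this post:

package com.ipinyou.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class CompactTest {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            // Ask the RegionServers to run a minor compaction on the table
            admin.compact(TableName.valueOf("test1"));
            // Ask for a major compaction (rewrites all StoreFiles, drops deleted cells)
            admin.majorCompact(TableName.valueOf("test1"));
        }
    }
}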

6.5 HBase Split

  1. A Region is split in two once it grows past a size threshold derived from hbase.hregion.max.filesize (10 GB by default, see section 10.3); the daughter Regions can then be reassigned to other RegionServers by the Master.

7. HBase API

7.1 DDL API

package com.ipinyou.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

import java.io.IOException;

public class APITest {

    private static Connection conn;
    private static Admin admin;

    static {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104");
        try {
            conn = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            admin = conn.getAdmin();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 1. Check whether a table exists
     *
     * @param tableName
     * @return
     * @throws IOException
     */
    public static boolean isTableExists(String tableName) throws IOException {
        boolean exists = admin.tableExists(TableName.valueOf(tableName));
        return exists;
    }

    /**
     * 2. Create a table
     *
     * @param tableName
     * @param family
     */
    public static void createTable(String tableName, String... family) throws IOException {

        if (isTableExists(tableName)) {
            return;
        }

        HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
        for (String f : family) {
            HColumnDescriptor familyDesc = new HColumnDescriptor(f);
            tableDesc.addFamily(familyDesc);
        }
        admin.createTable(tableDesc);
    }

    /**
     * 3. Drop a table
     *
     * @param tableName
     */
    public static void dropTable(String tableName) throws IOException {
        if (!isTableExists(tableName)) {
            return;
        }
        admin.disableTable(TableName.valueOf(tableName));
        admin.deleteTable(TableName.valueOf(tableName));
    }

    /**
     * 4. Create a namespace
     *
     * @param ns
     */
    public static void createNameSpace(String ns) {
        NamespaceDescriptor nsDesc = NamespaceDescriptor.create(ns).build();
        try {
            admin.createNamespace(nsDesc);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }


    public static void main(String[] args) throws IOException {
        System.out.println(isTableExists("bigdata:test4"));
//        createTable("bigdata:test4", "info1", "info2");
//        dropTable("test4");
//        createNameSpace("bigdata");
        System.out.println(isTableExists("bigdata:test4"));

    }

}

7.2 DML API

package com.ipinyou.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

public class APITest2 {

    private static Connection conn;

    static {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104");
        try {
            conn = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 1. Put (insert) data
     */
    public static void putData(String tableName, String rowKey, String family, String column, String value) throws IOException {
        Table table = conn.getTable(TableName.valueOf(tableName));

        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), Bytes.toBytes(value));

        table.put(put);
        table.close();
    }

    /**
     * 2. Get data
     *
     * @param tableName
     * @param rowKey
     * @param family
     * @param column
     */
    public static void getData(String tableName, String rowKey, String family, String column) throws IOException {
        Table table = conn.getTable(TableName.valueOf(tableName));
        Get get = new Get(Bytes.toBytes(rowKey));
//        get.addFamily(Bytes.toBytes(family));
        get.addColumn(Bytes.toBytes(family), Bytes.toBytes(column));
        Result result = table.get(get);
        Cell[] cells = result.rawCells();
        for (Cell cell : cells) {
            String cf = Bytes.toString(CellUtil.cloneFamily(cell));
            String c = Bytes.toString(CellUtil.cloneQualifier(cell));
            String v = Bytes.toString(CellUtil.cloneValue(cell));
            System.out.println("Family:" + cf + " Column:" + c + " Value:" + v);
        }
    }

    /**
     * 3. Scan data
     *
     * @param tableName
     */
    public static void scanData(String tableName) throws IOException {
        Table table = conn.getTable(TableName.valueOf(tableName));
        Scan scan = new Scan();
        ResultScanner resultScanner = table.getScanner(scan);
        for (Result result : resultScanner) {
            Cell[] cells = result.rawCells();
            for (Cell cell : cells) {
                String row = Bytes.toString(CellUtil.cloneRow(cell));
                String cf = Bytes.toString(CellUtil.cloneFamily(cell));
                String c = Bytes.toString(CellUtil.cloneQualifier(cell));
                String v = Bytes.toString(CellUtil.cloneValue(cell));
                System.out.println("Rowkey:" + row + " Family:" + cf + " Column:" + c + " Value:" + v);
            }
        }
    }

    /**
     * 4. Delete data
     * ## 1. delete a whole row by rowKey
     * ## 2. delete all versions of a given column
     * ## 3. delete a single (latest) version of a given column
     *
     * @param tableName
     * @param rowKey
     * @param cf
     * @param column
     * @throws IOException
     */
    public static void deleteData(String tableName, String rowKey, String cf, String column) throws IOException {
        Table table = conn.getTable(TableName.valueOf(tableName));
        Delete delete = new Delete(Bytes.toBytes(rowKey));
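
        // Mode 1 (as written below): a Delete built only from the rowKey removes the whole row.
        // Mode 2: Delete.addColumns deletes all versions of one column, e.g.
        // delete.addColumns(Bytes.toBytes(cf), Bytes.toBytes(column));
        // Mode 3: Delete.addColumn deletes only the latest version of one column, e.g.
        // delete.addColumn(Bytes.toBytes(cf), Bytes.toBytes(column));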

        table.delete(delete);
    }


    public static void main(String[] args) throws IOException {
//        putData("bigdata:test4", "1001", "info1", "age", "18");
//        getData("bigdata:test4", "1001", "info1", "name");
        scanData("bigdata:test4");
//        deleteData("bigdata:test4", "1003", "", "");
//        scanData("bigdata:test4");
    }

}

8. HBase MapReduce

# 1. Preparation

# 1.1 In hadoop-env.sh, add the following below the HADOOP_CLASSPATH for-loop
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/module/hbase-1.3.1/lib/*
# 1.2 Add an environment variable
export HBASE_HOME=/opt/module/hbase-1.3.1

# restart the cluster
  1. Count table rows (ships with HBase)
yarn jar lib/hbase-server-1.3.1.jar rowcounter {tablename}
  2. Import data into an HBase table (ships with HBase)
# the target table must be created first
create 'fruit','info'

yarn jar lib/hbase-server-1.3.1.jar importtsv -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:color fruit hdfs://hadoop102:8020/test.tsv
  3. You can also write a custom MapReduce job (see the sketch below)
# follow the example in the official documentation
https://hbase.apache.org/book.html#mapreduce
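
A minimal sketch of such a custom job in the style of the official example: it scans the fruit table with a TableMapper and counts rows through a job counter. The class name FruitRowCounter and the counter names are illustrative; only TableMapper and TableMapReduceUtil come from the HBase MapReduce API.

package com.ipinyou.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class FruitRowCounter {

    // The mapper receives one Result per row and bumps a counter; no records are emitted.
    public static class CountMapper extends TableMapper<NullWritable, NullWritable> {
        @Override
        protected void map(ImmutableBytesWritable rowKey, Result value, Context context) {
            context.getCounter("fruit", "rows").increment(1);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "fruit-row-counter");
        job.setJarByClass(FruitRowCounter.class);

        Scan scan = new Scan();
        scan.setCaching(500);        // larger scanner caching for MR jobs
        scan.setCacheBlocks(false);  // do not pollute the block cache

        // Wire the 'fruit' table as the map input; output key/value are unused.
        TableMapReduceUtil.initTableMapperJob(
                "fruit", scan, CountMapper.class,
                NullWritable.class, NullWritable.class, job);
        job.setOutputFormatClass(NullOutputFormat.class);
        job.setNumReduceTasks(0);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Package the class into a jar and submit it with yarn jar, the same way as the built-in rowcounter job above.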

9. HBase and Hive Integration

# 1. Copy the HBase jars into Hive's lib directory
# 2. Add the ZooKeeper info to hive-site.xml

  <property>
    <name>hive.zookeeper.quorum</name>
    <value>hadoop102,hadoop103,hadoop104</value>
  </property>
  <property>
    <name>hive.zookeeper.client.port</name>
    <value>2181</value>
  </property>

# 3. Create an HBase-backed table in Hive

create table hive_hbase_test(id string,name string, age int) 
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' 
with serdeproperties("hbase.columns.mapping" = ":key,cf:name,cf:age")
tblproperties ("hbase.table.name" = "hive_hbase_test");

# 4. Query and analyze the data through the Hive table

10. HBase Storage Optimization

10.1 Pre-splitting at Table Creation

# split into 5 regions; choose the split points according to the expected data volume
create 't1', 'f1', SPLITS => ['10', '20', '30', '40']
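
The same pre-split can be done from Java by passing split keys to Admin.createTable. A minimal sketch, written as a method that could be added to the APITest class from section 7.1 (it reuses that class's static admin field and additionally needs an import of org.apache.hadoop.hbase.util.Bytes; the split points are the ones from the shell example):

    /**
     * Create a pre-split table (same effect as the shell SPLITS example above).
     */
    public static void createPreSplitTable(String tableName, String family) throws IOException {
        HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
        desc.addFamily(new HColumnDescriptor(family));
        // boundary row keys "10","20","30","40" -> 5 regions
        byte[][] splitKeys = {
                Bytes.toBytes("10"), Bytes.toBytes("20"),
                Bytes.toBytes("30"), Bytes.toBytes("40")
        };
        admin.createTable(desc, splitKeys);
    }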

10.2 Rowkey Design

  • Spread the rowkeys so writes do not pile onto one Region (see the sketch after this list)
    1. random numbers, hashing
    2. reversing, concatenating a prefix (salting)
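
A minimal sketch of the hashing/salting and reversal ideas; the MD5 prefix, the 2-character bucket, and the separator are illustrative choices, not anything HBase itself mandates:

package com.ipinyou.hbase;

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

public class RowKeyUtil {

    /**
     * Prefix the original key with a short hash so that sequential keys
     * (timestamps, auto-increment ids, ...) are spread across regions.
     */
    public static String saltedKey(String originalKey) throws Exception {
        MessageDigest md5 = MessageDigest.getInstance("MD5");
        byte[] digest = md5.digest(originalKey.getBytes(StandardCharsets.UTF_8));
        // keep only the first byte of the hash as a 2-char hex "salt" bucket (00..ff)
        String salt = String.format("%02x", digest[0] & 0xff);
        return salt + "_" + originalKey;
    }

    /** Reversal: e.g. a phone number 13812345678 -> 87654321831, so the varying end leads. */
    public static String reversedKey(String originalKey) {
        return new StringBuilder(originalKey).reverse().toString();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(saltedKey("20220107_user_0001"));
        System.out.println(reversedKey("13812345678"));
    }
}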

10.3 Basic Parameters

  1. HStore (Region) maximum size: 10 GB by default (see the property below)
  2. flush, compact, and split size thresholds
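
For reference, the 10 GB threshold corresponds to the hbase.hregion.max.filesize property in hbase-site.xml; the value below is the HBase 1.x default, shown only as an example starting point to tune together with the flush size from section 6.2:

  <property>
    <name>hbase.hregion.max.filesize</name>
    <value>10737418240</value>
  </property>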