0
点赞
收藏
分享

微信扫一扫

Day5-Hive的结构和优化、数据文件存储格式

覃榜言 04-05 22:30 阅读 2
hive

Hive

窗口函数

案例

  1. 需求:连续三天登陆的用户数据

  2. 步骤:

    -- 建表
    create table logins (
        username string,
        log_date string
    ) row format delimited fields terminated by ' ';
    -- 加载数据
    load data local inpath '/opt/hive_data/login' into table logins;
    -- 查询数据
    select *
    from logins tablesample (5 rows);
    -- 按用户分组,将登陆日期进行排序
    -- over(partition by xxx order by xxx)
    -- 获取每一条数据两行之前的数据
    select *, 
           lag(log_date, 2) over (partition by username order by log_date) as 2d_log_date
    from logins;
    -- 获取连续三天登录的数据
    select distinct username
    from (
        select *, 
               lag(log_date, 2) over (partition by username order by log_date) as 2d_log_date
        from logins
    ) t where datediff(log_date, 2d_log_date) = 2;
    

其他操作

join

  1. 同MySQL类似,在Hive中也提供了表之间的join,包含:内连接inner join,左连接left join,右连接right join和全外连接full outer join以及极少使用的笛卡尔积

  2. 除此之外,Hive还提供了特殊的连接:left semi join。当a left semi join b,表示获取a表哪些数据在b表中出现过

  3. 案例

    -- 建表
    drop table if exists orders;
    create table orders (
        order_id   int,
        order_date string,
        product_id int,
        number     int
    ) row format delimited fields terminated by ' ';
    drop table if exists products;
    create table products (
        product_id   int,
        product_name string,
        price        double
    ) row format delimited fields terminated by ' ';
    -- 加载数据
    load data local inpath '/opt/hive_data/orders' into table orders;
    load data local inpath '/opt/hive_data/products' into table products;
    select *
    from orders;
    select *
    from products;
    -- 需求一:获取每天卖了多少钱
    select o.order_date,
           sum(o.number * p.price)
    from orders o left join products p 
        on o.product_id = p.product_id
    group by o.order_date;
    -- 需求二:获取哪些商品被卖出去过
    -- 获取商品表中的哪些数据在订单表中出现过
    -- 方式一:left semi join
    select *
    from products p left semi join orders o
                  on p.product_id = o.product_id;
    -- 方式二:
    select * from products
    where product_id in (select product_id from orders);
    

排序

  1. 不同于MySQL的地方在于,在Hive中,提供了两种排序方式

    1. order by:全局排序。在排序的时候,会忽略掉ReduceTask的数量,对所有的数据进行整体的排序
    2. sort by:局部排序。这种方式,在每一个ReduceTask内部排序。如果没有指定,那么会根据排序字段,计算字段的哈希码,然后将字段分发到对应到ReduceTask上来进行排序
  2. 案例

    1. 原始数据

      2 henry 84
      3 jack 76
      1 david 92
      1 bruce 78
      1 balley 77
      2 hack 85
      1 tom 79
      3 peter 96
      2 eden 92
      1 mary 85
      3 pard 61
      3 charles 60
      2 danny 94
      3 cindy 75
      
    2. 案例

      -- 建表
      drop table if exists scores;
      create table scores (
          class int,
          name  string,
          score int
      ) row format delimited fields terminated by ' ';
      -- 加载数据
      load data local inpath '/opt/hive_data/scores' into table scores;
      -- 查询数据
      select *
      from scores tablesample (5 rows);
      -- order by
      insert overwrite local directory '/opt/hive_demo/order_by1'
          row format delimited fields terminated by '\t'
      select * from scores order by score;
      -- sort by
      insert overwrite local directory '/opt/hive_demo/sort_by1'
          row format delimited fields terminated by '\t'
      select * from scores sort by score;
      -- Hive中的SQL默认会转化为MapReduce任务来执行
      -- 在MapReduce中,如果不指定,默认只有1个ReduceTask
      -- 因此也只产生1个结果文件
      -- 指定ReduceTask的个数
      set mapreduce.job.reduces = 3;
      insert overwrite local directory '/opt/hive_demo/order_by2'
          row format delimited fields terminated by '\t'
      select * from scores order by score;
      insert overwrite local directory '/opt/hive_demo/sort_by2'
          row format delimited fields terminated by '\t'
      select * from scores sort by score;
      -- 在实际过程中,其实极少单独使用sort by
      -- sort by一般是结合distribute by来使用
      -- 案例:将每一个班的学生按照成绩降序排序
      insert overwrite local directory '/opt/hive_demo/distribute_by'
          row format delimited fields terminated by '\t'
      select * from scores distribute by class sort by score desc;
      
  3. 如果distribute bysort by的字段一致,那么可以省略为cluster by。注意,cluster by默认只能升序不能降序排序

beeline和JDBC

  1. JDBC(Java Database Connection):类似于MySQL,Hive也提供了JDBC操作,代码和MySQL的JDBC操作一模一样

    package com.fesco.jdbc;
    
    import java.sql.*;
    
    public class HiveJDBCDemo {
    
        public static void main(String[] args) throws ClassNotFoundException, SQLException {
    
            // 注册驱动
            Class.forName("org.apache.hive.jdbc.HiveDriver");
            // 获取连接
            // Hive对外提供的连接端口是10000
            Connection connection = DriverManager.getConnection("jdbc:hive2://101.36.69.196:31177/demo", "root", "root");
            // 获取表述
            Statement statement = connection.createStatement();
            // 执行SQL
            ResultSet resultSet = statement.executeQuery("select * from products");
            // 遍历结果集
            while(resultSet.next()){
                System.out.println(resultSet.getString("product_name"));
            }
            // 关闭
            resultSet.close();
            statement.close();
            connection.close();
    
        }
    
    }
    
  2. 利用Datagrip连接Hive,实际上就是用JDBC的方式连接的Hive

  3. Hive提供了原生的远程连接方式:beeline

    beeline -u jdbc:hive2://hadoop01:10000/demo -n root
    

    -u表示url,连接地址;-n表示name,用户名

  4. 注意:如果想要使用JDBC的方式连接Hive,那么必须开启hiveserver2服务!!!

SerDe

  1. SerDe(Serializar-Deserializar):是Hive中提供的一套用于进行序列化和反序列化操作的机制

  2. 可以利用SerDe来解决数据的格式问题

  3. 案例

    -- 不使用SerDe
    -- 1. 建表管理原始数据
    create table logs_tmp (
        log string
    );
    load data local inpath '/opt/hive_data/logs' into table logs_tmp;
    select * from logs_tmp tablesample (5 rows);
    -- 2. 建表
    create table logs (
        user_ip     string, -- 用户ip
        log_date    string, -- 访问日期和时间
        timezone    string, -- 时区
        request_way string, -- 请求方式
        resources   string, -- 请求资源
        protocol    string, -- 请求协议
        state_id    int     -- 状态码
    ) row format delimited fields terminated by '\t';
    -- 3. 先将数据替换为规则形式,然后利用split拆分
    insert into table logs
    select arr[0], arr[1], arr[2], arr[3], arr[4], arr[5], cast(arr[6] as int)
    from (
       select split(regexp_replace(log, '(.*) \-\- \\[(.*) (.*)\\] \"(.*) (.*) (.*)\" (.*) \-', '$1 $2 $3 $4 $5 $6 $7'), ' ') arr from logs_tmp
    ) t;
    -- 查询数据
    select * from logs;
    -- 使用SerDe
    -- 建表
    drop table if exists logs;
    create table logs (
        user_ip     string, -- 用户ip
        log_date    string, -- 访问日期和时间
        timezone    string, -- 时区
        request_way string, -- 请求方式
        resources   string, -- 请求资源
        protocol    string, -- 请求协议
        state_id    int     -- 状态码
    ) row format serde 'org.apache.hadoop.hive.serde2.RegexSerDe'
      with serdeproperties (
          'input.regex'='(.*) \-\- \\[(.*) (.*)\\] \"(.*) (.*) (.*)\" (.*) \-'
      ) stored as textfile ;
    load data local inpath '/opt/hive_data/logs' into table logs;
    select * from logs;
    

视图(view)

  1. 视图是对原表中部分字段进行抽取,可以看作是原表的子表,但是本质上是一个虚拟表

  2. 当不需要表中所有的字段,而只需要这个表中的部分字段的时候,那么此时就可以使用视图来对数据进行封装

  3. 视图只能看不能修改!

  4. 视图的优点

    1. 简单。使用视图的时候,完全不需要关心视图背后依赖的表结构是否发生变化,是否产生关联。对于用户而言,视图就是已经符合过滤条件的结果集
    2. 安全。用户只被允许访问视图中已经过滤好的数据,并且视图中的数据只能查不能改,此时不会影响基表的数据
    3. 数据独立。一旦视图创建,那么此时基表发生结构变化,不会影响视图的操作
  5. Hive中,在定义视图的时候,需要封装一个select语句。此时,被封装的这个select在创建视图的时候并不会执行;而是会在第一次查询视图的时候才会触发封装的select执行

  6. 视图分为虚拟视图(只存储在内存中,可以认为是一个虚拟表)和物化视图(会落地到磁盘上,此时就是一个真正的子表)。需要注意的是,Hive只支持虚拟视图不支持物化视图

  7. 案例

    -- 创建视图
    create view logs_view as select user_ip, log_date, resources from logs order by log_date;
    -- 查询视图
    select * from logs_view;
    -- 删除视图
    drop view logs_view;
    

Hive存储

概述

  1. Hive中的数据最终是以文件形式落地到HDFS上,目前Hive官方原生的文件格式有6种:textfileRCFileorcparquetavrosequencefile
    1. avrosequencefile将文件以序列形式来存储(序列化文件)
    2. 如果不指定,那么HDFS默认将文件以textfile格式存储
    3. textfileavrosequencefile是行存储格式,RCFileorcparquet是列存储格式
    4. textfile不支持修改(delete和update),但是列存储格式都支持delete和update操作,效率非常低

textfile

  1. Hive中的文件格式默认就是textfile
  2. 默认情况下,textfile不对数据进行压缩,因此占用磁盘空间相对较大;在进行数据分析的时候,开销相对也较大
  3. textfile支持Gzip和Bzip2的压缩格式

orc

  1. orc(Optimized Row Columnar,优化的行列格式),是Hive0.11版本引入的一种文件格式,是基于RCFile格式机型优化,本身是以存储形式来存放数据
  2. 每一个ORC文件,由1个File Footer、1个Postscript以及1到多个Stripe来组成
  3. Stripe:用于存储数据的。默认情况下,每一个Stripe的大小是250M。Stripe由三部分组成
    1. Index Data:索引数据。默认每隔10000行数据形成一次索引,会记录每一列的最小值、最大值、以及每一列中的行索引
    2. Row Data:行数据。在存储数据的时候,不是将整个表的列来进行拆分,而是先截取部分行,然后将每一行数据的字段来进行拆分。因为同一列中的字段类型是一致的,所以可以给不同的列来指定的压缩机制进行更好的压缩
    3. Stripe Footer:记录每一列的数据类型、每一列的字节长度
  4. File Footer:记录每一个Stripe中包含的行数以及每一列的数据类型。初次之外,File Footer还可以记录每一列的聚合信息,例如sum、max等
  5. Postscript:记录了整个文件的信息,例如文件是否压缩,压缩编码是什么,以及File Footer在orc文件中的从存储位置
  6. 如果需要在ORC文件中查询某一条数据:
    1. 首先从文件末尾读取Postscript,从Postscript中获取到File Footer在文件中的存储位置
    2. 然后读取File Footer,从File Footer中获取这条数据所在的Stripe的位置
    3. 读取Stripe中的Index Data,锁定这条数据对应的索引位置,最后再通过Row Data获取到这条数据

parquet格式

  1. parquet格式是从Hive0.10版本开始提供的一种二进制的文件格式,所以不能直接读取
  2. 每一个parquet文件中,包含了四部分
    1. Magic Code:魔数,用于确保当前的文件是一个parquet文件
    2. Footer Length:记录元数据的大小。通过这个值以及parquet文件的大小,可以计算出元数据在parquet文件中的偏移量
    3. Metastore:元数据存储,记录了当前parquet文件的文件信息,以及文件大小、Row Group的数量
    4. Row Group:行组
      1. 将文件从行方向上进行切分,每一部分就是一个Row Group。默认情况下,Row Group和Block是等大的
      2. 每一个行组中,又包含了1个到多个Column Chunk(列块)。每一列对应了一个列块。因为同一个列块中的数据类型相同,所以可以给不同的列块指定不同的压缩编码
      3. 每一个列块中包含了一个到多个Page(页)。Page是parquet文件中数据存储的最小单位
      4. Page分为三种
        1. 数据页:存储数据
        2. 字典页:存储编码信息
        3. 索引页:记录存储的数据在文件中的偏移量
      5. 需要注意的是,Hive提供的原生的parquet文件不支持索引页
  3. parquet格式支持LZO和snappy压缩

Hive压缩

  1. Hive支持对结果文件进行压缩。其中,经常对orc和parquet文件进行压缩

  2. orc文件压缩可以通过属性orc.compress来配置压缩,可以使用的值:NONEZLIBSNAPPYNONE表示不压缩

    create table orc_test (
        id   int,
        name string,
        age  int
    ) row format delimited fields terminated by ' '
        stored as orc; -- 以orc格式来存储
    insert into table orc_test values (1, 'Amy', 15);
    
    create table orc_zlib (
        id   int,
        name string,
        age  int
    ) row format delimited fields terminated by ' '
        stored as orc
        tblproperties ('orc.compress' = 'ZLIB');
    insert into table orc_zlib values (1, 'Amy', 15);
    
    create table orc_snappy (
        id   int,
        name string,
        age  int
    ) row format delimited fields terminated by ' '
        stored as orc
        tblproperties ("orc.compress" = "SNAPPY");
    insert into table orc_snappy values (1, 'Amy', 15);
    
  3. parquet文件压缩可以通过属性parquet.compression进行配置。可以使用的值:NONESNAPPY

    create table parquet_test (
        id   int,
        name string,
        age  int
    ) row format delimited fields terminated by ' '
        stored as parquet; -- 以parquet形式来存储数据
    insert into table parquet_test values (1, 'May', 15);
    
    create table parquet_snappy (
        id   int,
        name string,
        age  int
    ) row format delimited fields terminated by ' '
        stored as parquet
        tblproperties ("parquet.compression" = "SNAPPY");
    insert into table parquet_snappy values (1, 'May', 15);
    

Hive结构和优化

结构

Hive结构
  1. Client Interface:客户端端口,包含CLI(Command-line,命令行)和JDBC两种方式
  2. 客户端连接Client Interface,提交要执行的SQL。这个SQL会被提交给Driver(驱动器)
  3. Driver包含了4部分
    1. SQL Parser:SQL解析器,SQL提交给Driver之后,会先有SQL Parser进行解析,在解析的时候,先去检查SQL的语法是否正确,会连接元数据库查询/修改元数据,然后将SQL转化为抽象语法树(AST)
    2. Physical Plan:物理计划。SQL Parser将SQL解析成AST之后,将AST交给Physical PLAN,将AST编译成具体的执行逻辑
    3. Query Optimizer:查询优化器。Physical PLAN将执行逻辑交给Query Optimizer进行优化
    4. Execution:执行器。负责将优化之后的执行逻辑转化成具体的执行任务,例如将执行逻辑转化为MapReduce程序

优化

  1. 列裁剪或者分区裁剪。在实际生产环境中,经常需要处理大量的数据,那么此时使用select * from x的形式,会对整个表进行扫描,从而导致查询效率变低。因此在实际过程中,最好执行列或者指定分区进行查询;如果需要进行按行的查询,那么最好限制查询的行数,例如使用limit n或者tablesample(n rows)

  2. group by的优化。在进行group by的时候,那么此时相同的键对应的值会被分到一组,会被分发到某一个ReduceTask来处理这一组数据。如果某一个键对应的值比较多,那么此时处理这个键的ReduceTask的任务量就相对较大,此时就产生了数据倾斜。针对这个问题,提供了两种优化方案

    1. map combine:map端的聚合,就是将数据在MapTask处先进行一次聚合,然后再将聚合后的结果发送给ReduceTask处理

      -- 开启聚合机制
      set hive.map.aggr = true;
      -- 指定聚合的值
      set hive.groupby.mapaggr.checkinterval = 10000;
      
    2. 二阶段聚合(负载均衡方式):将Hive的执行过程拆分成2个MapReduce任务执行。第一个MapReduce中,先将数据打散之后进行聚合,第二个MapReduce中,再根据实际的要求进行聚合

      -- 开启二阶段聚合
      set hive.groupby.skewindata = true;
      
  3. CBO(Cost based Optimizer,基于花费的优化器)

    1. CBO是从Hive0.10开始添加的一种优化机制,从Hive1.1.0开始,CBO优化默认是开启的,可以通过属性hive.cbo.enable来调节
    2. CBO遵循的原则:谁的执行代价最小就是最好的执行计划
  4. 谓词下推。在保证结果不发生改变的前提下,尽量将where条件(谓词)提前执行,来减少下游处理的数据量,这个过程就称之为谓词下推

    -- 开启谓词下推
    -- ppd是PredicatePushDown,预测/谓词下推
    set hive.optimize.ppd = true;
    
  5. map join。当小表和大表进行join的时候, 将小表放入内存中分发给每一个MapTask,MapTask在处理数据时候就直接从内存中获取数据,此时join过程在Map端完成,从而减少了最终交给ReduceTask的数据量

    -- 默认是25M
    set hive.mapjoin.smalltable.filesize = 25000000;
    
  6. SMB join

    1. SMB join(sort merge bucket join):基于分桶机制和map join的前提下实现的一种join方式,用于解决大表和大表之间的join问题
    2. 当大表和大表进行join的时候,可以考虑先将大表的数据进行分桶,每一个桶中都只包含部分数据,此时每一个桶就相当于是一个小表,在此时join的时候,就是小表和大表join,那么可以进行map join。本质上就是"分而治之"的思想
    3. SMB join的条件:A join B
      1. A表和B表都必须分桶,并且B表的桶数必须是A表桶数的整数倍。例如A分了4个桶,那么B表的桶数必须是4n
      2. 分桶字段和join字段必须一致。A join B on a.id = b.id,那么此时A表和B表必须以id字段来进行分桶
  7. 启用严格模式

    1. hive.strict.checks.no.partition.filter:默认为false,如果设置为true,那么在查询分区表的时候,必须以分区作为查询条件

    2. hive.strict.checks.orderby.no.limit:默认为false,如果设置为true,那么进行order by的时候,必须添加limit语句

    3. hive.strict.checks.cartesian.product:默认为false,如果设置为true,那么严禁进行笛卡尔积

举报

相关推荐

0 条评论