0
点赞
收藏
分享

微信扫一扫

拉链表的介绍及运用

芷兮离离 2022-01-23 阅读 81

Hive数据仓库——拉链表

拉链表介绍

拉链表:维护历史状态,以及最新状态数据的一种表,拉链表根据拉链粒度的不同,实际上相当于快照,只不过做了优化,去除了一部分不变的记录,通过拉链表可以很方便的还原出拉链时点的客户记录

拉链表的应用场景

数据仓库的数据模型设计过程中,经常会遇到这样的需求:

表中的部分字段会被update,例如: 用户的地址,产品的描述信息,品牌信息等等; 需要查看某一个时间点或者时间段的历史快照信息,例如: 查看某一个产品在历史某一时间点的状态 查看某一个用户在过去某一段时间内,更新过几次等等 变化的比例和频率不是很大,例如: 总共有1000万的会员,每天新增和发生变化的有10万左右

拉链表案例

需求

将MySQL中的商品表数据用拉链表的形式抽取到Hive数据仓库中

 

环境准备

  • 在 mysql 中创建表和加载数据

    -- 创建MySQL数据库
    CREATE DATABASE demo DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci;
    ​
    -- 创建商品表
    create table if not exists `demo`.`t_product`(
        goods_id varchar(50),       -- 商品编号
        goods_status varchar(50),   -- 商品状态
        createtime varchar(50),     -- 商品创建时间
        modifytime varchar(50)      -- 商品修改时间
    );
    ​
    -- 导入12-20数据
    insert into `demo`.`t_product`(goods_id, goods_status, createtime, modifytime) values
    ('001', '待审核', '2019-12-18', '2019-12-20'),
    ('002', '待售', '2019-12-19', '2019-12-20'),
    ('003', '在售', '2019-12-20', '2019-12-20'),
    ('004', '已删除', '2019-12-15', '2019-12-20');
    ​
    -- 全量同步结束后,在导入下面的数据
    -- 导入12-21数据
    UPDATE `demo`.`t_product` SET goods_status = '待售', modifytime = '2019-12-21' WHERE goods_id = '001';
    INSERT INTO `demo`.`t_product`(goods_id, goods_status, createtime, modifytime) VALUES
    ('005', '待审核', '2019-12-21', '2019-12-21'),
    ('006', '待审核', '2019-12-21', '2019-12-21');
  • 开启 Hive 服务

    # 启动hive
    nohup hive --service metastore > /dev/null 2>&1 &
    nohup hive --service hiveserver2 > /dev/null 2>&1 &
    ​
    #启动beeline
    beeline
    # 连接Hive
    !connect jdbc:hive2://node1:10000
    root
    123456
  • 启动

    -- 删除之前存在的数据库, cascade是级联删除的意思,否则数据库下有数据,是不然删的.
    drop database demo cascade;
    ​
    -- 创建Hive的数据库
    create database if not exists `demo`;
    ​
    -- 创建ods层表
    create table if not exists `demo`.`ods_product`(
        goods_id string,        -- 商品编号
        goods_status string,    -- 商品状态
        createtime string,      -- 商品创建时间
        modifytime string       -- 商品修改时间
    )
    partitioned by (dt string)
    row format delimited fields terminated by ',' stored as TEXTFILE;
    ​
    -- 创建拉链表
    create table if not exists `demo`.`dw_product`(
        goods_id string,        -- 商品编号
        goods_status string,    -- 商品状态
        createtime string,      -- 商品创建时间
        modifytime string,       -- 商品修改时间
        dw_start_date string,   -- 生效日期
        dw_end_date string      -- 失效日期
    )
    row format delimited fields terminated by ',' stored as TEXTFILE;

ETL导出方式

使用 sqoop

  • 全量导入数据到拉链表

    bin/sqoop import \
    --connect jdbc:mysql://node1:3306/demo \
    --username root \
    --password 123456 \
    --table t_product \
    --delete-target-dir  \
    --target-dir /user/hive/warehouse/demo.ods/ods_product \
    --fields-terminated-by ',' \
    -m 1
  • Upsert数据到表

    sqoop import \
    --connect jdbc:mysql://node1:3306/demo \
    --username root \
    --password 123456 \
    --table t_product \
    --target-dir /user/hive/warehouse/demo.ods/ods_product \
    --fields-terminated-by ',' \
    --merge-key goods_id \
    --incremental lastmodified \
    --check-column lastmode \
    --last-value '2021-01-06 17:25:25' \
    -m 1

使用 kettle

  •  

  • 将文件上传到 分区目录下

    hdfs dfs -cat /user/hive/warehouse/demo.db/ods_product/dt=20191220/00.txt
  • 需要修复分区才能看到数据

    msck repair table ods_product;

将 ODS 层数据同步到 DW 层

因为目前DW层表有2个额外的字段: dw_start_date: 当前记录的生效时间 dw_end_date: 当前记录的失效时间,注意: 9999-12-31这个日期代表的是最新的数据记录. 本次是全量同步,我们全部设置如下: dw_start_date: 使用数据的修改时间作为本次记录的开始时间 dw_end_date: 失效时间为9999-12-31就可以了.

-- 全量加载ODS的数据到DW层
insert into table dw_product
-- 查询ODS的所有数据
select
ods.goods_id, -- 商品编号
ods.goods_status, -- 商品状态
ods.createtime, -- 商品创建时间
ods.modifytime, -- 商品的修改时间
-- 开始时间
ods.modifytime as dw_start_date,
-- 结束时间
'9999-12-31' as dw_end_date
from ods_product ods;
​
-- 验证结果
select * from dw_product;

增量导入数据到拉链表

  • 在 MySQL 中新增和修改数据

    UPDATE `demo`.`t_product` SET goods_status = '待售', modifytime = '2019-12-21' WHERE goods_id = '001';
    INSERT INTO `demo`.`t_product`(goods_id, goods_status, createtime, modifytime) VALUES
    ('005', '待审核', '2019-12-21', '2019-12-21'),
    ('006', '待审核', '2019-12-21', '2019-12-21');
  • 抽取数据到 HDFS

    配置转换

    • MySQL查询增加查询条件

    •  

    • HDFS使用dt命名参数:

    •  

    • 配置命名参数

    •  

    • 修复分区数据

      msck repair table ods_product;
      select * from ods_product;
      

增量同步ODS数据到DW

 

-- 整体分为2步走.
-- 插入数据使用overwrite,这样可以将之前的数据覆盖掉.
insert overwrite table dw_product
-- 1. 先把之前的历史数据结束时间修改掉
select
dw.goods_id, -- 商品编号
dw.goods_status, -- 商品状态
dw.createtime, -- 商品创建时间
dw.modifytime, -- 商品的修改时间
-- 开始时间
dw.dw_start_date,
-- 结束时间
-- ods的商品ID不为null,并且dw的结束时间为9999-12-31
case when (ods.goods_id is not null and dw.dw_end_date = '9999-12-31')
-- 条件满足修改结束时间
then ods.modifytime
-- 如果条件不满足,那么我们就不修改结束时间
else dw.dw_end_date
end as dw_end_date
from
-- 拉链表
dw_product dw
-- 左外连接ODS新增数据
left join
(select * from ods_product where dt='20191221') ods
on
dw.goods_id = ods.goods_id
-- 2. 关联本次新增数据
union all
select
ods.goods_id, -- 商品编号
ods.goods_status, -- 商品状态
ods.createtime, -- 商品创建时间
ods.modifytime, -- 商品的修改时间
-- 开始时间
ods.modifytime as dw_start_date,
-- 结束时间
'9999-12-31' as dw_end_date
from ods_product ods where dt = '20191221';

-- 验证结果
select * from dw_product;
-- 获取拉链表中最新的商品信息
select * from dw_product where dw_end_date = '9999-12-31';

 

举报

相关推荐

0 条评论