Hive数据仓库——拉链表
拉链表介绍
拉链表:维护历史状态,以及最新状态数据的一种表,拉链表根据拉链粒度的不同,实际上相当于快照,只不过做了优化,去除了一部分不变的记录,通过拉链表可以很方便的还原出拉链时点的客户记录
拉链表的应用场景
数据仓库的数据模型设计过程中,经常会遇到这样的需求:
表中的部分字段会被update,例如: 用户的地址,产品的描述信息,品牌信息等等; 需要查看某一个时间点或者时间段的历史快照信息,例如: 查看某一个产品在历史某一时间点的状态 查看某一个用户在过去某一段时间内,更新过几次等等 变化的比例和频率不是很大,例如: 总共有1000万的会员,每天新增和发生变化的有10万左右
拉链表案例
需求
将MySQL中的商品表数据用拉链表的形式抽取到Hive数据仓库中
环境准备
-
在 mysql 中创建表和加载数据
-- 创建MySQL数据库 CREATE DATABASE demo DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci; -- 创建商品表 create table if not exists `demo`.`t_product`( goods_id varchar(50), -- 商品编号 goods_status varchar(50), -- 商品状态 createtime varchar(50), -- 商品创建时间 modifytime varchar(50) -- 商品修改时间 ); -- 导入12-20数据 insert into `demo`.`t_product`(goods_id, goods_status, createtime, modifytime) values ('001', '待审核', '2019-12-18', '2019-12-20'), ('002', '待售', '2019-12-19', '2019-12-20'), ('003', '在售', '2019-12-20', '2019-12-20'), ('004', '已删除', '2019-12-15', '2019-12-20'); -- 全量同步结束后,在导入下面的数据 -- 导入12-21数据 UPDATE `demo`.`t_product` SET goods_status = '待售', modifytime = '2019-12-21' WHERE goods_id = '001'; INSERT INTO `demo`.`t_product`(goods_id, goods_status, createtime, modifytime) VALUES ('005', '待审核', '2019-12-21', '2019-12-21'), ('006', '待审核', '2019-12-21', '2019-12-21');
-
开启 Hive 服务
# 启动hive nohup hive --service metastore > /dev/null 2>&1 & nohup hive --service hiveserver2 > /dev/null 2>&1 & #启动beeline beeline # 连接Hive !connect jdbc:hive2://node1:10000 root 123456
-
启动
-- 删除之前存在的数据库, cascade是级联删除的意思,否则数据库下有数据,是不然删的. drop database demo cascade; -- 创建Hive的数据库 create database if not exists `demo`; -- 创建ods层表 create table if not exists `demo`.`ods_product`( goods_id string, -- 商品编号 goods_status string, -- 商品状态 createtime string, -- 商品创建时间 modifytime string -- 商品修改时间 ) partitioned by (dt string) row format delimited fields terminated by ',' stored as TEXTFILE; -- 创建拉链表 create table if not exists `demo`.`dw_product`( goods_id string, -- 商品编号 goods_status string, -- 商品状态 createtime string, -- 商品创建时间 modifytime string, -- 商品修改时间 dw_start_date string, -- 生效日期 dw_end_date string -- 失效日期 ) row format delimited fields terminated by ',' stored as TEXTFILE;
ETL导出方式
使用 sqoop
-
全量导入数据到拉链表
bin/sqoop import \ --connect jdbc:mysql://node1:3306/demo \ --username root \ --password 123456 \ --table t_product \ --delete-target-dir \ --target-dir /user/hive/warehouse/demo.ods/ods_product \ --fields-terminated-by ',' \ -m 1
-
Upsert数据到表
sqoop import \ --connect jdbc:mysql://node1:3306/demo \ --username root \ --password 123456 \ --table t_product \ --target-dir /user/hive/warehouse/demo.ods/ods_product \ --fields-terminated-by ',' \ --merge-key goods_id \ --incremental lastmodified \ --check-column lastmode \ --last-value '2021-01-06 17:25:25' \ -m 1
使用 kettle
-
-
将文件上传到 分区目录下
hdfs dfs -cat /user/hive/warehouse/demo.db/ods_product/dt=20191220/00.txt
-
需要修复分区才能看到数据
msck repair table ods_product;
将 ODS 层数据同步到 DW 层
因为目前DW层表有2个额外的字段: dw_start_date: 当前记录的生效时间 dw_end_date: 当前记录的失效时间,注意: 9999-12-31这个日期代表的是最新的数据记录. 本次是全量同步,我们全部设置如下: dw_start_date: 使用数据的修改时间作为本次记录的开始时间 dw_end_date: 失效时间为9999-12-31就可以了.
-- 全量加载ODS的数据到DW层 insert into table dw_product -- 查询ODS的所有数据 select ods.goods_id, -- 商品编号 ods.goods_status, -- 商品状态 ods.createtime, -- 商品创建时间 ods.modifytime, -- 商品的修改时间 -- 开始时间 ods.modifytime as dw_start_date, -- 结束时间 '9999-12-31' as dw_end_date from ods_product ods; -- 验证结果 select * from dw_product;
增量导入数据到拉链表
-
在 MySQL 中新增和修改数据
UPDATE `demo`.`t_product` SET goods_status = '待售', modifytime = '2019-12-21' WHERE goods_id = '001'; INSERT INTO `demo`.`t_product`(goods_id, goods_status, createtime, modifytime) VALUES ('005', '待审核', '2019-12-21', '2019-12-21'), ('006', '待审核', '2019-12-21', '2019-12-21');
-
-
抽取数据到 HDFS
配置转换
-
MySQL查询增加查询条件
-
-
HDFS使用dt命名参数:
-
-
配置命名参数
-
-
修复分区数据
msck repair table ods_product; select * from ods_product;
-
-
增量同步ODS数据到DW
-- 整体分为2步走. -- 插入数据使用overwrite,这样可以将之前的数据覆盖掉. insert overwrite table dw_product -- 1. 先把之前的历史数据结束时间修改掉 select dw.goods_id, -- 商品编号 dw.goods_status, -- 商品状态 dw.createtime, -- 商品创建时间 dw.modifytime, -- 商品的修改时间 -- 开始时间 dw.dw_start_date, -- 结束时间 -- ods的商品ID不为null,并且dw的结束时间为9999-12-31 case when (ods.goods_id is not null and dw.dw_end_date = '9999-12-31') -- 条件满足修改结束时间 then ods.modifytime -- 如果条件不满足,那么我们就不修改结束时间 else dw.dw_end_date end as dw_end_date from -- 拉链表 dw_product dw -- 左外连接ODS新增数据 left join (select * from ods_product where dt='20191221') ods on dw.goods_id = ods.goods_id -- 2. 关联本次新增数据 union all select ods.goods_id, -- 商品编号 ods.goods_status, -- 商品状态 ods.createtime, -- 商品创建时间 ods.modifytime, -- 商品的修改时间 -- 开始时间 ods.modifytime as dw_start_date, -- 结束时间 '9999-12-31' as dw_end_date from ods_product ods where dt = '20191221'; -- 验证结果 select * from dw_product; -- 获取拉链表中最新的商品信息 select * from dw_product where dw_end_date = '9999-12-31';