一、分析
针对ods层表中的数据进行清洗,参考数据清洗规则,按照实际情况对数据进行清洗。
 由于数据库中的数据都比较规整,其实可以直接迁移到 dwd 层;不过为了以防万一,还是对 ods 层的数据进行过滤,主要是过滤掉表中 id 字段(即主键)为 NULL 的数据。在关系型数据库中,表的 id 字段都是主键,不可能为 NULL;我们在这里进行判断,主要是为了防止数据在采集过程中出现问题。
二、构建dwd层
1、dwd_user
(1)源表
ods_user
 
(2)建表语句
-- dwd_user: user rows loaded from ods_mall.ods_user, one partition per
-- load date (dt, formatted yyyyMMdd, e.g. 20220309).
-- EXTERNAL: dropping the table leaves the files under the HDFS location.
create external table if not exists dwd_mall.dwd_user (
    user_id          bigint,
    user_name        string,
    user_gender      tinyint,
    user_birthday    string,
    e_mail           string,
    mobile           string,
    register_time    string,
    is_blacklist     tinyint
)
partitioned by (dt string)
row format delimited
fields terminated by '\t'
location 'hdfs://bigdata01:9000/data/dwd/user/';
 
(3) 映射关系
-- Rebuild the 20220309 partition of dwd_user from the same-day ods partition,
-- dropping rows whose id column user_id is NULL.
-- insert overwrite replaces whatever the partition held before.
insert overwrite table dwd_mall.dwd_user partition (dt = '20220309')
select
    u.user_id,
    u.user_name,
    u.user_gender,
    u.user_birthday,
    u.e_mail,
    u.mobile,
    u.register_time,
    u.is_blacklist
from ods_mall.ods_user u
where u.dt = '20220309'
  and u.user_id is not null;
 
2、dwd_user_extend
(1)源表
ods_user_extend
 
(2)建表语句
-- dwd_user_extend: extended user attributes loaded from
-- ods_mall.ods_user_extend, one partition per load date (dt = yyyyMMdd).
-- EXTERNAL: dropping the table leaves the files under the HDFS location.
create external table if not exists dwd_mall.dwd_user_extend (
    user_id              bigint,
    is_pregnant_woman    tinyint,
    is_have_children     tinyint,
    is_have_car          tinyint,
    phone_brand          string,
    phone_cnt            int,
    change_phone_cnt     int,
    weight               int,
    height               int
)
partitioned by (dt string)
row format delimited
fields terminated by '\t'
location 'hdfs://bigdata01:9000/data/dwd/user_extend/';
 
(3) 映射关系
-- Rebuild the 20220309 partition of dwd_user_extend from the same-day ods
-- partition, dropping rows whose id column user_id is NULL.
insert overwrite table dwd_mall.dwd_user_extend partition (dt = '20220309')
select
    ue.user_id,
    ue.is_pregnant_woman,
    ue.is_have_children,
    ue.is_have_car,
    ue.phone_brand,
    ue.phone_cnt,
    ue.change_phone_cnt,
    ue.weight,
    ue.height
from ods_mall.ods_user_extend ue
where ue.dt = '20220309'
  and ue.user_id is not null;
 
3、dwd_user_addr
(1)源表
ods_user_addr
 
(2)建表语句
-- dwd_user_addr: user addresses loaded from ods_mall.ods_user_addr,
-- one partition per load date (dt = yyyyMMdd).
-- EXTERNAL: dropping the table leaves the files under the HDFS location.
create external table if not exists dwd_mall.dwd_user_addr (
    addr_id       bigint,
    user_id       bigint,
    addr_name     string,
    order_flag    tinyint,
    user_name     string,
    mobile        string
)
partitioned by (dt string)
row format delimited
fields terminated by '\t'
location 'hdfs://bigdata01:9000/data/dwd/user_addr/';
 
(3) 映射关系
-- Rebuild the 20220309 partition of dwd_user_addr from the same-day ods
-- partition, dropping rows whose id column addr_id is NULL.
insert overwrite table dwd_mall.dwd_user_addr partition (dt = '20220309')
select
    ua.addr_id,
    ua.user_id,
    ua.addr_name,
    ua.order_flag,
    ua.user_name,
    ua.mobile
from ods_mall.ods_user_addr ua
where ua.dt = '20220309'
  and ua.addr_id is not null;
 
4、dwd_goods_info
(1)源表
ods_goods_info
 
(2)建表语句
-- dwd_goods_info: goods rows loaded from ods_mall.ods_goods_info,
-- one partition per load date (dt = yyyyMMdd).
-- EXTERNAL: dropping the table leaves the files under the HDFS location.
-- NOTE(review): curr_price is a double; decimal(p,s) would avoid binary
-- rounding of amounts, but the type must change together with the ods schema.
create external table if not exists dwd_mall.dwd_goods_info (
    goods_id             bigint,
    goods_no             string,
    goods_name           string,
    curr_price           double,
    third_category_id    int,
    goods_desc           string,
    create_time          string
)
partitioned by (dt string)
row format delimited
fields terminated by '\t'
location 'hdfs://bigdata01:9000/data/dwd/goods_info/';
 
(3) 映射关系
-- Rebuild the 20220309 partition of dwd_goods_info from the same-day ods
-- partition, dropping rows whose id column goods_id is NULL.
insert overwrite table dwd_mall.dwd_goods_info partition (dt = '20220309')
select
    g.goods_id,
    g.goods_no,
    g.goods_name,
    g.curr_price,
    g.third_category_id,
    g.goods_desc,
    g.create_time
from ods_mall.ods_goods_info g
where g.dt = '20220309'
  and g.goods_id is not null;
 
5、dwd_category_code
(1)源表
ods_category_code
 
(2)建表语句
-- dwd_category_code: category rows (first/second/third-level ids and names)
-- loaded from ods_mall.ods_category_code, one partition per load date
-- (dt = yyyyMMdd).
-- EXTERNAL: dropping the table leaves the files under the HDFS location.
-- NOTE(review): second_catery_name looks like a typo of second_category_name;
-- it is kept because the load reads the same-named column from the ods table.
create external table if not exists dwd_mall.dwd_category_code (
    first_category_id      int,
    first_category_name    string,
    second_category_id     int,
    second_catery_name     string,
    third_category_id      int,
    third_category_name    string
)
partitioned by (dt string)
row format delimited
fields terminated by '\t'
location 'hdfs://bigdata01:9000/data/dwd/category_code/';
 
(3) 映射关系
-- Rebuild the 20220309 partition of dwd_category_code from the same-day ods
-- partition, dropping rows whose first_category_id is NULL.
-- NOTE(review): goods rows reference third_category_id, so that may be the
-- per-row key here; confirm which column the source table's primary key is.
insert overwrite table dwd_mall.dwd_category_code partition (dt = '20220309')
select
    c.first_category_id,
    c.first_category_name,
    c.second_category_id,
    c.second_catery_name,
    c.third_category_id,
    c.third_category_name
from ods_mall.ods_category_code c
where c.dt = '20220309'
  and c.first_category_id is not null;
 
6、dwd_user_order
(1)源表
ods_user_order
 
(2)建表语句
-- dwd_user_order: order header rows loaded from ods_mall.ods_user_order,
-- one partition per load date (dt = yyyyMMdd).
-- EXTERNAL: dropping the table leaves the files under the HDFS location.
-- NOTE(review): order_money is a double; consider decimal(p,s) for amounts
-- (must change together with the ods schema).
create external table if not exists dwd_mall.dwd_user_order (
    order_id        bigint,
    order_date      string,
    user_id         bigint,
    order_money     double,
    order_type      int,
    order_status    int,
    pay_id          bigint,
    update_time     string
)
partitioned by (dt string)
row format delimited
fields terminated by '\t'
location 'hdfs://bigdata01:9000/data/dwd/user_order/';
 
(3) 映射关系
-- Rebuild the 20220309 partition of dwd_user_order from the same-day ods
-- partition, dropping rows whose id column order_id is NULL.
insert overwrite table dwd_mall.dwd_user_order partition (dt = '20220309')
select
    uo.order_id,
    uo.order_date,
    uo.user_id,
    uo.order_money,
    uo.order_type,
    uo.order_status,
    uo.pay_id,
    uo.update_time
from ods_mall.ods_user_order uo
where uo.dt = '20220309'
  and uo.order_id is not null;
 
7、dwd_order_item
(1)源表
ods_order_item
 
(2)建表语句
-- dwd_order_item: order line rows loaded from ods_mall.ods_order_item,
-- one partition per load date (dt = yyyyMMdd).
-- EXTERNAL: dropping the table leaves the files under the HDFS location.
create external table if not exists dwd_mall.dwd_order_item (
    order_id        bigint,
    goods_id        bigint,
    goods_amount    int,
    curr_price      double,
    create_time     string
)
partitioned by (dt string)
row format delimited
fields terminated by '\t'
location 'hdfs://bigdata01:9000/data/dwd/order_item/';
 
(3) 映射关系
-- Rebuild the 20220309 partition of dwd_order_item from the same-day ods
-- partition, dropping rows whose order_id is NULL.
insert overwrite table dwd_mall.dwd_order_item partition (dt = '20220309')
select
    oi.order_id,
    oi.goods_id,
    oi.goods_amount,
    oi.curr_price,
    oi.create_time
from ods_mall.ods_order_item oi
where oi.dt = '20220309'
  and oi.order_id is not null;
 
8、dwd_order_delivery
(1)源表
ods_order_delivery
 
(2)建表语句
-- dwd_order_delivery: delivery rows loaded from ods_mall.ods_order_delivery,
-- one partition per load date (dt = yyyyMMdd).
-- EXTERNAL: dropping the table leaves the files under the HDFS location.
create external table if not exists dwd_mall.dwd_order_delivery (
    order_id          bigint,
    addr_id           bigint,
    user_id           bigint,
    carriage_money    double,
    create_time       string
)
partitioned by (dt string)
row format delimited
fields terminated by '\t'
location 'hdfs://bigdata01:9000/data/dwd/order_delivery/';
 
(3) 映射关系
-- Rebuild the 20220309 partition of dwd_order_delivery from the same-day ods
-- partition, dropping rows whose id column order_id is NULL.
insert overwrite table dwd_mall.dwd_order_delivery partition (dt = '20220309')
select
    od.order_id,
    od.addr_id,
    od.user_id,
    od.carriage_money,
    od.create_time
from ods_mall.ods_order_delivery od
where od.dt = '20220309'
  and od.order_id is not null;
 
9、dwd_payment_flow
(1)源表
ods_payment_flow
 
(2)建表语句
-- dwd_payment_flow: payment rows loaded from ods_mall.ods_payment_flow,
-- one partition per load date (dt = yyyyMMdd).
-- EXTERNAL: dropping the table leaves the files under the HDFS location.
-- NOTE(review): pay_money is a double; consider decimal(p,s) for amounts
-- (must change together with the ods schema).
create external table if not exists dwd_mall.dwd_payment_flow (
    pay_id       bigint,
    order_id     bigint,
    trade_no     bigint,
    pay_money    double,
    pay_type     int,
    pay_time     string
)
partitioned by (dt string)
row format delimited
fields terminated by '\t'
location 'hdfs://bigdata01:9000/data/dwd/payment_flow/';
 
(3) 映射关系
-- Rebuild the 20220309 partition of dwd_payment_flow from the same-day ods
-- partition, dropping rows whose own id column pay_id is NULL.
-- The cleaning rule filters each table on its own id (user_id, addr_id,
-- goods_id, order_id, ...). For this table that id is pay_id; order_id only
-- points at the paid order, so filtering on it let NULL pay_id rows through.
insert overwrite table dwd_mall.dwd_payment_flow partition (dt = '20220309')
select
    pf.pay_id,
    pf.order_id,
    pf.trade_no,
    pf.pay_money,
    pf.pay_type,
    pf.pay_time
from ods_mall.ods_payment_flow pf
where pf.dt = '20220309'
  and pf.pay_id is not null;
 
三、抽取脚本

1、初始化表的脚本(执行一次)
dwd_mall_init_table.sh
内容如下:
#!/bin/bash
# Initialization script for the dwd-layer database and tables; it only needs to be run once.
#
# Creates the dwd_mall database and one EXTERNAL, dt-partitioned,
# tab-delimited table per source ods table, each under its own HDFS
# directory below hdfs://bigdata01:9000/data/dwd/.
# Every statement uses "if not exists", so re-running the script leaves
# existing objects untouched; note that it also will NOT alter a table whose
# definition has since changed (drop it first in that case).
#
# The whole SQL text below is one double-quoted bash string. Inside double
# quotes bash only treats a backslash before $, `, ", \ or newline specially,
# so '\t' reaches Hive unchanged as the field delimiter.
hive -e "
create database if not exists dwd_mall;
create external table if not exists dwd_mall.dwd_user(
   user_id              bigint,
   user_name            string,
   user_gender          tinyint,
   user_birthday        string,
   e_mail               string,
   mobile               string,
   register_time        string,
   is_blacklist         tinyint
)partitioned by(dt string) 
 row format delimited  
 fields terminated by '\t'
 location 'hdfs://bigdata01:9000/data/dwd/user/';
 
 
create external table if not exists dwd_mall.dwd_user_extend(
   user_id              bigint,
   is_pregnant_woman    tinyint,
   is_have_children     tinyint,
   is_have_car          tinyint,
   phone_brand          string,
   phone_cnt            int,
   change_phone_cnt     int,
   weight               int,
   height               int
)partitioned by(dt string) 
 row format delimited  
 fields terminated by '\t'
 location 'hdfs://bigdata01:9000/data/dwd/user_extend/';
 
create external table if not exists dwd_mall.dwd_user_addr(
   addr_id              bigint,
   user_id              bigint,
   addr_name            string,
   order_flag           tinyint,
   user_name            string,
   mobile               string
)partitioned by(dt string) 
 row format delimited  
 fields terminated by '\t'
 location 'hdfs://bigdata01:9000/data/dwd/user_addr/';
 
 
create external table if not exists dwd_mall.dwd_goods_info(
   goods_id             bigint,
   goods_no             string,
   goods_name           string,
   curr_price           double,
   third_category_id    int,
   goods_desc           string,
   create_time          string
)partitioned by(dt string) 
 row format delimited  
 fields terminated by '\t'
 location 'hdfs://bigdata01:9000/data/dwd/goods_info/';
 
 
create external table if not exists dwd_mall.dwd_category_code(
   first_category_id    int,
   first_category_name  string,
   second_category_id   int,
   second_catery_name   string,
   third_category_id    int,
   third_category_name  string
)partitioned by(dt string) 
 row format delimited  
 fields terminated by '\t'
 location 'hdfs://bigdata01:9000/data/dwd/category_code/';
create external table if not exists dwd_mall.dwd_user_order(
   order_id             bigint,
   order_date           string,
   user_id              bigint,
   order_money          double,
   order_type           int,
   order_status         int,
   pay_id               bigint,
   update_time          string
)partitioned by(dt string) 
 row format delimited  
 fields terminated by '\t'
 location 'hdfs://bigdata01:9000/data/dwd/user_order/';
 
create external table if not exists dwd_mall.dwd_order_item(
   order_id             bigint,
   goods_id             bigint,
   goods_amount         int,
   curr_price           double,
   create_time          string
)partitioned by(dt string) 
 row format delimited  
 fields terminated by '\t'
 location 'hdfs://bigdata01:9000/data/dwd/order_item/';
 
create external table if not exists dwd_mall.dwd_order_delivery(
   order_id             bigint,
   addr_id              bigint,
   user_id              bigint,
   carriage_money       double,
   create_time          string
)partitioned by(dt string) 
 row format delimited  
 fields terminated by '\t'
 location 'hdfs://bigdata01:9000/data/dwd/order_delivery/';
create external table if not exists dwd_mall.dwd_payment_flow(
   pay_id               bigint,
   order_id             bigint,
   trade_no             bigint,
   pay_money            double,
   pay_type             int,
   pay_time             string
)partitioned by(dt string) 
 row format delimited   
 fields terminated by '\t'
 location 'hdfs://bigdata01:9000/data/dwd/payment_flow/';
"
 
2、添加数据分区脚本(每天执行一次)
dwd_mall_add_partition.sh
 内容如下:
#!/bin/bash
# Clean the ods_mall tables and write the result into the matching dt
# partition of each dwd_mall table.
# Meant to run once a day in the early morning.
#
# Usage: dwd_mall_add_partition.sh [yyyyMMdd]
#   Without an argument the partition for yesterday is loaded; otherwise the
#   given date is loaded. Re-running a date replaces that partition
#   (insert overwrite).

if [ -n "$1" ]; then
    dt=$1
else
    # --date is GNU date syntax.
    dt=$(date +%Y%m%d --date="1 days ago")
fi

# dt is pasted directly into the HiveQL below, so accept only an 8-digit
# yyyyMMdd value (the format the ods partitions use, e.g. 20220309). Anything
# else would either be injected into the SQL or silently produce an empty,
# mis-named dwd partition. A POSIX case pattern is used so the check also works
# when the script is started with plain "sh".
case "$dt" in
    [0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9]) ;;
    *)
        echo "invalid date '$dt': expected yyyyMMdd, e.g. 20220309" >&2
        exit 1
        ;;
esac

# Cleaning rule: each load drops rows whose own id column is NULL
# (user_id, user_id, addr_id, goods_id, first_category_id, order_id x3,
# and pay_id for payment_flow -- order_id is not that table's id).
# The script's exit status is hive's exit status.
hive -e "

insert overwrite table dwd_mall.dwd_user partition(dt='${dt}')  select
   user_id,
   user_name,
   user_gender,
   user_birthday,
   e_mail,
   mobile,
   register_time,
   is_blacklist
from ods_mall.ods_user
where dt = '${dt}' and user_id is not null;

insert overwrite table dwd_mall.dwd_user_extend partition(dt='${dt}')  select
   user_id,
   is_pregnant_woman,
   is_have_children,
   is_have_car,
   phone_brand,
   phone_cnt,
   change_phone_cnt,
   weight,
   height
from ods_mall.ods_user_extend
where dt = '${dt}' and user_id is not null;

insert overwrite table dwd_mall.dwd_user_addr partition(dt='${dt}')  select
   addr_id,
   user_id,
   addr_name,
   order_flag,
   user_name,
   mobile
from ods_mall.ods_user_addr
where dt = '${dt}' and addr_id is not null;

insert overwrite table dwd_mall.dwd_goods_info partition(dt='${dt}')  select
   goods_id,
   goods_no,
   goods_name,
   curr_price,
   third_category_id,
   goods_desc,
   create_time
from ods_mall.ods_goods_info
where dt = '${dt}' and goods_id is not null;

insert overwrite table dwd_mall.dwd_category_code partition(dt='${dt}')  select
   first_category_id,
   first_category_name,
   second_category_id,
   second_catery_name,
   third_category_id,
   third_category_name
from ods_mall.ods_category_code
where dt = '${dt}' and first_category_id is not null;

insert overwrite table dwd_mall.dwd_user_order partition(dt='${dt}')  select
   order_id,
   order_date,
   user_id,
   order_money,
   order_type,
   order_status,
   pay_id,
   update_time
from ods_mall.ods_user_order
where dt = '${dt}' and order_id is not null;

insert overwrite table dwd_mall.dwd_order_item partition(dt='${dt}')  select
   order_id,
   goods_id,
   goods_amount,
   curr_price,
   create_time
from ods_mall.ods_order_item
where dt = '${dt}' and order_id is not null;

insert overwrite table dwd_mall.dwd_order_delivery partition(dt='${dt}')  select
   order_id,
   addr_id,
   user_id,
   carriage_money,
   create_time
from ods_mall.ods_order_delivery
where dt = '${dt}' and order_id is not null;

insert overwrite table dwd_mall.dwd_payment_flow partition(dt='${dt}')  select
   pay_id,
   order_id,
   trade_no,
   pay_money,
   pay_type,
   pay_time
from ods_mall.ods_payment_flow
where dt = '${dt}' and pay_id is not null;
"
 
四、执行脚本
1、先执行初始化脚本
# Create the dwd_mall database and tables (only needed once). Run it with bash,
# the interpreter declared by the script's #!/bin/bash line.
bash dwd_mall_init_table.sh
 

2、再执行添加分区脚本
# Load the 20220309 partition of every dwd_mall table (omit the date to load
# yesterday). Run it with bash, the interpreter declared by its #!/bin/bash line.
bash dwd_mall_add_partition.sh 20220309
 

这一步需要等待一段时间(大约 10 分钟),看到如下输出即表示执行完成:
 
五、验证
连接到hive,随便查一张表,检查是否有数据。
-- Spot-check the partition that was just loaded (dt = 20220309).
-- The dt predicate reads only that partition; a query on a partitioned table
-- without a partition predicate is rejected when Hive's strict partition
-- checks are enabled.
select * from dwd_mall.dwd_user where dt = '20220309' limit 1;
 











