0
点赞
收藏
分享

微信扫一扫

等保测评考试重点题库分享上

案例背景

大数据技用可以海量的用为数据中掘分析根据分析结化平台的服务质最终满足用的需求。教育大据分析平台项目就是据技术应用于培训领业经营提供据支撑。

案例数据产生流程:

案例数据流转:

案例需求:

  1. 建立集团数仓库一集团数据中心把分散的业务数理和存
  2. 根据业务分析需要海量的用为数据中掘分析定制多维据集合形成数据集市供各个场景主题使用。
  3. 前端业务数据展示选择和控制取合适的前端统计分析结展示工具。

 案例架构:

 案例工具:

案例数据架构:

将调研需求转换为开发需求

 访问和咨询用户数据模块:

意向用户看模块:

有效线索模块:

报名用户模块:

学生出勤模块:

案例模块分析:

访问和咨询用户数据模块(全量分析):

需求指标:

总体分析: 

数据准备: 将原始数据加载到本地MySQL数据库中

访问客户量的数据来源于咨询系统的访问会话月表web_chat_ems:

表名的格式为web_chat_ems_年_月,年份为4位数字,月份为二位数字,如果为单数时,前面会用0来补全,比如web_chat_ems_2019_07

web_chat_text_ems表是访问附属月表

表名的格式和web_chat_ems相同。web_chat_ems和web_chat_text_ems是一一对应的,通过主键id进行关联。

 在当前数据库下执行sql脚本文件,将数据导入即可

​​​​​​​点击:访问和咨询用户数据模块sql文件

 建模分析:

 ODS层: 源数据层
 DIM层: 维度层
DWD层: 明细层
DWM层: 中间层 (省略)
DWS层: 业务层
DA层:

建模操作(建表):

 ODS层:将MySQL数据库中数据导入hive中
CREATE DATABASE IF NOT EXISTS `itcast_ods`;
--写入时压缩生效
set hive.exec.orc.compression.strategy=COMPRESSION;

-- 访问咨询表
CREATE EXTERNAL TABLE IF NOT EXISTS itcast_ods.web_chat_ems (
  id INT comment '主键',
  create_date_time STRING comment '数据创建时间',
  session_id STRING comment '七陌sessionId',
  sid STRING comment '访客id',
  create_time STRING comment '会话创建时间',
  seo_source STRING comment '搜索来源',
  seo_keywords STRING comment '关键字',
  ip STRING comment 'IP地址',
  area STRING comment '地域',
  country STRING comment '所在国家',
  province STRING comment '省',
  city STRING comment '城市',
  origin_channel STRING comment '投放渠道',
  user_match STRING comment '所属坐席',
  manual_time STRING comment '人工开始时间',
  begin_time STRING comment '坐席领取时间 ',
  end_time STRING comment '会话结束时间',
  last_customer_msg_time_stamp STRING comment '客户最后一条消息的时间',
  last_agent_msg_time_stamp STRING comment '坐席最后一下回复的时间',
  reply_msg_count INT comment '客服回复消息数',
  msg_count INT comment '客户发送消息数',
  browser_name STRING comment '浏览器名称',
  os_info STRING comment '系统名称')
comment '访问会话信息表'
PARTITIONED BY(starts_time STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
stored as orc
location '/user/hive/warehouse/itcast_ods.db/web_chat_ems_ods'
TBLPROPERTIES ('orc.compress'='ZLIB');

-- 访问咨询附属表
CREATE EXTERNAL TABLE IF NOT EXISTS itcast_ods.web_chat_text_ems (
  id INT COMMENT '主键来自MySQL',
  referrer STRING comment '上级来源页面',
  from_url STRING comment '会话来源页面',
  landing_page_url STRING comment '访客着陆页面',
  url_title STRING comment '咨询页面title',
  platform_description STRING comment '客户平台信息',
  other_params STRING comment '扩展字段中数据',
  history STRING comment '历史访问记录'
) comment 'EMS-PV测试表'
PARTITIONED BY(start_time STRING)
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY '\t'
stored as orc
location '/user/hive/warehouse/itcast_ods.db/web_chat_text_ems_ods'
TBLPROPERTIES ('orc.compress'='ZLIB');
DWD层:
CREATE DATABASE IF NOT EXISTS `itcast_dwd`;

create table if not exists itcast_dwd.visit_consult_dwd(
  session_id STRING comment '七陌sessionId',
  sid STRING comment '访客id',
  create_time bigint comment '会话创建时间',
  seo_source STRING comment '搜索来源',
  ip STRING comment 'IP地址',
  area STRING comment '地域',
  msg_count int comment '客户发送消息数',
  origin_channel STRING COMMENT '来源渠道',
  referrer STRING comment '上级来源页面',
  from_url STRING comment '会话来源页面',
  landing_page_url STRING comment '访客着陆页面',
  url_title STRING comment '咨询页面title',
  platform_description STRING comment '客户平台信息',
  other_params STRING comment '扩展字段中数据',
  history STRING comment '历史访问记录',
  hourinfo string comment '小时'
)
comment '访问咨询DWD表'
partitioned by(yearinfo String,quarterinfo string, monthinfo String, dayinfo string)
row format delimited fields terminated by '\t'
stored as orc
location '/user/hive/warehouse/itcast_dwd.db/visit_consult_dwd'
tblproperties ('orc.compress'='SNAPPY');
 DWS层
CREATE DATABASE IF NOT EXISTS `itcast_dws`;
-- 访问量统计结果表
CREATE TABLE IF NOT EXISTS itcast_dws.visit_dws (
  sid_total INT COMMENT '根据sid去重求count',
  sessionid_total INT COMMENT '根据sessionid去重求count',
  ip_total INT COMMENT '根据IP去重求count',
  area STRING COMMENT '区域信息',
  seo_source STRING COMMENT '搜索来源',
  origin_channel STRING COMMENT '来源渠道',
  hourinfo STRING COMMENT '创建时间,统计至小时',
  time_str STRING COMMENT '时间明细',
  from_url STRING comment '会话来源页面',
  groupType STRING COMMENT '产品属性类型:1.地区;2.搜索来源;3.来源渠道;4.会话来源页面;5.总访问量',
  time_type STRING COMMENT '时间聚合类型:1、按小时聚合;2、按天聚合;3、按月聚合;4、按季度聚合;5、按年聚合;')
comment 'EMS访客日志dws表'
PARTITIONED BY(yearinfo STRING,quarterinfo STRING,monthinfo STRING,dayinfo STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
stored as orc
location '/user/hive/warehouse/itcast_dws.db/visit_dws'
TBLPROPERTIES ('orc.compress'='SNAPPY');

-- 咨询量统计结果表
CREATE TABLE IF NOT EXISTS itcast_dws.consult_dws
(
  sid_total INT COMMENT '根据sid去重求count',
  sessionid_total INT COMMENT '根据sessionid去重求count',
  ip_total INT COMMENT '根据IP去重求count',
  area STRING COMMENT '区域信息',
  origin_channel STRING COMMENT '来源渠道',
  hourinfo STRING COMMENT '创建时间,统计至小时',
  time_str STRING COMMENT '时间明细',
  groupType STRING COMMENT '产品属性类型:1.地区;2.来源渠道',
  time_type STRING COMMENT '时间聚合类型:1、按小时聚合;2、按天聚合;3、按月聚合;4、按季度聚合;5、按年聚合;'
)
COMMENT '咨询量DWS宽表'
PARTITIONED BY (yearinfo string,quarterinfo STRING, monthinfo STRING, dayinfo string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS ORC
LOCATION '/user/hive/warehouse/itcast_dws.db/consult_dws'
TBLPROPERTIES ('orc.compress'='SNAPPY');

数据采集

目的: 用sqoop将业务端(MySQL)的数据导入到ODS层对应表(hive)中。

注意:字段名字要一一对应,不对的要改名再导入。

-- 访问咨询主表
sqoop import \
--connect jdbc:mysql://192.168.52.150:3306/nev \
--username root \
--password 123456 \
--query 'SELECT
id,create_date_time,session_id,sid,create_time,seo_source,seo_keywords,ip,
AREA,country,province,city,origin_channel,USER AS user_match,manual_time,begin_time,end_time,
last_customer_msg_time_stamp,last_agent_msg_time_stamp,reply_msg_count,
msg_count,browser_name,os_info, "2021-09-24" AS starts_time
FROM web_chat_ems_2019_07 where 1=1 and $CONDITIONS' \
--hcatalog-database itcast_ods \
--hcatalog-table web_chat_ems \
-m 1 

-- 访问咨询附属表
sqoop import \
--connect jdbc:mysql://192.168.52.150:3306/nev \
--username root \
--password 123456 \
--query 'SELECT 
  *, "2021-09-24" AS start_time
FROM web_chat_text_ems_2019_07 where 1=1 and $CONDITIONS' \
--hcatalog-database itcast_ods \
--hcatalog-table web_chat_text_ems \
-m 1 

 校验数据是否导入成功:

1) 查看mysql共计有多少条数据
	SELECT COUNT(1) FROM web_chat_ems_2019_07; 211197
	SELECT COUNT(1) FROM web_chat_text_ems_2019_07; 105599
2) 到hive中对表查询一下一共多少条数据
	SELECT COUNT(1) FROM itcast_ods.web_chat_ems; 211197
	SELECT COUNT(1) FROM itcast_ods.web_chat_text_ems; 105599
3) 查询其中一部分数据, 观察数据映射是否OK
	select * from itcast_ods.web_chat_ems limit 10;
	SELECT * FROM itcast_ods.web_chat_text_ems limit 10;

数据清洗转换

 目的: 将ODS层数据导入到DWD层

--动态分区配置
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
--hive压缩
set hive.exec.compress.intermediate=true;
set hive.exec.compress.output=true;
--写入时压缩生效
set hive.exec.orc.compression.strategy=COMPRESSION;


insert into table itcast_dwd.visit_consult_dwd partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
    wce.session_id,
    wce.sid,
    unix_timestamp(wce.create_time) as create_time,  
    wce.seo_source,
    wce.ip,
    wce.area,
    wce.msg_count,
    wce.origin_channel,
    wcte.referrer,
    wcte.from_url,
    wcte.landing_page_url,
    wcte.url_title,
    wcte.platform_description,
    wcte.other_params,
    wcte.history,
    substr(wce.create_time,12,2) as hourinfo,
    substr(wce.create_time,1,4) as yearinfo, 
    quarter(wce.create_time) as quarterinfo,
    substr(wce.create_time,6,2) as monthinfo,
    substr(wce.create_time,9,2) as dayinfo
from itcast_ods.web_chat_ems wce join itcast_ods.web_chat_text_ems wcte
    on wce.id = wcte.id;

 数据分析

目的: 将DWD层数据灌入到DWS层,一张大表里面分了好几个需求,按照-1区分。

访问量表:
如何计算访问量:
以时间为基准,统计总访问量(5个需求)
-- 统计每年的总访问量
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select  
  count(distinct sid) as  sid_total,
  count(distinct session_id) as sessionid_total,
  count(distinct ip) as ip_total,
  '-1' as area,
  '-1' as seo_source,
  '-1' as origin_channel,
  '-1' as hourinfo,
  yearinfo as time_str,
  '-1' as from_url,
  '5' as grouptype,
  '5' as time_type,
  yearinfo,
  '-1' as quarterinfo,
  '-1' as monthinfo,
  '-1' as dayinfo
from  itcast_dwd.visit_consult_dwd
group by yearinfo;
-- 统计每年每季度的总访问量
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select  
  count(distinct sid) as  sid_total,
  count(distinct session_id) as sessionid_total,
  count(distinct ip) as ip_total,
  '-1' as area,
  '-1' as seo_source,
  '-1' as origin_channel,
  '-1' as hourinfo,
  concat(yearinfo,'_',quarterinfo) as time_str,
  '-1' as from_url,
  '5' as grouptype,
  '4' as time_type,
  yearinfo,
  quarterinfo,
  '-1' as monthinfo,
  '-1' as dayinfo
from  itcast_dwd.visit_consult_dwd
group by yearinfo,quarterinfo;
-- 统计每年每季度每月的总访问量
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select  
  count(distinct sid) as  sid_total,
  count(distinct session_id) as sessionid_total,
  count(distinct ip) as ip_total,
  '-1' as area,
  '-1' as seo_source,
  '-1' as origin_channel,
  '-1' as hourinfo,
  concat(yearinfo,'-',monthinfo) as time_str,
  '-1' as from_url,
  '5' as grouptype,
  '3' as time_type,
  yearinfo,
  quarterinfo,
  monthinfo,
  '-1' as dayinfo
from  itcast_dwd.visit_consult_dwd
group by yearinfo,quarterinfo,monthinfo;
-- 统计每年每季度每月每天的总访问量
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select  
  count(distinct sid) as  sid_total,
  count(distinct session_id) as sessionid_total,
  count(distinct ip) as ip_total,
  '-1' as area,
  '-1' as seo_source,
  '-1' as origin_channel,
  '-1' as hourinfo,
  concat(yearinfo,'-',monthinfo,'-',dayinfo) as time_str,
  '-1' as from_url,
  '5' as grouptype,
  '2' as time_type,
  yearinfo,
  quarterinfo,
  monthinfo,
  dayinfo
from  itcast_dwd.visit_consult_dwd
group by yearinfo,quarterinfo,monthinfo,dayinfo;
-- 统计每年每季度每月每天每小时的总访问量
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select  
  count(distinct sid) as  sid_total,
  count(distinct session_id) as sessionid_total,
  count(distinct ip) as ip_total,
  '-1' as area,
  '-1' as seo_source,
  '-1' as origin_channel,
  hourinfo,
  concat(yearinfo,'-',monthinfo,'-',dayinfo,' ',hourinfo) as time_str,
  '-1' as from_url,
  '5' as grouptype,
  '1' as time_type,
  yearinfo,
  quarterinfo,
  monthinfo,
  dayinfo
from  itcast_dwd.visit_consult_dwd
group by yearinfo,quarterinfo,monthinfo,dayinfo,hourinfo;
 基于时间统计各个受访页面的访问量(5个需求)
-- 统计每年各个受访页面的访问量
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select  
  count(distinct sid) as  sid_total,
  count(distinct session_id) as sessionid_total,
  count(distinct ip) as ip_total,
  '-1' as area,
  '-1' as seo_source,
  '-1' as origin_channel,
  '-1' as hourinfo,
  yearinfo as time_str,
  from_url,
  '4' as grouptype,
  '5' as time_type,
  yearinfo,
  '-1' as quarterinfo,
  '-1' as monthinfo,
  '-1' as dayinfo
from  itcast_dwd.visit_consult_dwd
group by yearinfo,from_url;

-- 统计每年,每季度各个受访页面的访问量
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select  
  count(distinct sid) as  sid_total,
  count(distinct session_id) as sessionid_total,
  count(distinct ip) as ip_total,
  '-1' as area,
  '-1' as seo_source,
  '-1' as origin_channel,
  '-1' as hourinfo,
  concat(yearinfo,'_',quarterinfo) as time_str,
  from_url,
  '4' as grouptype,
  '4' as time_type,
  yearinfo,
  quarterinfo,
  '-1' as monthinfo,
  '-1' as dayinfo
from  itcast_dwd.visit_consult_dwd
group by yearinfo,quarterinfo,from_url;

-- 统计每年,每季度,每月各个受访页面的访问量
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select  
  count(distinct sid) as  sid_total,
  count(distinct session_id) as sessionid_total,
  count(distinct ip) as ip_total,
  '-1' as area,
  '-1' as seo_source,
  '-1' as origin_channel,
  '-1' as hourinfo,
  concat(yearinfo,'-',monthinfo) as time_str,
  from_url,
  '4' as grouptype,
  '3' as time_type,
  yearinfo,
  quarterinfo,
  monthinfo,
  '-1' as dayinfo
from  itcast_dwd.visit_consult_dwd
group by yearinfo,quarterinfo,monthinfo,from_url;

-- 统计每年,每季度,每月.每天各个受访页面的访问量
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select  
  count(distinct sid) as  sid_total,
  count(distinct session_id) as sessionid_total,
  count(distinct ip) as ip_total,
  '-1' as area,
  '-1' as seo_source,
  '-1' as origin_channel,
  '-1' as hourinfo,
  concat(yearinfo,'-',monthinfo,'-',dayinfo) as time_str,
  from_url,
  '4' as grouptype,
  '2' as time_type,
  yearinfo,
  quarterinfo,
  monthinfo,
  dayinfo
from  itcast_dwd.visit_consult_dwd
group by yearinfo,quarterinfo,monthinfo,dayinfo,from_url;
-- 统计每年,每季度,每月.每天,每小时各个受访页面的访问量
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select  
  count(distinct sid) as  sid_total,
  count(distinct session_id) as sessionid_total,
  count(distinct ip) as ip_total,
  '-1' as area,
  '-1' as seo_source,
  '-1' as origin_channel,
  hourinfo,
  concat(yearinfo,'-',monthinfo,'-',dayinfo,' ',hourinfo) as time_str,
  from_url,
  '4' as grouptype,
  '1' as time_type,
  yearinfo,
  quarterinfo,
  monthinfo,
  dayinfo
from  itcast_dwd.visit_consult_dwd
group by yearinfo,quarterinfo,monthinfo,dayinfo,hourinfo,from_url;

咨询量表
如何计算咨询量:
基于时间统计总咨询量(5个需求)
-- 统计每年的总咨询量
insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select  
    count(distinct sid) as sid_total,
    count(distinct session_id) as sessionid_total,
    count(distinct ip) as ip_total,
    '-1' as area,
    '-1' as origin_channel,
    '-1' as hourinfo,
    yearinfo as time_str,
    '3' as grouptype,
    '5' as time_type,
    yearinfo,
    '-1' as quarterinfo,
    '-1' as monthinfo,
    '-1' as dayinfo
from  itcast_dwd.visit_consult_dwd where msg_count >= 1
group by yearinfo;
-- 统计每年每季度的总咨询量
insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select  
    count(distinct sid) as sid_total,
    count(distinct session_id) as sessionid_total,
    count(distinct ip) as ip_total,
    '-1' as area,
    '-1' as origin_channel,
    '-1' as hourinfo,
    concat(yearinfo,'_',quarterinfo) as time_str,
    '3' as grouptype,
    '4' as time_type,
    yearinfo,
    quarterinfo,
    '-1' as monthinfo,
    '-1' as dayinfo
from  itcast_dwd.visit_consult_dwd where msg_count >= 1
group by yearinfo,quarterinfo;
-- 统计每年每季度每月的总咨询量
insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select  
    count(distinct sid) as sid_total,
    count(distinct session_id) as sessionid_total,
    count(distinct ip) as ip_total,
    '-1' as area,
    '-1' as origin_channel,
    '-1' as hourinfo,
    concat(yearinfo,'-',monthinfo) as time_str,
    '3' as grouptype,
    '3' as time_type,
    yearinfo,
    quarterinfo,
    monthinfo,
    '-1' as dayinfo
from  itcast_dwd.visit_consult_dwd where msg_count >= 1
group by yearinfo,quarterinfo,monthinfo;
-- 统计每年每季度每月每天的总咨询量
insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select  
    count(distinct sid) as sid_total,
    count(distinct session_id) as sessionid_total,
    count(distinct ip) as ip_total,
    '-1' as area,
    '-1' as origin_channel,
    '-1' as hourinfo,
    concat(yearinfo,'-',monthinfo,'-',dayinfo) as time_str,
    '3' as grouptype,
    '2' as time_type,
    yearinfo,
    quarterinfo,
    monthinfo,
    dayinfo
from  itcast_dwd.visit_consult_dwd where msg_count >= 1
group by yearinfo,quarterinfo,monthinfo,dayinfo;
-- 统计每年每季度每月每天每小时的总咨询量
insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select  
    count(distinct sid) as sid_total,
    count(distinct session_id) as sessionid_total,
    count(distinct ip) as ip_total,
    '-1' as area,
    '-1' as origin_channel,
    hourinfo,
    concat(yearinfo,'-',monthinfo,'-',dayinfo,' ',hourinfo) as time_str,
    '3' as grouptype,
    '1' as time_type,
    yearinfo,
    quarterinfo,
    monthinfo,
    dayinfo
from  itcast_dwd.visit_consult_dwd where msg_count >= 1
group by yearinfo,quarterinfo,monthinfo,dayinfo,hourinfo;
基于时间,统计各个地区的咨询量(5个需求)

数据导出

 目的: 从hive的DWS层将数据导出到mysql中对应目标表中

第一步: 在mysql中创建目标表:创建数据库建表
create database scrm_bi default character set utf8mb4 collate utf8mb4_general_ci;

-- 访问量的结果表:
CREATE TABLE IF NOT EXISTS scrm_bi.visit_dws (
  sid_total INT COMMENT '根据sid去重求count',
  sessionid_total INT COMMENT '根据sessionid去重求count',
  ip_total INT COMMENT '根据IP去重求count',
  area varchar(32) COMMENT '区域信息',
  seo_source varchar(32) COMMENT '搜索来源',
  origin_channel varchar(32) COMMENT '来源渠道',
  hourinfo varchar(32) COMMENT '创建时间,统计至小时',
  time_str varchar(32) COMMENT '时间明细',
  from_url varchar(1000) comment '会话来源页面',
  groupType varchar(32) COMMENT '产品属性类型:1.地区;2.搜索来源;3.来源渠道;4.会话来源页面;5.总访问量',
  time_type varchar(32) COMMENT '时间聚合类型:1、按小时聚合;2、按天聚合;3、按月聚合;4、按季度聚合;5、按年聚合;',
  yearinfo varchar(32) COMMENT '年' ,
  quarterinfo varchar(32) COMMENT '季度',
  monthinfo varchar(32) COMMENT '月',
  dayinfo varchar(32) COMMENT '天'
)comment 'EMS访客日志dws表';

-- 咨询量的结果表:
CREATE TABLE IF NOT EXISTS scrm_bi.consult_dws
(
  sid_total INT COMMENT '根据sid去重求count',
  sessionid_total INT COMMENT '根据sessionid去重求count',
  ip_total INT COMMENT '根据IP去重求count',
  area varchar(32) COMMENT '区域信息',
  origin_channel varchar(32) COMMENT '来源渠道',
  hourinfo varchar(32) COMMENT '创建时间,统计至小时',
  time_str varchar(32) COMMENT '时间明细',
  groupType varchar(32) COMMENT '产品属性类型:1.地区;2.来源渠道',
  time_type varchar(32) COMMENT '时间聚合类型:1、按小时聚合;2、按天聚合;3、按月聚合;4、按季度聚合;5、按年聚合;',
  yearinfo varchar(32) COMMENT '年' ,
  quarterinfo varchar(32) COMMENT '季度',
  monthinfo varchar(32) COMMENT '月',
  dayinfo varchar(32) COMMENT '天'
)COMMENT '咨询量DWS宽表';
 第二步执行sqoop的数据导出
-- 先导出 咨询量数据
sqoop export \
--connect "jdbc:mysql://192.168.52.150:3306/scrm_bi?useUnicode=true&characterEncoding=utf-8" \
--username root \
--password 123456 \
--table consult_dws \
--hcatalog-database itcast_dws \
--hcatalog-table consult_dws \
-m 1


-- 访问量数据导出
sqoop export \
--connect "jdbc:mysql://192.168.52.150:3306/scrm_bi?useUnicode=true&characterEncoding=utf-8" \
--username root \
--password 123456 \
--table visit_dws \
--hcatalog-database itcast_dws \
--hcatalog-table visit_dws \
-m 1

访问和咨询用户数据模块(增量分析):

模拟新增数据,实际生产过程中由需求方自行提供新增数据:

-- 模拟访问咨询主表数据
CREATE TABLE web_chat_ems_2021_09 AS
SELECT *  FROM web_chat_ems_2019_07 WHERE  create_time BETWEEN '2019-07-01 00:00:00' AND '2019-07-01 23:59:59';

-- 修改表中的时间字段
UPDATE web_chat_ems_2021_09 SET create_time = CONCAT('2021-09-25 ',SUBSTR(create_time,12));


-- 模拟访问咨询附属表数据
CREATE TABLE web_chat_text_ems_2021_09 AS
SELECT 
  temp2.*
FROM
(SELECT *  FROM web_chat_ems_2019_07 
	WHERE  create_time BETWEEN '2019-07-01 00:00:00' AND '2019-07-01 23:59:59') temp1
	JOIN web_chat_text_ems_2019_07 temp2 ON temp1.id = temp2.id ;

 增量数据采集:

编写增量数据采集的shell脚本

edu_mode_1_collect.sh脚本:此脚本能够实现自动获取上一天的日期数据, 并且还支持采集指定日期下数据,将shell脚本放置到ooize中,完成自动化调度操作。

#!/bin/bash

export SQOOP_HOME=/usr/bin/sqoop

if [ $# == 1 ]
   then
      dateStr=$1
   else
      dateStr=`date -d '-1 day' +'%Y-%m-%d'`
fi

dateNowStr=`date +'%Y-%m-%d'`

yearStr=`date -d ${dateStr} +'%Y'`
monthStr=`date -d ${dateStr} +'%m'`

jdbcUrl='jdbc:mysql://192.168.52.150:3306/nev'
username='root'
password='123456'
m='1'

${SQOOP_HOME} import \
--connect ${jdbcUrl} \
--username ${username} \
--password ${password} \
--query "SELECT 
id,create_date_time,session_id,sid,create_time,seo_source,seo_keywords,ip,
AREA,country,province,city,origin_channel,USER AS user_match,manual_time,begin_time,end_time,
last_customer_msg_time_stamp,last_agent_msg_time_stamp,reply_msg_count,
msg_count,browser_name,os_info, '${dateNowStr}' AS starts_time  
FROM web_chat_ems_${yearStr}_${monthStr} WHERE create_time BETWEEN '${dateStr} 00:00:00' AND '${dateStr} 
23:59:59' and \$CONDITIONS" \
--hcatalog-database itcast_ods \
--hcatalog-table web_chat_ems \
-m ${m}

${SQOOP_HOME} import \
--connect ${jdbcUrl} \
--username ${username} \
--password ${password} \
--query "SELECT 
        temp2.*, '${dateNowStr}' AS start_time
FROM (SELECT id FROM web_chat_ems_${yearStr}_${monthStr} WHERE create_time BETWEEN '${dateStr} 00:00:00' 
AND '${dateStr} 23:59:59') temp1
        JOIN web_chat_text_ems_${yearStr}_${monthStr} temp2 ON temp1.id = temp2.id where 1=1 and \$CONDIT
IONS" \
--hcatalog-database itcast_ods \
--hcatalog-table web_chat_text_ems \
-m ${m}

测试脚本是否可以正常执行  

sh edu_mode_1_collect.sh 

可以在hive中查看对应分区下的数据, 如果有, 说明抽取成功了
	select * from itcast_ods.web_chat_ems where starts_time='2021-09-26' limit 10;

注意:starts_time是当天抽取的当天日期,抽取的数据库中的表的数据是前一天的,所以建表的时候要把ceratetimes字段改为前一天的。比如今天是2021-09-26,我创建的表的名字是

也可以在抽取表的时候带上日期参数,这样可以随机抽取任意一天的数据。 

将shell脚本放置到ooize中,完成自动化调度操作  

增量数据清洗转换

仅需要对新增的这一天数据进行清洗转换操作,因为此时数据粒度是一致的,不需要对之前的数据做处理。

 在shell下执行hive的SQL

./hive -e|-f 'sql语句|SQL脚本' -S

编写shell脚本, 实现增量数据清洗转换操作

#!/bin/bash

export HIVE_HOME=/usr/bin/hive

if [ $# == 1 ]
then
  dateStr=$1

  else
     dateStr=`date +'%Y-%m-%d'`
fi

sqlStr="
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.compress.intermediate=true;
set hive.exec.compress.output=true;
set hive.exec.orc.compression.strategy=COMPRESSION;

insert into table itcast_dwd.visit_consult_dwd partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
    wce.session_id,
    wce.sid,
    unix_timestamp(wce.create_time) as create_time,  
    wce.seo_source,
    wce.ip,
    wce.area,
    wce.msg_count,
    wce.origin_channel,
    wcte.referrer,
    wcte.from_url,
    wcte.landing_page_url,
    wcte.url_title,
    wcte.platform_description,
    wcte.other_params,
    wcte.history,
    substr(wce.create_time,12,2) as hourinfo,
    substr(wce.create_time,1,4) as yearinfo, 
    quarter(wce.create_time) as quarterinfo,
    substr(wce.create_time,6,2) as monthinfo,
    substr(wce.create_time,9,2) as dayinfo
from (select * from itcast_ods.web_chat_ems where starts_time='${dateStr}') wce join (select * from itcast_ods.web_chat_text_ems where start_time='${dateStr}') wcte   
    on wce.id = wcte.id;
"

${HIVE_HOME} -e "${sqlStr}" -S

测试脚本:

sh edu_mode_1_handle.sh

测试数据是否产生:

 配置工作流:

增量数据统计分析

第一张表25个需求,第二张表15个需求,有点多, 各选5条模拟一下:

dwd表是新增了上一天的数据,统计年的就把今年的全部统计,月的把这个月的全部统计,不是就统计前一天的,所以将昨天的数据加入后再统计一遍年的,月的,季度的,以此类推……

思考: 在统计的过程中, 比如以年统计, 得到一个新的年的统计结果, 那么在DWS层表中是不是还有一个历史的结果呢? 如何解决呢?

比如今天是2024-5-4,昨天已经统计了今年的数据了,今天又统计一遍,需要把昨天的历史数据删除,但hive不支持个别数据的删除。

 编写脚本:

#!/bin/bash
export HIVE_HOME=/usr/bin/hive

if [ $# == 1 ]

then
   dateStr=$1
   
   else 
      dateStr=`date -d '-1 day' +'%Y-%m-%d'`

fi

yearStr=`date -d ${dateStr} +'%Y'`

monthStr=`date -d ${dateStr} +'%m'`

month_for_quarter=`date -d ${dateStr} +'%-m'`
quarterStr=$((($month_for_quarter-1)/3+1))

dayStr=`date -d ${dateStr} +'%d'`


sqlStr="
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.compress.intermediate=true;
set hive.exec.compress.output=true;
set hive.exec.orc.compression.strategy=COMPRESSION;

alter table itcast_dws.visit_dws drop partition(yearinfo='${yearStr}',quarterinfo='-1',monthinfo='-1',dayinfo='-1');
alter table itcast_dws.visit_dws drop partition(yearinfo='${yearStr}',quarterinfo='${quarterStr}',monthinfo='-1',dayinfo='-1');
alter table itcast_dws.visit_dws drop partition(yearinfo='${yearStr}',quarterinfo='${quarterStr}',monthinfo='${monthStr}}',dayinfo='-1');

alter table itcast_dws.consult_dws drop partition(yearinfo='${yearStr}',quarterinfo='-1',monthinfo='-1',dayinfo='-1');
alter table itcast_dws.consult_dws drop partition(yearinfo='${yearStr}',quarterinfo='${quarterStr}',monthinfo='-1',dayinfo='-1');
alter table itcast_dws.consult_dws drop partition(yearinfo='${yearStr}',quarterinfo='${quarterStr}',monthinfo='${monthStr}}',dayinfo='-1');

insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select  
  count(distinct sid) as  sid_total,
  count(distinct session_id) as sessionid_total,
  count(distinct ip) as ip_total,
  '-1' as area,
  '-1' as seo_source,
  '-1' as origin_channel,
  '-1' as hourinfo,
  yearinfo as time_str,
  '-1' as from_url,
  '5' as grouptype,
  '5' as time_type,
  yearinfo,
  '-1' as quarterinfo,
  '-1' as monthinfo,
  '-1' as dayinfo
from  itcast_dwd.visit_consult_dwd where yearinfo='${yearStr}'
group by yearinfo;

insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select  
  count(distinct sid) as  sid_total,
  count(distinct session_id) as sessionid_total,
  count(distinct ip) as ip_total,
  '-1' as area,
  '-1' as seo_source,
  '-1' as origin_channel,
  '-1' as hourinfo,
  concat(yearinfo,'_',quarterinfo) as time_str,
  '-1' as from_url,
  '5' as grouptype,
  '4' as time_type,
  yearinfo,
  quarterinfo,
  '-1' as monthinfo,
  '-1' as dayinfo
from  itcast_dwd.visit_consult_dwd where yearinfo='${yearStr}' and quarterinfo='${quarterStr}'
group by yearinfo,quarterinfo;

insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select  
  count(distinct sid) as  sid_total,
  count(distinct session_id) as sessionid_total,
  count(distinct ip) as ip_total,
  '-1' as area,
  '-1' as seo_source,
  '-1' as origin_channel,
  '-1' as hourinfo,
  concat(yearinfo,'-',monthinfo) as time_str,
  '-1' as from_url,
  '5' as grouptype,
  '3' as time_type,
  yearinfo,
  quarterinfo,
  monthinfo,
  '-1' as dayinfo
from  itcast_dwd.visit_consult_dwd where  yearinfo='${yearStr}' and quarterinfo='${quarterStr}' and monthinfo='${monthStr}'
group by yearinfo,quarterinfo,monthinfo;

insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select  
  count(distinct sid) as  sid_total,
  count(distinct session_id) as sessionid_total,
  count(distinct ip) as ip_total,
  '-1' as area,
  '-1' as seo_source,
  '-1' as origin_channel,
  '-1' as hourinfo,
  concat(yearinfo,'-',monthinfo,'-',dayinfo) as time_str,
  '-1' as from_url,
  '5' as grouptype,
  '2' as time_type,
  yearinfo,
  quarterinfo,
  monthinfo,
  dayinfo
from  itcast_dwd.visit_consult_dwd where  yearinfo='${yearStr}' and quarterinfo='${quarterStr}' and monthinfo='${monthStr}' and dayinfo='${dayStr}'
group by yearinfo,quarterinfo,monthinfo,dayinfo;

insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select  
  count(distinct sid) as  sid_total,
  count(distinct session_id) as sessionid_total,
  count(distinct ip) as ip_total,
  '-1' as area,
  '-1' as seo_source,
  '-1' as origin_channel,
  hourinfo,
  concat(yearinfo,'-',monthinfo,'-',dayinfo,' ',hourinfo) as time_str,
  '-1' as from_url,
  '5' as grouptype,
  '1' as time_type,
  yearinfo,
  quarterinfo,
  monthinfo,
  dayinfo
from  itcast_dwd.visit_consult_dwd where  yearinfo='${yearStr}' and quarterinfo='${quarterStr}' and monthinfo='${monthStr}' and dayinfo='${dayStr}'
group by yearinfo,quarterinfo,monthinfo,dayinfo,hourinfo;


insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select  
    count(distinct sid) as sid_total,
    count(distinct session_id) as sessionid_total,
    count(distinct ip) as ip_total,
    area,
    '-1' as origin_channel,
    '-1' as hourinfo,
    yearinfo as time_str,
    '1' as grouptype,
    '5' as time_type,
    yearinfo,
    '-1' as quarterinfo,
    '-1' as monthinfo,
    '-1' as dayinfo
from  itcast_dwd.visit_consult_dwd where msg_count >= 1 and yearinfo='${yearStr}'
group by yearinfo,area;

insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select  
    count(distinct sid) as sid_total,
    count(distinct session_id) as sessionid_total,
    count(distinct ip) as ip_total,
    area,
    '-1' as origin_channel,
    '-1' as hourinfo,
    concat(yearinfo,'_',quarterinfo) as time_str,
    '1' as grouptype,
    '4' as time_type,
    yearinfo,
    quarterinfo,
    '-1' as monthinfo,
    '-1' as dayinfo
from  itcast_dwd.visit_consult_dwd where msg_count >= 1 and yearinfo='${yearStr}' and quarterinfo='${quarterStr}'
group by yearinfo,quarterinfo,area;

insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select  
    count(distinct sid) as sid_total,
    count(distinct session_id) as sessionid_total,
    count(distinct ip) as ip_total,
    area,
    '-1' as origin_channel,
    '-1' as hourinfo,
    concat(yearinfo,'-',monthinfo) as time_str,
    '1' as grouptype,
    '3' as time_type,
    yearinfo,
    quarterinfo,
    monthinfo,
    '-1' as dayinfo
from  itcast_dwd.visit_consult_dwd where msg_count >= 1 and yearinfo='${yearStr}' and quarterinfo='${quarterStr}' and monthinfo='${monthStr}'
group by yearinfo,quarterinfo,monthinfo,area;

insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select  
    count(distinct sid) as sid_total,
    count(distinct session_id) as sessionid_total,
    count(distinct ip) as ip_total,
    area,
    '-1' as origin_channel,
    '-1' as hourinfo,
    concat(yearinfo,'-',monthinfo,'-',dayinfo) as time_str,
    '1' as grouptype,
    '2' as time_type,
    yearinfo,
    quarterinfo,
    monthinfo,
    dayinfo
from  itcast_dwd.visit_consult_dwd where msg_count >= 1 and yearinfo='${yearStr}' and quarterinfo='${quarterStr}' and monthinfo='${monthStr}' and dayinfo='${dayStr}'
group by yearinfo,quarterinfo,monthinfo,dayinfo,area;

insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select  
    count(distinct sid) as sid_total,
    count(distinct session_id) as sessionid_total,
    count(distinct ip) as ip_total,
    area,
    '-1' as origin_channel,
    hourinfo,
    concat(yearinfo,'-',monthinfo,'-',dayinfo,' ',hourinfo) as time_str,
    '1' as grouptype,
    '1' as time_type,
    yearinfo,
    quarterinfo,
    monthinfo,
    dayinfo
from  itcast_dwd.visit_consult_dwd where msg_count >= 1 and yearinfo='${yearStr}' and quarterinfo='${quarterStr}' and monthinfo='${monthStr}' and dayinfo='${dayStr}'
group by yearinfo,quarterinfo,monthinfo,dayinfo,hourinfo,area;

"

${HIVE_HOME} -e "${sqlStr}" -S

测试:

sh  edu_mode_1_analyse.sh

 在hive中测试:

select  * from itcast_dws.visit_dws where  yearinfo='2024';

上传到工作流:

 

增量数据导出操作

编写脚本:

#!/bin/bash

export SQOOP_HOME=/usr/bin/sqoop

if [ $# == 1 ]
then
   TD_DATE=$1  
else
   TD_DATE=`date -d '-1 day' +'%Y-%m-%d'`
fi

TD_YEAR=`date -d ${TD_DATE} +%Y`

mysql -uroot -p123456 -h192.168.52.150 -P3306 -e "delete from scrm_bi.visit_dws where yearinfo='$TD_YEAR'; delete from scrm_bi.consult_dws where yearinfo='$TD_YEAR';"

jdbcUrl='jdbc:mysql://192.168.52.150:3306/scrm_bi?useUnicode=true&characterEncoding=utf-8'
username='root'
password='123456'

${SQOOP_HOME} export \
--connect ${jdbcUrl} \
--username ${username} \
--password ${password} \
--table visit_dws \
--hcatalog-database itcast_dws \
--hcatalog-table visit_dws \
--hcatalog-partition-keys yearinfo \
--hcatalog-partition-values $TD_YEAR \
-m 1

${SQOOP_HOME} export \
--connect ${jdbcUrl} \
--username ${username} \
--password ${password} \
--table consult_dws \
--hcatalog-database itcast_dws \
--hcatalog-table consult_dws \
--hcatalog-partition-keys yearinfo \
--hcatalog-partition-values $TD_YEAR \
-m 1

测试:在本地MySQL中:

select * from scrm_bi.visit_dws where yearinfo='2024';

 上传到工作流:

 

 访问和咨询用户数据模块工作到此结束,后续模块开发中……

举报

相关推荐

0 条评论