文章目录
ETL
现阶段,越来越多的公司会以数据作为主要的驱动和战略方向,通过大数据技术从数据中挖掘巨大的经济价值,让数据说话,引导公司领导层发现产品问题,做出决策。
随着数据仓库的普及,ETL成了一个众所周知的词,它是三个单词:extract,transform和load的缩写,指的是数据抽取,数据转换和数据加载。
随着ETL工具的普及,ETL开发的难度也随着降低,因此对ETL开发工程师的要求也会随之被降低。把ETL比喻为快递配送,从下面几个方面,我们可以比较一下ETL和快递配送。对于快递配送,首要的目的就是将货物完好无损的送到客户手上,千万不要送错地方,也不能把货物损坏。而ETL也是一样,我们需要把数据准确的加载到目标库,也就是说我们必须保证数据质量。
1.1、岗位所需技能点
- mysql
- sql
- Hadoop
- spark
- 数据仓库
- ETL
- azkaban
数据分析师
2.1、岗位所需技能点
- sql
- hadoop
- hive
- excel
- ppt
- R
数据仓库
3.1、岗位所需技能点
- 数据仓库
- ETL
- Hadoop
- hive
- spark
- sql
1、数据仓库
英文名称为Data Warehouse,可简写为DW或DWH。数据仓库的目的是构建面向分析的集成化数据环境,为企业提供决策支持(Decision Support)。它出于分析性报告和决策支持目的而创建。
1.1、数据仓库与数据库区别
数据库与数据仓库的区别实际讲的是 OLTP 与 OLAP 的区别。
操作型处理,叫联机事务处理 OLTP(On-Line Transaction Processing,),也可以称面向交易的处理系统,它是针对具体业务在数据库联机的日常操作,通常对少数记录进行查询、修改。用户较为关心操作的响应时间、数据的安全性、完整性和并发支持的用户数等问题。传统的数据库系统作为数据管理的主要手段,主要用于操作型处理。
分析型处理,叫联机分析处理 OLAP(On-Line Analytical Processing)一般针对某些主题的历史数据进行分析,支持管理决策。
- 数据库是面向事务的设计,数据仓库是面向主题设计的。
- 数据库一般存储业务数据,数据仓库存储的一般是历史数据。
- 数据库设计是尽量避免冗余,一般针对某一业务应用进行设计,比如一张简单的User表,记录用户名、密码等简单数据即可,符合业务应用,但是不符合分析。数据仓库在设计是有意引入冗余,依照分析需求,分析维度、分析指标进行设计。
- 数据库是为捕获数据而设计,数据仓库是为分析数据而设计。
1.2、数据仓库分层架构
按照数据流入流出的过程,数据仓库架构可分为三层——源数据、数据仓库、数据应用。
-
源数据层(ODS):此层数据无任何更改,直接沿用外围系统数据结构和数据,不对外开放;为临时存储层,是接口数据的临时存储区域,为后一步的数据处理做准备。
-
数据仓库层(DW):也称为细节层,DW层的数据应该是一致的、准确的、干净的数据,即对源系统数据进行了清洗(去除了杂质)后的数据。
-
数据应用层(DA或APP):前端应用直接读取的数据源;根据报表、专题分析需求而计算生成的数据。
数据仓库从各数据源获取数据及在数据仓库内的数据转换和流动都可以认为是ETL(抽取Extra, 转化Transfer, 装载Load)的过程,ETL是数据仓库的流水线,也可以认为是数据仓库的血液,它维系着数据仓库中数据的新陈代谢,而数据仓库日常的管理和维护工作的大部分精力就是保持ETL的正常和稳定。
1.3 、为什么要分层
分层的主要原因是在管理数据的时候,能对数据有一个更加清晰的掌控,详细来讲,主要有下面几个原因:
清晰数据结构:
每一个数据分层都有它的作用域,这样我们在使用表的时候能更方便地定位和理解。
数据血缘追踪:
简单来说,我们最终给业务呈现的是一个能直接使用业务表,但是它的来源有很多,如果有一张来源表出问题了,我们希望能够快速准确地定位到问题,并清楚它的危害范围。
减少重复开发:
规范数据分层,开发一些通用的中间层数据,能够减少极大的重复计算。
把复杂问题简单化:
将一个复杂的任务分解成多个步骤来完成,每一层只处理单一的步骤,比较简单和容易理解。而且便于维护数据的准确性,当数据出现问题之后,可以不用修复所有的数据,只需要从有问题的步骤开始修复。
屏蔽原始数据的异常:
屏蔽业务的影响,不必改一次业务就需要重新接入数据
1.4、维度数据模型
1.4.1、 简介
维度数据模型简称为维度模型,用于 数据仓库 设计
- 不同于关系数据模型,维度模型不一定要引入关系数据库
- 在逻辑上相同的维度模型,可以被用于多种物理形式,比如
- 维度数据或是简单的文本文件
- 维度模型是一种趋向于支持最终用户对数据仓库进行查询的设计技术,是围绕性能和易理解性构建的
- 关系模型对于事务处理系统表现非常出色,但它并不是面向最终用户的
1.4.2 维度数据模型的特点
- 易理解
- 更容易理解且更直观
- 信息按业务种类或维度进行分组,提高可读性,便于对数据含义的解释
- 简化模型,访问数据库更高效
- 而关系模型,数据被分布到多个离散的实体中
- 高性能
- 维度模型更倾向于非规范化,这样可以优化查询的性能
- 关系模型时,规范化的实质是减少数据冗余,以优化事务处理数据更新的性能
- 维度设计的整体观点是要简化、加速查询
- 可扩展
- 维度模型是可扩展的
- 允许数据冗余,向一个维度表或事实表中添加字段时,不会像关系模型那样产生巨大影响
- 基于数据仓库的查询和应用不需要做过多改变就能适应表结构的变化,老的查询和应用会继续工作而不会产生错误的结果
1.4.3 事实和维度
- 事实和维度是两个维度模型中的核心概念
- 事实表示对业务数据的度量,维度是观察数据的角度
- 事实通常是数字类型的,可以进行聚合和计算
- 维度通常是一组层次关系或描述信息,用来定义事实。例如:
- 销售金额是一个事实
- 销售时间、销售的产品、购买的顾客、商店都是销售事实的维度
- 维度模型按照 业务流程领域 即主题域建立,例如:
- 进货
- 销售
- 库存
- 配送
- 不同主题域可能共享某些维度,为了提高数据操作的性能和数据一致性,需要使用一致性维度
1.4.4 维度数据模型建模过程(Kimball)
- 通常以星型模型构建
- 一个事实表
- 周围环绕多个维度表
- 建模过程
- 选择业务流程
- 声明粒度
- 确认维度
- 确认事实
1.4.4.1 选择业务流程
- 确认哪些业务处理流程是数仓要覆盖的,是维度建模的基础
- 建模的第一个步骤:描述需要建模的业务流程
- 描述业务流程
- 使用简单的纯文本描述
- 业务流程建模标注(BPMN)——工作流的工具
1.4.4.2 声明粒度
- 确认数据的粒度,其实就是对应不同的维度的粒度
- 建议从原始粒度数据开始设计
- 例如:
- 时间粒度——秒级
- 地域粒度——乡镇级
- 商品分类——三级分类
1.4.4.3 维度建模过程 - 确认维度
- 维度的粒度必须和声明的粒度一致
- 维度表是事实表的基础,也说明了事实表的数据是从哪里采集来的
- 典型的维度都是名词
- 日期
- 商店
- 库存等
- 维度表存储了某一维度的所有相关数据,例如:日期维度应该包含:
- 年
- 季度
- 月
- 周
- 日
1.4.4.4 维度建模过程 - 确认事实
- 识别数字化的度量,构成事实表的记录
- 用户通过对事实表的访问获取数据仓库存储的数据
- 大部分事实表的度量都是数值类型的,可累加,可计算
1.4.5 维度规范化
- 与关系模型类似,维度也可以进行规范化
- 维度的规范化(又叫雪花化),可以去除冗余属性
- 一个非规范化维度对应一个维度表,规范化后,一个维度会对应多个维度表,维度被严格地以子维度的形式连接到一起
- 维度规范化后的结构等同于一个低范式级别的关系型结构
- 设计维度数据模型时,会因为如下原因而不对维度做规范化处理
- 规范化会增加表的数量,使结构跟复杂
- 不可避免的多表连接,使查询跟复杂
- 不适合使用位图索引
- 查询性能原因
总体来说,但多个维度共用某些通用的属性时,做规范化是有益的。例如:客户和供应商都有省、市、区县、街道等地理位置的属性,此时分离出一个地区属性就比较合适
1.4.6 星型模型
- 星型模型是维度模型最简单的形式,是数据仓库以及数据集市开发中使用最广泛的形式
- 星型模式由以下两类表构成
- 事实表
- 维度表
- 一个星型模式中可以有一个或多个事实表,每个事实表应用任意数量的维度表
- 中心是一个事实表,围绕表周围的维度表示星星的放射状分支
- 星型模式将业务流程分为事实和维度
- 事实包含业务的度量,如:销售价格、销售数量、距离、速度、重量都是事实
- 维度是对事实数据属性的描述,如:
- 日期
- 产品
- 客户
- 地址位置等是维度
- 事实表
- 记录特定事件的数字化的考量
- 由数字值和指向维度的外键组成
- 通常会把事实表的粒度级别设计得比较低,使得事实表可以记录很原始的操作型事件
- 三种类型
- 事务事实表。记录特定事件的事实,如:销售
- 快照事实表。记录给定时间点的事实,如:月底账户余额
- 累积事实表。记录给定时间点的聚合事实,如:当月总的销售金额
- 维度表
- 维度表的记录数通常比事实表少,但每条记录包含有大量用于描述事实数据的属性字段
- 维度表可以定义各种各样的特性,以下是几种最常用的维度表
- 时间维度表
- 地理维度表
- 产品维度表
- 人员维度表
- 范围维度表
- 优点
- 星型模式是非规范化的。在星型模式的设计开发过程中,不受应用于事务性关系数据库的范式规则的约束
- 简化查询。查询数据时,星型模型的连接逻辑比较简单,而从高度规范化的事务模型查询数据时,往往需要更多的表连接
- 简化业务报表逻辑
- 获得查询性能力
- 快速聚合
- 便于向立方体提供数据
- 星型模式被广泛用于高效建立OLAP立方体
- 缺点
- 不能保证数据的完整性。一次性地插入或更新操作可能造成数据异常。所以数据装载一般都是以高度受控方式,用批处理或准实时过程执行
- 对于分析需求不够灵活,跟偏重于特定目的建造数据视图,很难进行全面的数据分析
1.4.7 雪花模型
- 一种多维模型中表的逻辑布局
- 也由事实表和维度表组成
- 所谓雪花,就是将星型模式中的维度表进行规范化处理
- 但所有的维度表完成规范化后,就形成了以事实表为中心的雪花型结构,即雪花模式
- 将维度表进行规范化的具体做法是:
- 把低基数的属性从维度表走过来移除形成单独的表
- 基数指的是一个字段中不同值的个数(例如:性别的基数就很低)
- 雪花模式中,一个维度被规范化成多个关联的表,而在星型模式中,每个维度由一个单一的维度表所表示
- 一个规范化的维度对应一组具有层次关系的维度表,而事实表作为雪花模式的子表,存在具有层次关系的多个附表
- 一般维度表设计成一个低于3NF的级别
- 优点
- 一些OLAP多维数据库建模工具专为雪花模型进行了优化
- 规范化的维度属性节省了存储空间
- 缺点
- 增加了查询的连接操作和复杂度
- 雪花模式的表中装载数据,一定要有严格的控制和管理,避免数据的异常插入或更细腻
2、etl
2.1、etl介绍
ETL,是英文 Extract-Transform-Load 的缩写,用来描述将数据从来源端经过抽取(extract)、转换(transform)、加载(load)至目的端的过程。(数据仓库结构)通俗的说法就是从数据源抽取数据出来,进行清洗加工转换,然后加载到定义好的数据仓库模型中去。目的是将企业中的分散、零乱、标准不统一的数据整合到一起,为企业的决策提供分析依据。ETL是BI项目重要的一个环节,其设计的好坏影响生成数据的质量,直接关系到BI项目的成败。
2.2、etl的工作内容
- 参与数据仓库ETL流程及解决ETL相关技术问题。
- 海量数据的ETL开发,抽取成各种数据需求。
- 会数据etl开发工具
- 熟悉主流数据库技术,熟练使用sql语言
3、etl工具
-
Congos
- ibm一站式(etl,数据分析,数据可视化,仪表盘)的bi平台
-
使用SQL语句实现etl。
- hivesql
- sparksql
-
用代码实现etl。
- mapreduce
- spark
4、hive
4.1、hive的介绍
Hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供类SQL查询功能。
本质是将SQL转换为MapReduce程序。
主要用途:用来做离线数据分析,比直接用MapReduce开发效率更高。
4.2、hive的架构
用户接口:包括 CLI、JDBC/ODBC、WebGUI。其中,CLI(command line interface)为shell命令行;JDBC/ODBC是Hive的JAVA实现,与传统数据库JDBC类似;WebGUI是通过浏览器访问Hive。
元数据存储:通常是存储在关系数据库如 mysql/derby中。Hive 将元数据存储在数据库中。Hive 中的元数据包括表的名字,表的列和分区及其属性,表的属性(是否为外部表等),表的数据所在目录等。
解释器、编译器、优化器、执行器:完成 HQL 查询语句从词法分析、语法分析、编译、优化以及查询计划的生成。生成的查询计划存储在 HDFS 中,并在随后有 MapReduce 调用执行。
4.3、Hive 数据模型
Hive中所有的数据都存储在HDFS中,没有专门的数据存储格式
在创建表时指定数据中的分隔符,Hive 就可以映射成功,解析数据。
Hive中包含以下数据模型:
**db:**在hdfs中表现为hive.metastore.warehouse.dir目录下一个文件夹
**table:**在hdfs中表现所属db目录下一个文件夹
**external table:**数据存放位置可以在HDFS任意指定路径
**partition:**在hdfs中表现为table目录下的子目录
**bucket:**在hdfs中表现为同一个表目录下根据hash散列之后的多个文件
4.4、常用操作
4.4.1、数据库相关
Hive配置单元包含一个名为 default 默认的数据库.
- —创建数据库
create database [if not exists] ;
- –显示所有数据库
show databases;
- –删除数据库
drop database if exists [restrict|cascade];
默认情况下,hive不允许删除含有表的数据库,要先将数据库中的表清空才能drop,否则会报错
–加入cascade关键字,可以强制删除一个数据库
hive> drop database if exists users cascade;
- –切换数据库
use ;
4.4.2、内部表外部表
建内部表
create table
student(Sno int,Sname string,Sex string,Sage int,Sdept string)
row format delimited fields terminated by ',';
建外部表
create external table
student_ext(Sno int,Sname string,Sex string,Sage int,Sdept string)
row format delimited fields terminated by ',' location '/stu';
内、外部表加载数据:
load data local inpath '/root/hivedata/students.txt' overwrite into table student;
load data inpath '/stu' into table student_ext;
4.4.3、创建分区表
- 分区建表分为2种,一种是单分区,也就是说在表文件夹目录下只有一级文件夹目录。另外一种是多分区,表文件夹下出现多文件夹嵌套模式。
- 单分区建表语句
create table day_table (id int, content string) partitioned by (dt string);
单分区表,按天分区,在表结构中存在id,content,dt三列。
- 双分区建表语句
create table day_hour_table (id int, content string) partitioned by (dt string, hour string);
双分区表,按天和小时分区,在表结构中新增加了dt和hour两列。
导入数据
load data local inpath '/root/hivedata/dat_table.txt' into table day_table partition(dt='2017-07-07');
load data local inpath '/root/hivedata/dat_table.txt' into table day_hour_table partition(dt='2017-07-07', hour='08');
基于分区的查询:
SELECT day_table.* FROM day_table WHERE day_table.dt = '2017-07-07';
查看分区
show partitions day_hour_table;
总的说来partition就是辅助查询,缩小查询范围,加快数据的检索速度和对数据按照一定的规格和条件进行管理。
-
指定分隔符
—指定分隔符创建分区表
create table day_table (id int, content string) partitioned by (dt string) row format delimited fields terminated by ',';
—复杂类型的数据表指定分隔符
数据如下
zhangsan beijing,shanghai,tianjin,hangzhou wangwu shanghai,chengdu,wuhan,haerbin
建表语句
create table complex_array(name string,work_locations array<string>) row format delimited fields terminated by '\t' collection items terminated by ',';
4.4.4、增删分区
- 增加分区
alter table t_partition add partition (dt='2008-08-08') location 'hdfs://node-21:9000/t_parti/';
执行添加分区 /t_parti文件夹下的数据不会被移动。并且没有分区目录dt=2008-08-08
- 删除分区
alter table t_partition drop partition (dt='2008-08-08');
执行删除分区时/t_parti下的数据会被删除并且连同/t_parti文件夹也会被删除
注意区别于load data时候添加分区:会移动数据 会创建分区目录
4.4.5、hive中的join
准备数据
1,a
2,b
3,c
4,d
7,y
8,u
2,bb
3,cc
7,yy
9,pp
建表:
create table a(id int,name string)
row format delimited fields terminated by ',';
create table b(id int,name string)
row format delimited fields terminated by ',';
导入数据:
load data local inpath '/root/hivedata/a.txt' into table a;
load data local inpath '/root/hivedata/b.txt' into table b;
实验:
** inner join
select * from a inner join b on a.id=b.id;
+-------+---------+-------+---------+--+
| a.id | a.name | b.id | b.name |
+-------+---------+-------+---------+--+
| 2 | b | 2 | bb |
| 3 | c | 3 | cc |
| 7 | y | 7 | yy |
+-------+---------+-------+---------+--+
**left join
select * from a left join b on a.id=b.id;
+-------+---------+-------+---------+--+
| a.id | a.name | b.id | b.name |
+-------+---------+-------+---------+--+
| 1 | a | NULL | NULL |
| 2 | b | 2 | bb |
| 3 | c | 3 | cc |
| 4 | d | NULL | NULL |
| 7 | y | 7 | yy |
| 8 | u | NULL | NULL |
+-------+---------+-------+---------+--+
**right join
select * from a right join b on a.id=b.id;
select * from b right join a on b.id=a.id;
+-------+---------+-------+---------+--+
| a.id | a.name | b.id | b.name |
+-------+---------+-------+---------+--+
| 2 | b | 2 | bb |
| 3 | c | 3 | cc |
| 7 | y | 7 | yy |
| NULL | NULL | 9 | pp |
+-------+---------+-------+---------+--+
**
select * from a full outer join b on a.id=b.id;
+-------+---------+-------+---------+--+
| a.id | a.name | b.id | b.name |
+-------+---------+-------+---------+--+
| 1 | a | NULL | NULL |
| 2 | b | 2 | bb |
| 3 | c | 3 | cc |
| 4 | d | NULL | NULL |
| 7 | y | 7 | yy |
| 8 | u | NULL | NULL |
| NULL | NULL | 9 | pp |
+-------+---------+-------+---------+--+
**hive中的特别join
select * from a left semi join b on a.id = b.id;
select a.* from a inner join b on a.id=b.id;
+-------+---------
| a.id | a.name
+-------+---------
| 2 | b
| 3 | c
| 7 | y
+-------+---------
相当于
select a.id,a.name from a where a.id in (select b.id from b); 在hive中效率极低
select a.id,a.name from a join b on (a.id = b.id);
select * from a inner join b on a.id=b.id;
cross join(##慎用)
返回两个表的笛卡尔积结果,不需要指定关联键。
select a.*,b.* from a cross join b;
4.4.6、json解析
1、先加载rating.json文件到hive的一个原始表 rat_json
样例:{"movie":"1193","rate":"5","timeStamp":"978300760","uid":"1"}
create table rat_json(line string) row format delimited;
load data local inpath '/root/hivedata/rating.json' into table rat_json;
2、需要解析json数据成四个字段,插入一张新的表 t_rating
drop table if exists t_rating;
create table t_rating(movieid string,rate int,timestring string,uid string)
row format delimited fields terminated by '\t';
3、json表数据解析到rating表中
insert overwrite table t_rating
select
get_json_object(line,'$.movie') as moive,
get_json_object(line,'$.rate') as rate,
get_json_object(line,'$.timeStamp') as timestring, get_json_object(line,'$.uid') as uid
from rat_json limit 10;
4.5、常用函数
4.5.1、数值函数
-
指定精度取整函数 : round
语法: round(double a, int d)
返回值: DOUBLE
说明: 返回指定精度d的double类型
举例:
hive> select round(3.1415926,4); 3.1416
-
向下取整函数 : floor
语法: floor(double a)
返回值: BIGINT
说明: 返回等于或者小于该double变量的最大的整数
举例:
hive> select floor(3.1415926); 3 hive> select floor(25) from dual; 25
-
向上取整函数 : ceil
语法: ceil(double a)
返回值: BIGINT
说明: 返回等于或者大于该double变量的最小的整数
举例:
hive> select ceil(3.1415926); 4 hive> select ceil(46); 46
-
取随机数函数 : rand
语法: rand(),rand(int seed)
返回值: double
说明: 返回一个0到1范围内的随机数。如果指定种子seed,则会等到一个稳定的随机数序列
举例:
hive> select rand(); 0.5577432776034763
-
绝对值函数 : abs
语法: abs(double a) abs(int a)
返回值: double int
说明: 返回数值a的绝对值
举例:
hive> select abs(-3.9) from dual; 3.9 hive> select abs(10.9) from dual; 10.9
4.5.2、日期函数
- to_date(string timestamp):返回时间字符串中的日期部分,
- 如to_date(‘1970-01-01 00:00:00’)=‘1970-01-01’
- current_date:返回当前日期
- year(date):返回日期date的年,类型为int
- 如year(‘2019-01-01’)=2019
- month(date):返回日期date的月,类型为int,
- 如month(‘2019-01-01’)=1
- day(date): 返回日期date的天,类型为int,
- 如day(‘2019-01-01’)=1
- weekofyear(date1):返回日期date1位于该年第几周。
- 如weekofyear(‘2019-03-06’)=10
- datediff(date1,date2):返回日期date1与date2相差的天数
- 如datediff(‘2019-03-06’,‘2019-03-05’)=1
- date_add(date1,int1):返回日期date1加上int1的日期
- 如date_add(‘2019-03-06’,1)=‘2019-03-07’
- date_sub(date1,int1):返回日期date1减去int1的日期
- 如date_sub(‘2019-03-06’,1)=‘2019-03-05’
- months_between(date1,date2):返回date1与date2相差月份
- 如months_between(‘2019-03-06’,‘2019-01-01’)=2
- add_months(date1,int1):返回date1加上int1个月的日期,int1可为负数
- 如add_months(‘2019-02-11’,-1)=‘2019-01-11’
- last_day(date1):返回date1所在月份最后一天
- 如last_day(‘2019-02-01’)=‘2019-02-28’
- next_day(date1,day1):返回日期date1的下个星期day1的日期。day1为星期X的英文前两字母
- 如next_day(‘2019-03-06’,‘MO’) 返回’2019-03-11’
- **trunc(date1,string1)😗*返回日期最开始年份或月份。string1可为年(YYYY/YY/YEAR)或月(MONTH/MON/MM)。
- 如trunc(‘2019-03-06’,‘MM’)=‘2019-03-01’,trunc(‘2019-03-06’,‘YYYY’)=‘2019-01-01’
- unix_timestamp():返回当前时间的unix时间戳,可指定日期格式。
- 如unix_timestamp(‘2019-03-06’,‘yyyy-mm-dd’)=1546704180
- from_unixtime():返回unix时间戳的日期,可指定格式。
- 如select from_unixtime(unix_timestamp(‘2019-03-06’,‘yyyy-MM-dd’),‘yyyyMMdd’)=‘20190306’
- 如select from_unixtime(unix_timestamp(‘2019-03-06’,‘yyyy-MM-dd’),‘yyyyMMdd’)=‘20190306’
4.5.3、条件函数
- if(boolean,t1,t2):若布尔值成立,则返回t1,反正返回t2。
- 如if(1>2,100,200)返回200
- case when boolean then t1 else t2 end:若布尔值成立,则t1,否则t2,可加多重判断
- coalesce(v0,v1,v2):返回参数中的第一个非空值,若所有值均为null,则返回null。
- 如coalesce(null,1,2)返回1
- isnull(a):若a为null则返回true,否则返回false
4.5.4、字符串函数
- length(string1):返回字符串长度
- concat(string1,string2):返回拼接string1及string2后的字符串
- concat_ws(sep,string1,string2):返回按指定分隔符拼接的字符串
- lower(string1):返回小写字符串,同lcase(string1)。upper()/ucase():返回大写字符串
- trim(string1):去字符串左右空格,ltrim(string1):去字符串左空格。rtrim(string1):去字符串右空
- repeat(string1,int1):返回重复string1字符串int1次后的字符串
- reverse(string1):返回string1反转后的字符串。
- 如reverse(‘abc’)返回’cba’
- rpad(string1,len1,pad1):以pad1字符右填充string1字符串,至len1长度。
- 如rpad(‘abc’,5,‘1’)返回’abc11’。lpad():左填充
- split(string1,pat1):以pat1正则分隔字符串string1,返回数组。
- 如split(‘a,b,c’,‘,’)返回[“a”,“b”,“c”]
- substr(string1,index1,int1):以index位置起截取int1个字符。
- 如substr(‘abcde’,1,2)返回’ab’
- 如substr(‘abcde’,1,2)返回’ab’
4.5.5、类型转换
Hive的原子数据类型是可以进行隐式转换的,类似于Java的类型转换,例如某表达式使用INT类型,TINYINT会自动转换为INT类型,但是Hive不会进行反向转化,例如,某表达式使用TINYINT类型,INT不会自动转换为TINYINT类型,它会返回错误,除非使用CAST操作。
- cast(value AS TYPE)
- select cast(‘1’ as DOUBLE); 返回1.0
4.5.6、爆炸函数
EXPLODE(col):将hive一列中复杂的array或者map结构拆分成多行。
LATERAL VIEW
用法:LATERAL VIEW udtf(expression) tableAlias AS columnAlias
解释:用于和split, explode等UDTF一起使用,它能够将一列数据拆成多行
-
原数据
《疑犯追踪》 悬疑,动作,科幻,剧情 《Lie to me》 悬疑,警匪,动作,心理,剧情 《战狼2》 战争,动作,灾难
-
需求:将电影分类中的数组数据展开。结果如下:
《疑犯追踪》 悬疑 《疑犯追踪》 动作 《疑犯追踪》 科幻 《疑犯追踪》 剧情 《Lie to me》 悬疑 《Lie to me》 警匪 《Lie to me》 动作 《Lie to me》 心理 《Lie to me》 剧情 《战狼2》 战争 《战狼2》 动作 《战狼2》 灾难
-
创建hive表并加载数据
create table movie_info( movie string, category array<string>) row format delimited fields terminated by "\t" collection items terminated by ","; load data local inpath "/export/servers/hivedatas/movie.txt" into table movie_info;
-
实现sql
select movie, category_name from movie_info lateral view explode(category) table_tmp as category_name;
4.5.7、udf / udtf / udaf
-
UDF:用户定义(普通)函数,只对单行数值产生作用;
UDF只能实现一进一出的操作。
定义udf 计算两个数最小值 public class Min extends UDF { public Double evaluate(Double a, Double b) { if (a == null) a = 0.0; if (b == null) b = 0.0; if (a >= b) { return b; } else { return a; } } } a)把程序打成jar包 b)添加jar包:add jar /run/jar/udf_test.jar; c)创建临时函数:hive>CREATE TEMPORARY FUNCTION add_example AS 'hive.udf.Add'; d)销毁临时函数:hive> DROP TEMPORARY FUNCTION add_example;
-
UDTF:User-Defined Table-Generating Functions,用户定义表生成函数
用来解决输入一行输出多行
继承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF, 实现initialize, process, close三个方法。 UDTF首先会调用initialize方法,此方法返回UDTF的返回行的信息(返回个数,类型)。 初始化完成后,会调用process方法,真正的处理过程在process函数中,在process中,每一次forward()调用产生一行;如果产生多列可以将多个列的值放在一个数组中,然后将该数组传入到forward()函数。 最后close()方法调用,对需要清理的方法进行清理
例子:切分”key:value;key:value”字符串,返回结果为key, value两个字段。 import java.util.ArrayList; import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF; import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; public class ExplodeMap extends GenericUDTF{ @Override public void close() throws HiveException { // TODO Auto-generated method stub } @Override public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException { if (args.length != 1) { throw new UDFArgumentLengthException("ExplodeMap takes only one argument"); } if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE) { throw new UDFArgumentException("ExplodeMap takes string as a parameter"); } ArrayList<String> fieldNames = new ArrayList<String>(); ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(); fieldNames.add("col1"); fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); fieldNames.add("col2"); fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,fieldOIs); } @Override public void process(Object[] args) throws HiveException { String input = args[0].toString(); String[] test = input.split(";"); for(int i=0; i<test.length; i++) { try { String[] result = test[i].split(":"); forward(result); } catch (Exception e) { continue; } } } }
a)把程序打成jar包 b)添加jar包:add jar /run/jar/udf_test.jar; c)创建临时函数:CREATE TEMPORARY FUNCTION explode_map AS 'cn.itcast.hive.udtf.ExplodeMap'; d)销毁临时函数:hive> DROP TEMPORARY FUNCTION add_example;
udtf的使用:
UDTF有两种使用方法,一种直接放到select后面,一种和lateral view一起使用。 create table src(properties String); vi src.txt key1:value1;key2:value2; load data local inpath '/root/hivedata/src.txt' into table src; 1:直接select中使用 select explode_map(properties) as (col1,col2) from src; 不可以添加其他字段使用 select a, explode_map(properties) as (col1,col2) from src; 不可以嵌套调用 select explode_map(explode_map(properties)) from src; 不可以和group by/cluster by/distribute by/sort by一起使用 select explode_map(properties) as (col1,col2) from src group by col1, col2;
2:和lateral view一起使用
select src.id, mytable.col1, mytable.col2 from src lateral view explode_map(properties) mytable as col1, col2;
-
UDAF:User- Defined Aggregation Funcation;用户定义聚合函数,可对多行数据产生作用;
等同与SQL中常用的SUM(),AVG(),也是聚合函数;
UDAF实现多进一出
UDAF实现有简单与通用两种方式:
import org.apache.hadoop.hive.ql.exec.UDAF; import org.apache.hadoop.hive.ql.exec.UDAFEvaluator; import org.apache.hadoop.io.IntWritable; //UDAF是输入多个数据行,产生一个数据行 //用户自定义的UDAF必须是继承了UDAF,且内部包含多个实现了exec的静态类 public class MaxiNumber extends UDAF { public static class MaxiNumberIntUDAFEvaluator implements UDAFEvaluator { // 最终结果 private IntWritable result; // 负责初始化计算函数并设置它的内部状态,result是存放最终结果的 @Override public void init() { result = null; } // 每次对一个新值进行聚集计算都会调用iterate方法 public boolean iterate(IntWritable value) { if (value == null) return false; if (result == null) result = new IntWritable(value.get()); else result.set(Math.max(result.get(), value.get())); return true; } // Hive需要部分聚集结果的时候会调用该方法 // 会返回一个封装了聚集计算当前状态的对象 public IntWritable terminatePartial() { return result; } // 合并两个部分聚集值会调用这个方法 public boolean merge(IntWritable other) { return iterate(other); } // Hive需要最终聚集结果时候会调用该方法 public IntWritable terminate() { return result; } } }
4.5.8、窗口函数
1、NTILE,ROW_NUMBER,RANK,DENSE_RANK
-
数据准备
cookie1,2018-04-10,1 cookie1,2018-04-11,5 cookie1,2018-04-12,7 cookie1,2018-04-13,3 cookie1,2018-04-14,2 cookie1,2018-04-15,4 cookie1,2018-04-16,4 cookie2,2018-04-10,2 cookie2,2018-04-11,3 cookie2,2018-04-12,5 cookie2,2018-04-13,6 cookie2,2018-04-14,3 cookie2,2018-04-15,9 cookie2,2018-04-16,7 CREATE TABLE itcast_t2 ( cookieid string, createtime string, --day pv INT ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' stored as textfile; 加载数据: load data local inpath '/root/hivedata/itcast_t2.dat' into table itcast_t2;
-
NTILE
背景:
有时会有这样的需求:如果数据排序后分为三部分,业务人员只关心其中的一部分,如何将这中间的三分之一数据拿出来呢?NTILE函数即可以满足。
ntile可以看成是:把有序的数据集合平均分配到指定的数量(num)个桶中, 将桶号分配给每一行。 如果不能平均分配,则优先分配较小编号的桶,并且各个桶中能放的行数最多相差1。 语法是:ntile (num) over ([partition_clause] order_by_clause) as xxx 然后可以根据桶号,选取前或后 n分之几的数据。 数据会完整展示出来,只是给相应的数据打标签;具体要取几分之几的数据,需要再嵌套一层根据标签取出。 NTILE不支持ROWS BETWEEN,比如 NTILE(2) OVER(PARTITION BY cookieid ORDER BY createtime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW)
设置本地模式 set hive.exec.mode.local.auto=true;
SELECT
cookieid,
createtime,
pv,
NTILE(2) OVER(PARTITION BY cookieid ORDER BY createtime) AS rn1,
NTILE(3) OVER(PARTITION BY cookieid ORDER BY createtime) AS rn2,
NTILE(4) OVER(ORDER BY createtime) AS rn3
FROM itcast_t2
ORDER BY cookieid,createtime;比如,统计一个cookie,pv数最多的前1/3的天 ```sql SELECT cookieid, createtime, pv, NTILE(3) OVER(PARTITION BY cookieid ORDER BY pv DESC) AS rn FROM itcast_t2; 其中rn = 1 的记录,就是我们想要的结果
-
ROW_NUMBER
ROW_NUMBER() 从1开始,按照顺序,生成分组内记录的序列
SELECT cookieid, createtime, pv, ROW_NUMBER() OVER(PARTITION BY cookieid ORDER BY pv desc) AS rn FROM itcast_t2;
-
RANK 和 DENSE_RANK
RANK() 生成数据项在分组中的排名,排名相等会在名次中留下空位
DENSE_RANK() 生成数据项在分组中的排名,排名相等会在名次中不会留下空位SELECT cookieid, createtime, pv, RANK() OVER(PARTITION BY cookieid ORDER BY pv desc) AS rn1, DENSE_RANK() OVER(PARTITION BY cookieid ORDER BY pv desc) AS rn2, ROW_NUMBER() OVER(PARTITION BY cookieid ORDER BY pv DESC) AS rn3 FROM itcast_t2 WHERE cookieid = 'cookie1';
2、sum_over,AVG_over,MIN_over,MAX_over
-
数据准备
建表语句: create table itcast_t1( cookieid string, createtime string, --day pv int ) row format delimited fields terminated by ','; 加载数据: load data local inpath '/root/hivedata/itcast_t1.dat' into table itcast_t1; cookie1,2018-04-10,1 cookie1,2018-04-11,5 cookie1,2018-04-12,7 cookie1,2018-04-13,3 cookie1,2018-04-14,2 cookie1,2018-04-15,4 cookie1,2018-04-16,4 开启智能本地模式 SET hive.exec.mode.local.auto=true;
-
SUM(结果和ORDER BY相关,默认为升序)
设置本地模式 set hive.exec.mode.local.auto=true; select cookieid,createtime,pv, sum(pv) over(partition by cookieid order by createtime) as pv1 from itcast_t1; select cookieid,createtime,pv, sum(pv) over(partition by cookieid order by createtime rows between unbounded preceding and current row) as pv2 from itcast_t1; select cookieid,createtime,pv, sum(pv) over(partition by cookieid) as pv3 from itcast_t1; --如果每天order by排序语句 默认把分组内的所有数据进行sum操作 select cookieid,createtime,pv, sum(pv) over(partition by cookieid order by createtime rows between 3 preceding and current row) as pv4 from itcast_t1; select cookieid,createtime,pv, sum(pv) over(partition by cookieid order by createtime rows between 3 preceding and 1 following) as pv5 from itcast_t1; select cookieid,createtime,pv, sum(pv) over(partition by cookieid order by createtime rows between current row and unbounded following) as pv6 from itcast_t1;
-
解释
pv1: 分组内从起点到当前行的pv累积,如,11号的pv1=10号的pv+11号的pv, 12号=10号+11号+12号 pv2: 同pv1 pv3: 分组内(cookie1)所有的pv累加 pv4: 分组内当前行+往前3行,如,11号=10号+11号, 12号=10号+11号+12号, 13号=10号+11号+12号+13号, 14号=11号+12号+13号+14号 pv5: 分组内当前行+往前3行+往后1行,如,14号=11号+12号+13号+14号+15号=5+7+3+2+4=21 pv6: 分组内当前行+往后所有行,如,13号=13号+14号+15号+16号=3+2+4+4=13, 14号=14号+15号+16号=2+4+4=10
-
如果不指定rows between,默认为从起点到当前行;
-
如果不指定order by,则将分组内所有值累加;
-
关键是理解rows between含义,也叫做window子句:
- preceding:往前
- following:往后
- current row:当前行
- unbounded:起点
- unbounded preceding 表示从前面的起点
- unbounded following:表示到后面的终点
- AVG,MIN,MAX,和SUM用法一样
select cookieid,createtime,pv,
avg(pv) over(partition by cookieid order by createtime rows between unbounded preceding and current row) as pv2
from itcast_t1;
select cookieid,createtime,pv,
max(pv) over(partition by cookieid order by createtime rows between unbounded preceding and current row) as pv2
from itcast_t1;
select cookieid,createtime,pv,
min(pv) over(partition by cookieid order by createtime rows between unbounded preceding and current row) as pv2
from itcast_t1;
select cookieid,createtime,pv,
count(pv) over(partition by cookieid order by createtime rows between unbounded preceding and current row) as pv2
from itcast_t1;
3、LAG,LEAD,FIRST_VALUE,LAST_VALUE
-
cookie1,2018-04-10 10:00:02,url2 cookie1,2018-04-10 10:00:00,url1 cookie1,2018-04-10 10:03:04,1url3 cookie1,2018-04-10 10:50:05,url6 cookie1,2018-04-10 11:00:00,url7 cookie1,2018-04-10 10:10:00,url4 cookie1,2018-04-10 10:50:01,url5 cookie2,2018-04-10 10:00:02,url22 cookie2,2018-04-10 10:00:00,url11 cookie2,2018-04-10 10:03:04,1url33 cookie2,2018-04-10 10:50:05,url66 cookie2,2018-04-10 11:00:00,url77 cookie2,2018-04-10 10:10:00,url44 cookie2,2018-04-10 10:50:01,url55 CREATE TABLE itcast_t4 ( cookieid string, createtime string, --页面访问时间 url STRING --被访问页面 ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' stored as textfile; 加载数据: load data local inpath '/root/hivedata/itcast_t4.dat' into table itcast_t4;
-
LAG
LAG(col,n,DEFAULT) 用于统计窗口内往上第n行值
第一个参数为列名,第二个参数为往上第n行(可选,默认为1),第三个参数为默认值(当往上第n行为NULL时候,取默认值,如不指定,则为NULL)SELECT cookieid, createtime, url, ROW_NUMBER() OVER(PARTITION BY cookieid ORDER BY createtime) AS rn, LAG(createtime,1,'1970-01-01 00:00:00') OVER(PARTITION BY cookieid ORDER BY createtime) AS last_1_time, LAG(createtime,2) OVER(PARTITION BY cookieid ORDER BY createtime) AS last_2_time FROM itcast_t4;
last_1_time: 指定了往上第1行的值,default为’1970-01-01 00:00:00’
cookie1第一行,往上1行为NULL,因此取默认值 1970-01-01 00:00:00
cookie1第三行,往上1行值为第二行值,2015-04-10 10:00:02
cookie1第六行,往上1行值为第五行值,2015-04-10 10:50:01
last_2_time: 指定了往上第2行的值,为指定默认值
cookie1第一行,往上2行为NULL
cookie1第二行,往上2行为NULL
cookie1第四行,往上2行为第二行值,2015-04-10 10:00:02
cookie1第七行,往上2行为第五行值,2015-04-10 10:50:01 -
LEAD
与LAG相反
LEAD(col,n,DEFAULT) 用于统计窗口内往下第n行值
第一个参数为列名,第二个参数为往下第n行(可选,默认为1),第三个参数为默认值(当往下第n行为NULL时候,取默认值,如不指定,则为NULL)SELECT cookieid, createtime, url, ROW_NUMBER() OVER(PARTITION BY cookieid ORDER BY createtime) AS rn, LEAD(createtime,1,'1970-01-01 00:00:00') OVER(PARTITION BY cookieid ORDER BY createtime) AS next_1_time, LEAD(createtime,2) OVER(PARTITION BY cookieid ORDER BY createtime) AS next_2_time FROM itcast_t4;
-
FIRST_VALUE
取分组内排序后,截止到当前行,第一个值
SELECT cookieid, createtime, url, ROW_NUMBER() OVER(PARTITION BY cookieid ORDER BY createtime) AS rn, FIRST_VALUE(url) OVER(PARTITION BY cookieid ORDER BY createtime) AS first1 FROM itcast_t4;
-
LAST_VALUE
取分组内排序后,截止到当前行,最后一个值
SELECT cookieid, createtime, url, ROW_NUMBER() OVER(PARTITION BY cookieid ORDER BY createtime) AS rn, LAST_VALUE(url) OVER(PARTITION BY cookieid ORDER BY createtime) AS last1 FROM itcast_t4;
如果想要取分组内排序后最后一个值,则需要变通一下:
SELECT cookieid, createtime, url, ROW_NUMBER() OVER(PARTITION BY cookieid ORDER BY createtime) AS rn, LAST_VALUE(url) OVER(PARTITION BY cookieid ORDER BY createtime) AS last1, FIRST_VALUE(url) OVER(PARTITION BY cookieid ORDER BY createtime DESC) AS last2 FROM itcast_t4 ORDER BY cookieid,createtime;
特别注意order by
如果不指定ORDER BY,则进行排序混乱,会出现错误的结果
SELECT cookieid,
createtime,
url,
FIRST_VALUE(url) OVER(PARTITION BY cookieid) AS first2
FROM itcast_t4;
4、GROUPING SETS,GROUPING__ID,CUBE,ROLLUP
这几个分析函数通常用于OLAP中。
-
数据准备
2018-03,2018-03-10,cookie1 2018-03,2018-03-10,cookie5 2018-03,2018-03-12,cookie7 2018-04,2018-04-12,cookie3 2018-04,2018-04-13,cookie2 2018-04,2018-04-13,cookie4 2018-04,2018-04-16,cookie4 2018-03,2018-03-10,cookie2 2018-03,2018-03-10,cookie3 2018-04,2018-04-12,cookie5 2018-04,2018-04-13,cookie6 2018-04,2018-04-15,cookie3 2018-04,2018-04-15,cookie2 2018-04,2018-04-16,cookie1 CREATE TABLE itcast_t5 ( month STRING, day STRING, cookieid STRING ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' stored as textfile; 加载数据: load data local inpath '/root/hivedata/itcast_t5.dat' into table itcast_t5;
-
GROUPING SETS
grouping sets是一种将多个group by 逻辑写在一个sql语句中的便利写法。
等价于将不同维度的GROUP BY结果集进行UNION ALL。
GROUPING__ID,表示结果属于哪一个分组集合。
SELECT month, day, COUNT(DISTINCT cookieid) AS uv, GROUPING__ID FROM itcast_t5 GROUP BY month,day GROUPING SETS (month,day) ORDER BY GROUPING__ID; grouping_id表示这一组结果属于哪个分组集合, 根据grouping sets中的分组条件month,day,1是代表month,2是代表day 等价于 SELECT month,NULL,COUNT(DISTINCT cookieid) AS uv,1 AS GROUPING__ID FROM itcast_t5 GROUP BY month UNION ALL SELECT NULL as month,day,COUNT(DISTINCT cookieid) AS uv,2 AS GROUPING__ID FROM itcast_t5 GROUP BY day;
再如:
SELECT month, day, COUNT(DISTINCT cookieid) AS uv, GROUPING__ID FROM itcast_t5 GROUP BY month,day GROUPING SETS (month,day,(month,day)) ORDER BY GROUPING__ID; 等价于 SELECT month,NULL,COUNT(DISTINCT cookieid) AS uv,1 AS GROUPING__ID FROM itcast_t5 GROUP BY month UNION ALL SELECT NULL,day,COUNT(DISTINCT cookieid) AS uv,2 AS GROUPING__ID FROM itcast_t5 GROUP BY day UNION ALL SELECT month,day,COUNT(DISTINCT cookieid) AS uv,3 AS GROUPING__ID FROM itcast_t5 GROUP BY month,day;
-
CUBE(立方体 数据立方体 多维数据分析)
举个栗子:某个事情有A、B、C三个维度,根据这三个维度进行组合分析,共有多少种情况?
这些情况加起来就是所谓多维分析中数据立方体。
没有维度:[] 一个维度:[A] [B] [C] 两个维度:[AB] [AC] [BC] 三个维度:[ABC] 共有8个结果。 规律:假如有n个维度 所有的维度组合情况是2的n次方
根据GROUP BY的维度的所有组合进行聚合。
SELECT month, day, COUNT(DISTINCT cookieid) AS uv, GROUPING__ID FROM itcast_t5 GROUP BY month,day WITH CUBE ORDER BY GROUPING__ID; 等价于 SELECT NULL,NULL,COUNT(DISTINCT cookieid) AS uv,0 AS GROUPING__ID FROM itcast_t5 UNION ALL SELECT month,NULL,COUNT(DISTINCT cookieid) AS uv,1 AS GROUPING__ID FROM itcast_t5 GROUP BY month UNION ALL SELECT NULL,day,COUNT(DISTINCT cookieid) AS uv,2 AS GROUPING__ID FROM itcast_t5 GROUP BY day UNION ALL SELECT month,day,COUNT(DISTINCT cookieid) AS uv,3 AS GROUPING__ID FROM itcast_t5 GROUP BY month,day;
-
ROLLUP
是CUBE的子集,以最左侧的维度为主,从该维度进行层级聚合。
比如,以month维度进行层级聚合: SELECT month, day, COUNT(DISTINCT cookieid) AS uv, GROUPING__ID FROM itcast_t5 GROUP BY month,day WITH ROLLUP ORDER BY GROUPING__ID; --把month和day调换顺序,则以day维度进行层级聚合: SELECT day, month, COUNT(DISTINCT cookieid) AS uv, GROUPING__ID FROM itcast_t5 GROUP BY day,month WITH ROLLUP ORDER BY GROUPING__ID; (这里,根据天和月进行聚合,和根据天聚合结果一样,因为有父子关系,如果是其他维度组合的话,就会不一样)
4.6、hive常用的优化
4.6.1、 Fetch抓取(Hive可以避免进行MapReduce)
Hive中对某些情况的查询可以不必使用MapReduce计算。例如:SELECT * FROM employees;在这种情况下,Hive可以简单地读取employee对应的存储目录下的文件,然后输出查询结果到控制台。
在hive-default.xml.template文件中hive.fetch.task.conversion默认是more,老版本hive默认是minimal,该属性修改为more以后,在全局查找、字段查找、limit查找等都不走mapreduce。
案例实操:
1)把hive.fetch.task.conversion设置成none,然后执行查询语句,都会执行mapreduce程序。
hive (default)> set hive.fetch.task.conversion=none;
hive (default)> select * from score;
hive (default)> select s_score from score;
hive (default)> select s_score from score limit 3;
2)把hive.fetch.task.conversion设置成more,然后执行查询语句,如下查询方式都不会执行mapreduce程序。
hive (default)> set hive.fetch.task.conversion=more;
hive (default)> select * from score;
hive (default)> select s_score from score;
hive (default)> select s_score from score limit 3;
4.6.2、本地模式
大多数的Hadoop Job是需要Hadoop提供的完整的可扩展性来处理大数据集的。不过,有时Hive的输入数据量是非常小的。在这种情况下,为查询触发执行任务时消耗可能会比实际job的执行时间要多的多。对于大多数这种情况,Hive可以通过本地模式在单台机器上处理所有的任务。对于小数据集,执行时间可以明显被缩短。
用户可以通过设置hive.exec.mode.local.auto的值为true,来让Hive在适当的时候自动启动这个优化。
set hive.exec.mode.local.auto=true; //开启本地mr
//设置local mr的最大输入数据量,当输入数据量小于这个值时采用local mr的方式,默认为134217728,即128M
set hive.exec.mode.local.auto.inputbytes.max=51234560;
//设置local mr的最大输入文件个数,当输入文件个数小于这个值时采用local mr的方式,默认为4
set hive.exec.mode.local.auto.input.files.max=10;
案例实操:
1)开启本地模式,并执行查询语句
hive (default)> set hive.exec.mode.local.auto=true;
hive (default)> select * from score cluster by s_id;
18 rows selected (1.568 seconds)
2)关闭本地模式,并执行查询语句
hive (default)> set hive.exec.mode.local.auto=false;
hive (default)> select * from score cluster by s_id;
18 rows selected (11.865 seconds)
4.6.3、分区表分桶表
- 分区表对sql过滤查询是一种优化
- 分桶表对join操作时提升性能很大,桶为表加上了额外的结构,Hive 在处理有些查询时能利用这个结构。具体而言,连接两个在(包含连接列的)相同列上划分了桶的表,可以使用 Map 端连接 (Map-side join)高效的实现。比如JOIN操作。对于JOIN操作两个表有一个相同的列,如果对这两个表都进行了桶操作。那么将保存相同列值的桶进行JOIN操作就可以,可以大大较少JOIN的数据量。
4.6.4、join优化
4.6.4.1、小表Join大表
- (新的版本当中已经没有区别了,旧的版本当中需要使用小表)
1)将key相对分散,并且数据量小的表放在join的左边,这样可以有效减少内存溢出错误发生的几率;再进一步,可以使用Group让小的维度表(1000条以下的记录条数)先进内存。在map端完成reduce。
2)多个表关联时,最好分拆成小段,避免大sql(无法控制中间Job)
3)大表Join大表
(1)空KEY过滤
有时join超时是因为某些key对应的数据太多,而相同key对应的数据都会发送到相同的reducer上,从而导致内存不够。此时我们应该仔细分析这些异常的key,很多情况下,这些key对应的数据是异常数据,我们需要在SQL语句中进行过滤。例如key对应的字段为空。
对比如下:
不过滤
INSERT OVERWRITE TABLE jointable
SELECT a.* FROM nullidtable a JOIN ori b ON a.id = b.id;
结果:
No rows affected (152.135 seconds)
过滤
INSERT OVERWRITE TABLE jointable
SELECT a.* FROM (SELECT * FROM nullidtable WHERE id IS NOT NULL ) a JOIN ori b ON a.id = b.id;
结果:
No rows affected (141.585 seconds)
4.6.4.2、mapjoin
如果不指定MapJoin或者不符合MapJoin的条件,那么Hive解析器会将Join操作转换成Common Join,即:在Reduce阶段完成join。容易发生数据倾斜。可以用MapJoin把小表全部加载到内存在map端进行join,避免reducer处理。
1)开启MapJoin参数设置:
(1)设置自动选择Mapjoin
set hive.auto.convert.join = true; 默认为true
(2)大表小表的阈值设置(默认25M以下认为是小表):
set hive.mapjoin.smalltable.filesize=25123456;
4.6.5、group by
默认情况下,Map阶段同一Key数据分发给一个reduce,当一个key数据过大时就倾斜了。
并不是所有的聚合操作都需要在Reduce端完成,很多聚合操作都可以先在Map端进行部分聚合,最后在Reduce端得出最终结果。
1)开启Map端聚合参数设置
(1)是否在Map端进行聚合,默认为True
set hive.map.aggr = true;
(2)在Map端进行聚合操作的条目数目
set hive.groupby.mapaggr.checkinterval = 100000;
(3)有数据倾斜的时候进行负载均衡(默认是false)
set hive.groupby.skewindata = true;
当选项设定为 true,生成的查询计划会有两个MR Job。第一个MR Job中,Map的输出结果会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的Group By Key有可能被分发到不同的Reduce中,从而达到负载均衡的目的;第二个MR Job再根据预处理的数据结果按照Group By Key分布到Reduce中(这个过程可以保证相同的Group By Key被分布到同一个Reduce中),最后完成最终的聚合操作。
4.6.6、Map数
-
通常情况下,作业会通过input的目录产生一个或者多个map任务。
主要的决定因素有:input的文件总个数,input的文件大小,集群设置的文件块大小(目前为128M,可在hive中通过set dfs.block.size;命令查看到,该参数不能自定义修改);
-
举例:
a) 假设input目录下有1个文件a,大小为780M,那么hadoop会将该文件a分隔成7个块(6个128m的块和1个12m的块),从而产生7个map数。
b) 假设input目录下有3个文件a,b,c大小分别为10m,20m,150m,那么hadoop会分隔成4个块(10m,20m,128m,22m),从而产生4个map数。即,如果文件大于块大小(128m),那么会拆分,如果小于块大小,则把该文件当成一个块。
-
是不是map数越多越好?
答案是否定的。如果一个任务有很多小文件(远远小于块大小128m),则每个小文件也会被当做一个块,用一个map任务来完成,而一个map任务启动和初始化的时间远远大于逻辑处理的时间,就会造成很大的资源浪费。而且,同时可执行的map数是受限的。
-
是不是保证每个map处理接近128m的文件块,就高枕无忧了?
答案也是不一定。比如有一个127m的文件,正常会用一个map去完成,但这个文件只有一个或者两个小字段,却有几千万的记录,如果map处理的逻辑比较复杂,用一个map任务去做,肯定也比较耗时。
针对上面的问题3和4,我们需要采取两种方式来解决:即减少map数和增加map数;
-
如何增加map数
如果表a只有一个文件,大小为120M,但包含几千万的记录,如果用1个map去完成这个任务,肯定是比较耗时的,这种情况下,我们要考虑将这一个文件合理的拆分成多个,这样就可以用多个map任务去完成。
set mapreduce.job.reduces =10; create table a_1 as select * from a distribute by rand(123);
这样会将a表的记录,随机的分散到包含10个文件的a_1表中,再用a_1代替上面sql中的a表,则会用10个map任务去完成。
4.6.7、reduce数
- 调整reduce个数方法一
(1)每个Reduce处理的数据量默认是256MB
hive.exec.reducers.bytes.per.reducer=256123456
(2)每个任务最大的reduce数,默认为1009
hive.exec.reducers.max=1009
-
调整reduce个数方法二
在hadoop的mapred-default.xml文件中修改
设置每个job的Reduce个数
set mapreduce.job.reduces = 15;
-
reduce个数并不是越多越好
1)过多的启动和初始化reduce也会消耗时间和资源;
2)另外,有多少个reduce,就会有多少个输出文件,如果生成了很多个小文件,那么如果这些小文件作为下一个任务的输入,则也会出现小文件过多的问题;
在设置reduce个数的时候也需要考虑这两个原则:处理大数据量利用合适的reduce数;使单个reduce任务处理数据量大小要合适;
4.6.8、jvm重用
JVM重用是Hadoop调优参数的内容,其对Hive的性能具有非常大的影响,特别是对于很难避免小文件的场景或task特别多的场景,这类场景大多数执行时间都很短。
JVM重用可以使得JVM实例在同一个job中重新使用N次。N的值可以在Hadoop的mapred-site.xml文件中进行配置。通常在10-20之间,具体多少需要根据具体业务场景测试得出。
<property>
<name>mapreduce.job.jvm.numtasks</name>
<value>10</value>
<description>How many tasks to run per jvm. If set to -1, there is
no limit.
</description>
</property>
我们也可以在hive当中通过
set mapred.job.reuse.jvm.num.tasks=10;
这个设置来设置我们的jvm重用
缺点:
开启JVM重用将一直占用使用到的task插槽,以便进行重用,直到任务完成后才能释放。如果某个“不平衡的”job中有某几个reduce task执行的时间要比其他Reduce task消耗的时间多的多的话,那么保留的插槽就会一直空闲着却无法被其他的job使用,直到所有的task都结束了才会释放。
4.6.9、数据压缩与存储格式
1.压缩方式
压缩可以节约磁盘的空间,基于文本的压缩率可达40%+; 压缩可以增加吞吐量和性能量(减小载入内存的数据量),但是在压缩和解压过程中会增加CPU的开销。所以针对IO密集型的jobs(非计算密集型)可以使用压缩的方式提高性能。 几种压缩算法:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FBlZDWq6-1650717486838)(…/大数据开发/assert/1582183253358.png)]
2.存储格式
-
TextFile
Hive数据表的默认格式,存储方式:行存储。 可以使用Gzip压缩算法,但压缩后的文件不支持split 在反序列化过程中,必须逐个字符判断是不是分隔符和行结束符,因此反序列化开销会比SequenceFile高几十倍。
-
Sequence Files
Hadoop中有些原生压缩文件的缺点之一就是不支持分割。支持分割的文件可以并行 的有多个mapper程序处理大数据文件,大多数文件不支持可分割是因为这些文件只能从头开始读。Sequence File是可分割的文件格式,支持Hadoop的block级压缩。 Hadoop API提供的一种二进制文件,以key-value的形式序列化到文件中。存储方式:行存储。 sequencefile支持三种压缩选择:NONE,RECORD,BLOCK。Record压缩率低,RECORD是默认选项,通常BLOCK会带来较RECORD更好的压缩性能。 优势是文件和hadoop api中的MapFile是相互兼容的
-
RCFile
存储方式:数据按行分块,每块按列存储。结合了行存储和列存储的优点: 首先,RCFile 保证同一行的数据位于同一节点,因此元组重构的开销很低 其次,像列存储一样,RCFile 能够利用列维度的数据压缩,并且能跳过不必要的列读取 数据追加:RCFile不支持任意方式的数据写操作,仅提供一种追加接口,这是因为底层的 HDFS当前仅仅支持数据追加写文件尾部。 行组大小:行组变大有助于提高数据压缩的效率,但是可能会损害数据的读取性能,因为这样增加了 Lazy 解压性能的消耗。而且行组变大会占用更多的内存,这会影响并发执行的其他MR作业。
-
ORCFile
存储方式:数据按行分块,每块按照列存储。 压缩快,快速列存取。效率比rcfile高,是rcfile的改良版本。
-
Parquet
Parquet也是一种行式存储,同时具有很好的压缩性能;同时可以减少大量的表扫描和反序列化的时间
-
自定义格式
可以自定义文件格式,用户可通过实现InputFormat和OutputFormat来自定义输入输出格式。
结论,一般选择orcfile/parquet + snappy 的方式
create table tablename (
xxx,string
xxx, bigint
)
ROW FORMAT DELTMITED FIELDS TERMINATED BY '\t'
STORED AS orc tblproperties("orc.compress" = "SNAPPY")
4.6.10、并行执行
- 当一个sql中有多个job时候,且这多个job之间没有依赖,则可以让顺序执行变为并行执行(一般为用到union all )
// 开启任务并行执行
set hive.exec.parallel=true;
// 同一个sql允许并行任务的最大线程数
set hive.exec.parallel.thread.number=8;
4.6.11、合并小文件
小文件的产生有三个地方,map输入,map输出,reduce输出,小文件过多也会影响hive的分析效率:
设置map输入的小文件合并
set mapred.max.split.size=256000000;
//一个节点上split的至少的大小(这个值决定了多个DataNode上的文件是否需要合并)
set mapred.min.split.size.per.node=100000000;
//一个交换机下split的至少的大小(这个值决定了多个交换机上的文件是否需要合并)
set mapred.min.split.size.per.rack=100000000;
//执行Map前进行小文件合并
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
设置map输出和reduce输出进行合并的相关参数:
//设置map端输出进行合并,默认为true
set hive.merge.mapfiles = true
//设置reduce端输出进行合并,默认为false
set hive.merge.mapredfiles = true
//设置合并文件的大小
set hive.merge.size.per.task = 256*1000*1000
//当输出文件的平均大小小于该值时,启动一个独立的MapReduce任务进行文件merge。
set hive.merge.smallfiles.avgsize=16000000
4.7、hive的数据倾斜
表现:任务进度长时间维持在99%(或100%),查看任务监控页面,发现只有少量(1个或几个)reduce子任务未完成。因为其处理的数据量和其他reduce差异过大。
原因:某个reduce的数据输入量远远大于其他reduce数据的输入量
1)、key分布不均匀
2)、业务数据本身的特性
3)、建表时考虑不周
4)、某些SQL语句本身就有数据倾斜
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mO26KHJ5-1650717486839)(…/大数据开发/assert/1582184703426.png)]