Hive基本概念
-
什么是 Hive
Hive:由 Facebook 开源用于解决海量结构化日志的数据统计。
Hive 是基于 Hadoop 的一个数据仓库工具,可以将结构化的数据文件映射为一张表,并提供类 SQL 查询功能。基于一个统一的查询分析层,通过SQL语句的方式对HDFS上的数据进行查询、统计和分析。
Hive本质是将 HQL 转化成 MapReduce 程序
Hive的表本质就是Hadoop的目录/文件
Hive默认表存放路径一般都是在你工作目录的hive目录里面,按表名做文件夹分开,如果你有分区表的话,分区值是子文件夹,可以直接在其它的M/R job里直接应用这部分数据 -
引入原因:
– 对存在HDFS上的文件或HBase中的表进行查询时,是要手工写一堆MapReduce代码
– 对于统计任务,只能由懂MapReduce的程序员才能搞定
– 耗时耗力,更多精力没有有效的释放出来
Hive是一个SQL解析引擎,将SQL语句转译成MR Job,然后再Hadoop平台上运行,达到快速开发的目的。
Hive中的表是纯逻辑表,就只是表的定义等,即表的元数据。本质就是Hadoop的目录/文件,达到了元数据与数据存储分离的目的
Hive本身不存储数据,它完全依赖HDFS和MapReduce。
Hive的内容是读多写少,不支持对数据的改写和删除
Hive中没有定义专门的数据格式,由用户指定,需要指定三个属性:
– 列分隔符
– 行分隔符
– 读取文件数据的方法 -
与传统关系数据特点比较
hive和关系数据库存储文件的系统不同,hive使用的是hadoop的HDFS(hadoop的分布式文件系统),关系数据库则是服务器本地的文件系统;
• hive使用的计算模型是mapreduce,而关系数据库则是自己设计的计算模型;
• 关系数据库都是为实时查询的业务进行设计的,而hive则是为海量数据做数据挖掘设计的,实时性很差
• Hive很容易扩展自己的存储能力和计算能力,这个是继承hadoop的,而关系数据库在这个方面要比数据库差很多。 -
Hive 通过给用户提供的一系列交互接口,接收到用户的指令(SQL),使用自己的 Driver,结合元数据(MetaStore),将这些指令翻译成 MapReduce,提交到 Hadoop 中执行,最后,将执行返回的结果输出到用户交互接口。
Hive 体系架构
- 1.用户接口:Client
CLI(hive shell)、JDBC/ODBC(java 访问 hive)、WEBUI(浏览器访问 hive) - 2.元数据:Metastore
元数据包括:表名、表所属的数据库(默认是 default)、表的拥有者、列/分区字段、表的类型(是否是外部表)、表的数据所在目录等;默认存储在自带的 derby 数据库中,推荐使用 MySQL 存储 Metastore - 3.Hadoop
使用 HDFS 进行存储,使用 MapReduce 进行计算。 - 4.驱动器:Driver
(1)解析器(SQL Parser):将 SQL 字符串转换成抽象语法树 AST,这一步一般都用第三方工具库完成,比如 antlr;对 AST 进行语法分析,比如表是否存在、字段是否存在、SQL 语义是否有误。
(2)编译器(Physical Plan):将 AST 编译生成逻辑执行计划。
(3)优化器(Query Optimizer):对逻辑执行计划进行优化。
(4)执行器(Execution):把逻辑执行计划转换成可以运行的物理计划。对于 Hive 来说,就是 MR/Spark。 - Hive 的优缺点
- 优点
操作接口采用类 SQL 语法,提供快速开发的能力(简单、容易上手)。
避免了去写 MapReduce,减少开发人员的学习成本。
Hive 的执行延迟比较高,因此 Hive 常用于数据分析,对实时性要求不高的场合。
Hive 优势在于处理大数据,对于处理小数据没有优势,因为 Hive 的执行延迟比较高。
Hive 支持用户自定义函数,用户可以根据自己的需求来实现自己的函数。 - 缺点
Hive 的 HQL 表达能力有限
(1)迭代式算法无法表达
(2)数据挖掘方面不擅长
Hive 的效率比较低
(1)Hive 自动生成的 MapReduce 作业,通常情况下不够智能化
(2)Hive 调优比较困难,粒度较粗
Hive参数配置方式
- 参数的配置三种方式
(1)配置文件方式
默认配置文件:hive-default.xml
用户自定义配置文件:hive-site.xml
注意:用户自定义配置会覆盖默认配置。另外,Hive 也会读入 Hadoop 的配置,因为 Hive 是作为 Hadoop 的客户端启动的,Hive 的配置会覆盖 Hadoop 的配置。配置文件的设定对本机启动的所有 Hive 进程都有效。
(2)命令行参数方式
启动 Hive 时,可以在命令行添加-hiveconf param=value 来设定参数。
例如:
[atguigu@hadoop103 hive]$ bin/hive -hiveconf mapred.reduce.tasks=10;
注意:仅对本次 hive 启动有效
查看参数设置:
hive (default)> set mapred.reduce.tasks;
(3)参数声明方式
可以在 HQL 中使用 SET 关键字设定参数
例如:
hive (default)> set mapred.reduce.tasks=100;
注意:仅对本次 hive 启动有效。
查看参数设置
hive (default)> set mapred.reduce.tasks;
- 上述三种设定方式的优先级依次递增。即配置文件<命令行参数<参数声明。注意某些系统级的参数,例如 log4j 相关的设定,必须用前两种方式设定,因为那些参数的读取在会话建立以前已经完成了。
Hive 数据类型
- 基本数据类型
- 集合数据类型
- 类型转化
Hive 的原子数据类型是可以进行隐式转换的,类似于 Java 的类型转换,例如某表达式使用 INT 类型,TINYINT 会自动转换为 INT 类型,但是 Hive 不会进行反向转化,例如,某表达式使用 TINYINT 类型,INT 不会自动转换为 TINYINT 类型,它会返回错误,除非使用 CAST 操作。
1.隐式类型转换规则如下
(1)任何整数类型都可以隐式地转换为一个范围更广的类型,如 TINYINT 可以转换成 INT,INT 可以转换成 BIGINT。
(2)所有整数类型、FLOAT 和 STRING 类型都可以隐式地转换成 DOUBLE。
(3)TINYINT、SMALLINT、INT 都可以转换为 FLOAT。
(4)BOOLEAN 类型不可以转换为任何其它的类型。
2.可以使用 CAST 操作显示进行数据类型转换
例如 CAST(‘1’ AS INT)将把字符串’1’ 转换成整数 1;如果强制类型转换失败,如执行CAST(‘X’ AS INT),表达式返回空值 NULL。
DDL 数据定义
- 创建数据库
- 查询数据库
- 显示数据库
- 查看数据库详情
- 切换当前数据库
- 修改数据库
用户可以使用 ALTER DATABASE 命令为某个数据库的 DBPROPERTIES 设置键-值对属性值,来描述这个数据库的属性信息。数据库的其他元数据信息都是不可更改的,包括数据库名和数据库所在的目录位置。
- 删除数据库
- 创建表
1.建表语法
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name
[(col_name data_type [COMMENT col_comment], …)]
[COMMENT table_comment]
[PARTITIONED BY (col_name data_type [COMMENT col_comment], …)]
[CLUSTERED BY (col_name, col_name, …)
[SORTED BY (col_name [ASC|DESC], …)] INTO num_buckets BUCKETS]
[ROW FORMAT row_format]
[STORED AS file_format]
[LOCATION hdfs_path]
2.字段解释说明
(1)CREATE TABLE 创建一个指定名字的表。如果相同名字的表已经存在,则抛出异常;用户可以用 IF NOT EXISTS 选项来忽略这个异常。
(2)EXTERNAL 关键字可以让用户创建一个外部表,在建表的同时指定一个指向实际数据的路径(LOCATION),Hive 创建内部表时,会将数据移动到数据仓库指向的路径;若创建外部表,仅记录数据所在的路径,不对数据的位置做任何改变。在删除表的时候,内部表的元数据和数据会被一起删除,而外部表只删除元数据,不删除数据。
(3)COMMENT:为表和列添加注释。
(4)PARTITIONED BY 创建分区表
(5)CLUSTERED BY 创建分桶表
(6)SORTED BY 不常用
(7)ROW FORMAT
DELIMITED [FIELDS TERMINATED BY char] [COLLECTION ITEMS
TERMINATED BY char]
[MAP KEYS TERMINATED BY char] [LINES TERMINATED BY char]
| SERDE serde_name [WITH SERDEPROPERTIES (property_name=property_value,property_name=property_value, …)]
用户在建表的时候可以自定义 SerDe 或者使用自带的 SerDe。如果没有指定 ROWFORMAT 或者 ROW FORMAT DELIMITED,将会使用自带的 SerDe。在建表的时候,用户还需要为表指定列,用户在指定表的列的同时也会指定自定义的 SerDe,Hive 通过 SerDe确定表的具体的列的数据。SerDe 是 Serialize/Deserilize 的简称,目的是用于序列化和反序列化。
(8)STORED AS 指定存储文件类型
常用的存储文件类型:SEQUENCEFILE(二进制序列文件)、TEXTFILE(文本)、RCFILE(列式存储格式文件)如果文件数据是纯文本,可以使用 STORED AS TEXTFILE。如果数据需要压缩,使用 STORED AS SEQUENCEFILE。
(9)LOCATION :指定表在 HDFS 上的存储位置。
(10)LIKE 允许用户复制现有的表结构,但是不复制数据。
更新列
ALTER TABLE table_name CHANGE [COLUMN] col_old_name col_new_name column_type [COMMENT col_comment] [FIRST|AFTER column_name];
增加和替换列
ALTER TABLE table_name ADD|REPLACE COLUMNS (col_name data_type [COMMENT col_comment], …);
注:ADD 是代表新增一字段,字段位置在所有列后面(partition 列前),REPLACE 则是表示替换表中所有字段。 - 删除表
hive (default)> drop table dept_partition;
管理表与外部表
-
管理表
默认创建的表都是所谓的管理表,有时也被称为内部表。因为这种表,Hive 会(或多或少地)控制着数据的生命周期。Hive 默认情况下会将这些表的数据存储在由配置项hive.metastore.warehouse.dir(例如,/user/hive/warehouse)所定义的目录的子目录下。当我们删除一个管理表时,Hive 也会删除这个表中数据。管理表不适合和其他工具共享数据。 -
外部表
因为表是外部表,所以 Hive 并非认为其完全拥有这份数据。删除该表并不会删除掉这份数据,不过描述表的元数据信息会被删除掉。
Hive的create创建表的时候,选择的创建方式:
– create table
– create external table
特点:
– 在导入数据到外部表,数据并没有移动到自己的数据仓库目录下,也就是说外部表中的数据并不是由它自己来管理的而内部表则不一样;
– 在删除表的时候,Hive将会把属于表的元数据和数据全部删掉;而删除外部表的时候,Hive仅仅删除外部表的元数据,数据是不会删除的! -
管理表与外部表的互相转换
分区表
-
分区表实际上就是对应一个 HDFS 文件系统上的独立的文件夹,该文件夹下是该分区所有的数据文件。Hive 中的分区就是分目录,把一个大的数据集根据业务需要分割成小的数据集。在查询时通过 WHERE 子句中的表达式选择查询所需要的指定的分区,这样的查询效率会提高很多。
• 在 Hive 中,表中的一个 Partition 对应于表下的一个目录,所有的 Partition 的数据都存储在对应的目录中
– 例如:pvs 表中包含 ds 和 city 两个 Partition,则
– 对应于 ds = 20090801, ctry = US 的 HDFS 子目录为:/wh/pvs/ds=20090801/ctry=US;
– 对应于 ds = 20090801, ctry = CA 的 HDFS 子目录为;/wh/pvs/ds=20090801/ctry=CA
• partition是辅助查询,缩小查询范围,加快数据的检索速度和对数据按照一定的规格和条件进行管理。 -
创建分区表语法
hive (default)> create table dept_partition(
deptno int, dname string, loc string
)
partitioned by (month string)
row format delimited fields terminated by ‘\t’; -
加载数据到分区表中
hive (default)> load data local inpath
‘/opt/module/datas/dept.txt’ into table default.dept_partition
partition(month=‘201709’);
hive (default)> load data
local inpath
‘/opt/module/datas/dept.txt’ into table default.dept_partition
partition(month=‘201708’);
hive (default)> load data local inpath
‘/opt/module/datas/dept.txt’ into table default.dept_partition
partition(month='201707’); -
查询分区表中数据
-
单分区查询
hive (default)> select * from dept_partition where month=‘201709’;
多分区联合查询
hive (default)> select * from dept_partition where month=‘201709’
union
select * from dept_partition where month=‘201708’
union
select * from dept_partition where month=‘201707’; -
增加分区
创建单个分区
hive (default)> alter table dept_partition add partition(month=‘201706’) ;
同时创建多个分区
hive (default)> alter table dept_partition add partition(month=‘201705’) partition(month=‘201704’); -
删除分区
删除单个分区
hive (default)> alter table dept_partition drop partition (month=‘201704’);
同时删除多个分区
hive (default)> alter table dept_partition drop partition (month=‘201705’), partition (month=‘201706’); -
查看分区表有多少分区
hive> show partitions dept_partition; -
查看分区表结构
hive> desc formatted dept_partition;
# Partition Information
# col_name data_type comment
-
创建二级分区表
hive (default)> create table dept_partition2(
deptno int, dname string, loc string
)partitioned by (month string, day string)
row format delimited fields terminated by ‘\t’; -
正常加载数据
(1)加载数据到二级分区表中
hive (default)>
load data local inpath ‘/opt/module/datas/dept.txt’ into table default.dept_partition2 partition(month=‘201709’, day=‘13’);
(2)查询分区数据
hive (default)> select * from dept_partition2 where month=‘201709’ and day=‘13’; -
把数据直接上传到分区目录上,让分区表和数据产生关联的三种方式
(1)方式一:上传数据后修复
上传数据
hive (default)> dfs -mkdir -p /user/hive/warehouse/dept_partition2/month=201709/day=12;
hive (default)> dfs -put /opt/module/datas/dept.txt /user/hive/warehouse/dept_partition2/month=201709/day=12;
查询数据(查询不到刚上传的数据)
hive (default)> select * from dept_partition2 where month=‘201709’ and day=‘12’;
执行修复命令
hive> msck repair table dept_partition2;
再次查询数据
hive (default)> select * from dept_partition2 where month=‘201709’ and day=‘12’;
(2)方式二:上传数据后添加分区
上传数据
hive (default)> dfs -mkdir -p /user/hive/warehouse/dept_partition2/month=201709/day=11;
hive (default)> dfs -put /opt/module/datas/dept.txt /user/hive/warehouse/dept_partition2/month=201709/day=11;
执行添加分区
hive (default)> alter table dept_partition2 add partition(month=‘201709’, day=‘11’);
查询数据
hive (default)> select * from dept_partition2 where
month=‘201709’ and day=‘11’;
(3)方式三:创建文件夹后 load 数据到分区
创建目录
hive (default)> dfs -mkdir -p /user/hive/warehouse/dept_partition2/month=201709/day=10;
上传数据
hive (default)> load data local inpath ‘/opt/module/datas/dept.txt’ into table dept_partition2 partition(month=‘201709’,day=‘10’);
查询数据
hive (default)> select * from dept_partition2 where month=‘201709’ and day=‘10’;
DML数据操作
- 数据导入
- 向表中装载数据(Load)
- 语法
hive> load data [local] inpath ‘/opt/module/datas/student.txt’ [overwrite] | into table student
[partition (partcol1=val1,…)];
(1)load data:表示加载数据
(2)local:表示从本地加载数据到 hive 表;否则从 HDFS 加载数据到 hive 表
(3)inpath:表示加载数据的路径
(4)overwrite:表示覆盖表中已有数据,否则表示追加
(5)into table:表示加载到哪张表
(6)student:表示具体的表
(7)partition:表示上传到指定分区
- 实操案例
-
创建一张表
hive (default)> create table student(id string, name string) row format delimited fields terminated by ‘\t’; -
加载本地文件到 hive
hive (default)> load data local inpath ‘/opt/module/datas/student.txt’ into table default.student; -
加载 HDFS 文件到 hive 中
上传文件到 HDFS
hive (default)> dfs -put /opt/module/datas/student.txt /user/atguigu/hive;
加载 HDFS 上数据
hive (default)> load data inpath ‘/user/atguigu/hive/student.txt’ into table default.student; -
加载数据覆盖表中已有的数据
上传文件到 HDFS
hive (default)> dfs -put /opt/module/datas/student.txt /user/atguigu/hive;
加载数据覆盖表中已有的数据
hive (default)> load data inpath ‘/user/atguigu/hive/student.txt’ overwrite into table
default.student; -
通过查询语句向表中插入数据(Insert)
1.创建一张分区表
hive (default)> create table student(id int, name string) partitioned by (month string) row format delimited fields terminated by ‘\t’;
2.基本插入数据
hive (default)> insert into table student partition(month=‘201709’) values(1,‘wangwu’);
3.基本模式插入(根据单张表查询结果)
hive (default)> insert overwrite table student partition(month=‘201708’) select id, name from student where month=‘201709’;
4.多插入模式(根据多张表查询结果)
hive (default)> from student
insert overwrite table student partition(month=‘201707’) select id, name where month=‘201709’ insert overwrite table student partition(month=‘201706’) select id, name where month=‘201709’; -
查询语句中创建表并加载数据(As Select)
create table if not exists student3 as select id, name from student; -
创建表时通过 Location 指定加载数据路径
1.创建表,并指定在 hdfs 上的位置
hive (default)> create table if not exists student5(
id int, name string
)
row format delimited fields terminated by ‘\t’
location ‘/user/hive/warehouse/student5’;
2.上传数据到 hdfs 上
hive (default)> dfs -put /opt/module/datas/student.txt /user/hive/warehouse/student5;
3.查询数据
hive (default)> select * from student5; -
Import 数据到指定 Hive 表中
注意:先用 export 导出后,再将数据导入。
hive (default)> import table student2 partition(month=‘201709’) from ‘/user/hive/warehouse/export/student’;
- 数据导出
-
Insert 导出
1.将查询的结果导出到本地
hive (default)> insert overwrite local directory ‘/opt/module/datas/export/student’
select * from student;
2.将查询的结果格式化导出到本地
hive(default)>insert overwrite local directory ‘/opt/module/datas/export/student1’
ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘\t’ select * from student;
3.将查询的结果导出到 HDFS 上(没有 local)
hive (default)> insert overwrite directory ‘/user/atguigu/student2’ ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘\t’ select * from student; -
Hadoop 命令导出到本地
hive (default)> dfs -get /user/hive/warehouse/student/month=201709/000000_0 /opt/module/datas/export/student3.txt; -
Hive Shell 命令导出
基本语法:(hive -f/-e 执行语句或者脚本 > file)
[atguigu@hadoop102 hive]$ bin/hive -e ‘select * from default.student;’ >
/opt/module/datas/export/student4.txt; -
Export 导出到 HDFS 上
hive (default)> export table default.student to ‘/user/hive/warehouse/export/student’; -
Sqoop 导出
- 清除表中数据(Truncate)
注意:Truncate 只能删除管理表,不能删除外部表中数据
hive (default)> truncate table student;
排序
- Order By:全局排序,一个 Reducer。
- Sort By:每个 Reducer 内部进行排序,对全局结果集来说不是排序。
- Distribute By:类似 MR 中 partition,进行分区,结合 sort by 使用。注意,Hive 要求 DISTRIBUTE BY 语句要写在 SORT BY 语句之前。
- Cluster by:当 distribute by 和 sorts by 字段相同时,可以使用 cluster by 方式。cluster by 除了具有 distribute by 的功能外还兼具 sort by 的功能。但是排序只能是升序排序,不能指定排序规则为 ASC 或者 DESC。
分桶
-
分区针对的是数据的存储路径;分桶针对的是数据文件。
分区提供一个隔离数据和优化查询的便利方式。不过,并非所有的数据集都可形成合理的分区,特别是之前所提到过的要确定合适的划分大小这个疑虑。
分桶是将数据集分解成更容易管理的若干部分的另一个技术。
• hive中table可以拆分成partition,table和partition可以通过‘CLUSTERED BY’进一步分bucket,bucket中的数据可以通过‘SORT BY’排序。
• ‘set hive.enforce.bucketing = true’ 可以自动控制上一轮reduce的数量从而适配bucket的个数,当然,用户也可以自主设置mapred.reduce.tasks去适配bucket个数
• Bucket主要作用:
– 数据sampling
– 提升某些查询操作效率,例如mapside join -
查看sampling数据:
– hive> select * from student tablesample(bucket 1 out of 2 on id);
– tablesample是抽样语句,语法:TABLESAMPLE(BUCKET x OUT OF y)
– y必须是table总bucket数的倍数或者因子。hive根据y的大小,决定抽样的比例。例如,table总共分了64份,当y=32
时,抽取(64/32=)2个bucket的数据,当y=128时,抽取(64/128=)1/2个bucket的数据。
–x表示从哪个bucket开始抽取,如果需要取多个分区,以后的分区号为当前分区号加上y。例如,table总bucket数为32,tablesample(bucket 3 out of 16),表示总共抽取(32/16=)2个bucket的数据,分别为第3个bucket和第(3+16=)19个bucket的数据。
注意:x 的值必须小于等于 y 的值,否则
FAILED: SemanticException [Error 10061]: Numerator should not be bigger than denominator in sample clause for table stu_buck -
创建分桶表
create table stu_buck(id int, name string) clustered by(id) into 4 buckets
row format delimited fields terminated by ‘\t’; -
查看表结构
hive (default)> desc formatted stu_buck;
Num Buckets: 4 -
导入数据到分桶表中
hive (default)> load data local inpath ‘/opt/module/datas/student.txt’ into table stu_buck;
常用查询函数
-
空字段赋值
NVL:给值为 NULL 的数据赋值,它的格式是 NVL( string1, replace_with)。它的功能是如果string1 为 NULL,则 NVL 函数返回 replace_with 的值,否则返回 string1 的值,如果两个参数都为 NULL ,则返回 NULL。 -
时间类
date_format:格式化时间
select date_format(‘2019-06-29’,‘yyyy-MM-dd’);
date_add:时间跟天数相加
select date_add(‘2019-06-29’,5);
date_sub:时间跟天数相减
select date_sub(‘2019-06-29’,5);
datediff:两个时间相减
select datediff(‘2019-06-29’,‘2019-06-24’); -
CASE WHEN
select
dept_id,
sum(case sex when ‘男’ then 1 else 0 end) male_count,
sum(case sex when ‘女’ then 1 else 0 end) female_count
from emp_sex group by dept_id; -
行转列
CONCAT(string A/col, string B/col…):返回输入字符串连接后的结果,支持任意个输入字符串;
CONCAT_WS(separator, str1, str2,…):它是一个特殊形式的 CONCAT()。第一个参数剩余参数间的分隔符。分隔符可以是与剩余参数一样的字符串。如果分隔符是 NULL,返回值也将为 NULL。这个函数会跳过分隔符参数后的任何 NULL 和空字符串。分隔符将被加到被连接的字符串之间;
COLLECT_SET(col):函数只接受基本数据类型,它的主要作用是将某字段的值进行去重汇总,产生 array 类型字段。
select
t1.base,
concat_ws(’|’, collect_set(t1.name)) name
from
(select
name,
concat(constellation, “,”, blood_type) base
from
person_info) t1
group by
t1.base; -
列转行
EXPLODE(col):将 hive 一列中复杂的 array 或者 map 结构拆分成多行。
LATERAL VIEW
用法:LATERAL VIEW udtf(expression) tableAlias AS columnAlias
解释:用于和 split, explode 等 UDTF 一起使用,它能够将一列数据拆成多行数据,在此基础上可以对拆分后的数据进行聚合。
select
movie,
category_name
from
movie_info lateral view explode(category) table_tmp as category_name; -
窗口函数
OVER():指定分析函数工作的数据窗口大小,这个数据窗口大小可能会随着行的变化而变化;
CURRENT ROW:当前行;
n PRECEDING:往前 n 行数据;
n FOLLOWING:往后 n 行数据;
UNBOUNDED:起点,UNBOUNDED PRECEDING 表示从前面的起点,UNBOUNDED FOLLOWING 表示到后面的终点;
LAG(col,n):往前第 n 行数据;
LEAD(col,n):往后第 n 行数据;
NTILE(n):把有序分区中的行分发到指定数据的组中,各个组有编号,编号从 1 开始,对于每一行,NTILE 返回此行所属的组的编号。注意:n 必须为 int 类型。 -
Rank
RANK() 排序相同时会重复,总数不会变;
DENSE_RANK() 排序相同时会重复,总数会减少;
ROW_NUMBER() 会根据顺序计算。
Hive函数
-
系统内置函数
1)查看系统自带的函数
hive> show functions;
2)显示自带的函数的用法
hive> desc function upper;
3)详细显示自带的函数的用法
hive> desc function extended upper; -
自定义函数
1)Hive 自带了一些函数,比如:max/min 等,但是数量有限,自己可以通过自定义 UDF来方便的扩展。
2)当 Hive 提供的内置函数无法满足你的业务处理需要时,此时就可以考虑使用用户自定义函数(UDF:user-defined function)。
3)根据用户自定义函数类别分为以下三种:
(1)UDF(User-Defined-Function)一进一出
(2)UDAF(User-Defined Aggregation Function)聚集函数,多进一出,类似于:count/max/min
(3)UDTF(User-Defined Table-Generating Functions)一进多出,如 lateral view explore() -
自定义函数步骤
(1)类继承
UDF继承 org.apache.hadoop.hive.ql.UDF,
UDTF继承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
(2)重写方法
UDF需要实现 evaluate 函数;evaluate 函数支持重载;
(3)在 hive 的命令行窗口创建函数
a)打成jar上传至服务器,并添加 jar
add jar linux_jar_path
b)创建 function与class关联
create [temporary] function [dbname.]function_name AS class_name;
(4)在 hive 的命令行窗口删除函数
drop [temporary] function [if exists] [dbname.]function_name;压缩和存储
-
要在 Hadoop 中启用压缩,可以配置如下参数(mapred-site.xml 文件中):
Hadoop的压缩可以发生在以下3个地方
文件输入压缩
mapper输出压缩
reducer输出压缩
-
开启 Map 输出阶段压缩
开启 map 输出阶段压缩可以减少 job 中 map 和 Reduce task 间数据传输量。具体配置如下:
1.开启 hive 中间传输数据压缩功能
hive (default)>set hive.exec.compress.intermediate=true;
2.开启 mapreduce 中 map 输出压缩功能
hive (default)>set mapreduce.map.output.compress=true;
3.设置 mapreduce 中 map 输出数据的压缩方式
hive (default)>set mapreduce.map.output.compress.codec= org.apache.hadoop.io.compress.SnappyCodec;
4.执行查询语句
hive (default)> select count(ename) name from emp; -
开启 Reduce 输出阶段压缩
当 Hive 将输出写入到表中时,输出内容同样可以进行压缩。属性hive.exec.compress.output 控制着这个功能。用户可能需要保持默认设置文件中的默认值false,这样默认的输出就是非压缩的纯文本文件了。用户可以通过在查询语句或执行脚本中设置这个值为 true,来开启输出结果压缩功能。
1.开启 hive 最终输出数据压缩功能
hive (default)>set hive.exec.compress.output=true;
2.开启 mapreduce 最终输出数据压缩
hive (default)>set mapreduce.output.fileoutputformat.compress=true;
3.设置 mapreduce 最终数据输出压缩方式
hive (default)> set mapreduce.output.fileoutputformat.compress.codec =
org.apache.hadoop.io.compress.SnappyCodec;
4.设置 mapreduce 最终数据输出压缩为块压缩
hive (default)> set mapreduce.output.fileoutputformat.compress.type=BLOCK;
5.测试一下输出结果是否是压缩文件
hive (default)> insert overwrite local directory
‘/opt/module/datas/distribute-result’ select * from emp
distribute by deptno sort by empno desc; -
文件存储格式
Hive 支持的存储数的格式主要有:TEXTFILE 、SEQUENCEFILE、ORC、PARQUET。
1.行存储的特点
查询满足条件的一整行数据的时候,列存储则需要去每个聚集的字段找到对应的每个列的值,行存储只需要找到其中一个值,其余的值都在相邻地方,所以此时行存储查询的速度更快。
2.列存储的特点
因为每个字段的数据聚集存储,在查询只需要少数几个字段的时候,能大大减少读取的数据量;每个字段的数据类型一定是相同的,列式存储可以针对性的设计更好的设计压缩算法。
TEXTFILE 和 SEQUENCEFILE 的存储格式都是基于行存储的;
ORC 和 PARQUET 是基于列式存储的。 -
在实际的项目开发当中,hive 表的数据存储格式一般选择:orc 或 parquet。压缩方式一般选择 snappy,lzo。
Hive优化
- 设置Fetch 抓取
Fetch 抓取是指,Hive 中对某些情况的查询可以不必使用 MapReduce 计算。例如:SELECT * FROM employees;在这种情况下,Hive 可以简单地读取 employee 对应的存储目录下的文件,然后输出查询结果到控制台。在 hive-default.xml.template 文件中 hive.fetch.task.conversion 默认是 more,老版本 hive默认是 minimal,该属性修改为 more 以后,在全局查找、字段查找、limit 查找等都不走mapreduce。 - 开启本地模式
大多数的 Hadoop Job 是需要 Hadoop 提供的完整的可扩展性来处理大数据集的。不过,有时 Hive 的输入数据量是非常小的。在这种情况下,为查询触发执行任务消耗的时间可能会比实际 job 的执行时间要多的多。对于大多数这种情况,Hive 可以通过本地模式在单台机器上处理所有的任务。对于小数据集,执行时间可以明显被缩短。用户可以通过设置 hive.exec.mode.local.auto 的值为 true,来让 Hive 在适当的时候自动启动这个优化。
set hive.exec.mode.local.auto=true; //开启本地 mr
//设置 local mr 的最大输入数据量,当输入数据量小于这个值时采用 local mr 的
方式,默认为 134217728,即 128M
set hive.exec.mode.local.auto.inputbytes.max=50000000;
//设置 local mr 的最大输入文件个数,当输入文件个数小于这个值时采用 local mr的方式,默认为 4
set hive.exec.mode.local.auto.input.files.max=10; - 小表大表join
小表jion大表
将 key 相对分散,并且数据量小的表放在 join 的左边,这样可以有效减少内存溢出错误发生的几率;再进一步,可以使用 map join 让小的维度表(1000 条以下的记录条数)先进内存。在 map 端完成 reduce。
实际测试发现:新版的 hive 已经对小表 JOIN 大表和大表 JOIN 小表进行了优化。小表放在左边和右边已经没有明显区别。 - 大表join大表
1.空 KEY 过滤
有时 join 超时是因为某些 key 对应的数据太多,而相同 key 对应的数据都会发送到相同的 reducer 上,从而导致内存不够。此时我们应该仔细分析这些异常的 key,很多情况下,这些 key 对应的数据是异常数据,我们需要在 SQL 语句中进行过滤。
2.空 key 转换
有时虽然某个 key 为空对应的数据很多,但是相应的数据不是异常数据,必须要包含在join 的结果中,此时我们可以表 a 中 key 为空的字段赋一个随机的值,使得数据随机均匀地分不到不同的 reducer 上。 - MapJoin
如果不指定 MapJoin 或者不符合 MapJoin 的条件,那么 Hive 解析器会将 Join 操作转换成 Common Join,即:在 Reduce 阶段完成 join。容易发生数据倾斜。可以用 MapJoin 把小表全部加载到内存在 map 端进行 join,避免 reducer 处理。
1.开启 MapJoin 参数设置
(1)设置自动选择 MapJoin
set hive.auto.convert.join = true; 默认为 true
(2)大表小表的阈值设置(默认 25M 以下认为是小表):
set hive.mapjoin.smalltable.filesize=25000000;
2.MapJoin 工作机制,如图 6-15 所示
- group by
默认情况下,Map 阶段同一 Key 数据分发给一个 reduce,当一个 key 数据过大时就倾斜了。并不是所有的聚合操作都需要在 Reduce 端完成,很多聚合操作都可以先在 Map 端进行部分聚合,最后在 Reduce 端得出最终结果,若所有操作都必须在reduce端完成,不能在map端预聚合(例如avg()函数),这时就需要开启数据倾斜处理:hive.groupby.skewindata = true。
1.开启 Map 端聚合参数设置
(1)是否在 Map 端进行聚合,默认为 True
hive.map.aggr = true
(2)在 Map 端进行聚合操作的条目数目
hive.groupby.mapaggr.checkinterval = 100000
(3)有数据倾斜的时候进行负载均衡(默认是 false)
hive.groupby.skewindata = true
当选项设定为 true,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。 - 去重统计
数据量小的时候无所谓,数据量大的情况下,由于 COUNT DISTINCT 操作需要用一个Reduce Task 来完成,这一个 Reduce 需要处理的数据量太大,就会导致整个 Job 很难完成,一般 COUNT DISTINCT 使用先 GROUP BY 再 COUNT 的方式替换。 - 避免笛卡尔积
尽量避免笛卡尔积,join 的时候不加 on 条件,或者无效的 on 条件,Hive 只能使用 1个 reducer 来完成笛卡尔积。 - 行列过滤
列处理:在 SELECT 中,只拿需要的列,如果有,尽量使用分区过滤,少用 SELECT *。
行处理:在分区剪裁中,当使用外关联时,如果将副表的过滤条件写在 Where 后面,那么就会先全表关联,之后再过滤。 - 动态分区调整
关系型数据库中,对分区表 Insert 数据时候,数据库自动会根据分区字段的值,将数据插入到相应的分区中,Hive 中也提供了类似的机制,即动态分区(Dynamic Partition),只不过,使用 Hive 的动态分区,需要进行相应的配置。
1.开启动态分区参数设置
(1)开启动态分区功能(默认 true,开启)
hive.exec.dynamic.partition=true
(2)设置为非严格模式(动态分区的模式,默认 strict,表示必须指定至少一个分区为静态分区,nonstrict 模式表示允许所有的分区字段都可以使用动态分区。)
hive.exec.dynamic.partition.mode=nonstrict
(3)在所有执行 MR 的节点上,最大一共可以创建多少个动态分区。
hive.exec.max.dynamic.partitions=1000
(4)在每个执行 MR 的节点上,最大可以创建多少个动态分区。该参数需要根据实际的数据来设定。比如:源数据中包含了一年的数据,即 day 字段有 365 个值,那么该参数就需要设置成大于 365,如果使用默认值 100,则会报错。
hive.exec.max.dynamic.partitions.pernode=100
(5)整个 MR Job 中,最大可以创建多少个 HDFS 文件。
hive.exec.max.created.files=100000
(6)当有空分区生成时,是否抛出异常。一般不需要设置。
hive.error.on.empty.partition=false - 合理设置 Map 数
1)通常情况下,作业会通过 input 的目录产生一个或者多个 map 任务。
主要的决定因素有:input 的文件总个数,input 的文件大小,集群设置的文件块大小。
2)是不是 map 数越多越好?
答案是否定的。如果一个任务有很多小文件(远远小于块大小 128m),则每个小文件也会被当做一个块,用一个 map 任务来完成,而一个 map 任务启动和初始化的时间远远大于逻辑处理的时间,就会造成很大的资源浪费。而且,同时可执行的 map 数是受限的。
3)是不是保证每个 map 处理接近 128m 的文件块,就高枕无忧了?
答案也是不一定。比如有一个 127m 的文件,正常会用一个 map 去完成,但这个文件只有一个或者两个小字段,却有几千万的记录,如果 map 处理的逻辑比较复杂,用一个 map任务去做,肯定也比较耗时。针对上面的问题 2 和 3,我们需要采取两种方式来解决:即减少 map 数和增加 map 数。
小文件进行合并
在 map 执行前合并小文件,减少 map 数:CombineHiveInputFormat 具有对小文件进行合并的功能(系统默认的格式)。HiveInputFormat 没有对小文件合并功能。 - 复杂文件增加 Map 数
当 input 的文件都很大,任务逻辑复杂,map 执行非常慢的时候,可以考虑增加 Map数,来使得每个 map 处理的数据量减少,从而提高任务的执行效率。增加 map 的方法为:根据
computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M 公式,调整 maxSize 最大值。让 maxSize 最大值低于 blocksize 就可以增加 map 的个数。 - 合理设置 Reduce 数
1.调整 reduce 个数方法一
(1)每个 Reduce 处理的数据量默认是 256MB
hive.exec.reducers.bytes.per.reducer=256000000
(2)每个任务最大的 reduce 数,默认为 1009
hive.exec.reducers.max=1009
(3)计算 reducer 数的公式
N=min(参数 2,总输入数据量/参数 1)
2.调整 reduce 个数方法二
在 hadoop 的 mapred-default.xml 文件中修改,设置每个 job 的 Reduce 个数
set mapreduce.job.reduces = 15;
3.reduce 个数并不是越多越好
1)过多的启动和初始化 reduce 也会消耗时间和资源;
2)另外,有多少个 reduce,就会有多少个输出文件,如果生成了很多个小文件,那么如果这些小文件作为下一个任务的输入,则也会出现小文件过多的问题;
在设置 reduce 个数的时候也需要考虑这两个原则:处理大数据量利用合适的 reduce 数;使单个 reduce 任务处理数据量大小要合适。 - 并行执行
Hive 会将一个查询转化成一个或者多个阶段。这样的阶段可以是 MapReduce 阶段、抽样阶段、合并阶段、limit 阶段。或者 Hive 执行过程中可能需要的其他阶段。默认情况下,Hive 一次只会执行一个阶段。不过,某个特定的 job 可能包含众多的阶段,而这些阶段可能并非完全互相依赖的,也就是说有些阶段是可以并行执行的,这样可能使得整个 job 的执行时间缩短。不过,如果有更多的阶段可以并行执行,那么 job 可能就越快完成。通过设置参数 hive.exec.parallel 值为 true,就可以开启并发执行。不过,在共享集群中,需要注意下,如果 job 中并行阶段增多,那么集群利用率就会增加。
set hive.exec.parallel=true; //打开任务并行执行
set hive.exec.parallel.thread.number=16; //同一个 sql 允许最大并行度,默认为 8。 - 严格模式
Hive 提供了一个严格模式,可以防止用户执行那些可能意想不到的不好的影响的查询。通过设置属性 hive.mapred.mode 值为默认是非严格模式 nonstrict 。开启严格模式需要修改 hive.mapred.mode 值为 strict,开启严格模式可以禁止 3 种类型的查询。
- 对于分区表,除非 where 语句中含有分区字段过滤条件来限制范围,否则不允许执行。换句话说,就是用户不允许扫描所有分区。进行这个限制的原因是,通常分区表都拥有非常大的数据集,而且数据增加迅速。没有进行分区限制的查询可能会消耗令人不可接受的巨大资源来处理这个表。
- 对于使用了 order by 语句的查询,要求必须使用 limit 语句。因为 order by 为了执行排序过程会将所有的结果数据分发到同一个 Reducer 中进行处理,强制要求用户增加这个LIMIT 语句可以防止 Reducer 额外执行很长一段时间。
- 限制笛卡尔积的查询。对关系型数据库非常了解的用户可能期望在执行 JOIN 查询的时候不使用 ON 语句而是使用 where 语句,这样关系数据库的执行优化器就可以高效地将WHERE 语句转化成那个 ON 语句。不幸的是,Hive 并不会执行这种优化,因此,如果表足够大,那么这个查询就会出现不可控的情况。
- JVM 重用
JVM 重用是 Hadoop 调优参数的内容,其对 Hive 的性能具有非常大的影响,特别是对于很难避免小文件的场景或 task 特别多的场景,这类场景大多数执行时间都很短。Hadoop 的默认配置通常是使用派生 JVM 来执行 map 和 Reduce 任务的。这时 JVM 的启动过程可能会造成相当大的开销,尤其是执行的 job 包含有成百上千 task任务的情况。JVM重用可以使得 JVM 实例在同一个 job 中重新使用 N 次。N 的值可以在 Hadoop 的mapred-site.xml 文件中进行配置。通常在 10-20 之间,具体多少需要根据具体业务场景测试得出。
<property>
<name>mapreduce.job.jvm.numtasks</name>
<value>10</value>
<description>How many tasks to run per jvm. If set to -1, there is
no limit.
</description>
</property>
- 推测执行
在分布式集群环境下,因为程序 Bug(包括 Hadoop 本身的 bug),负载不均衡或者资源分布不均等原因,会造成同一个作业的多个任务之间运行速度不一致,有些任务的运行速度可能明显慢于其他任务(比如一个作业的某个任务进度只有 50%,而其他所有任务已经运行完毕),则这些任务会拖慢作业的整体执行进度。为了避免这种情况发生,Hadoop 采用了推测执行(Speculative Execution)机制,它根据一定的法则推测出“拖后腿”的任务,并为这样的任务启动一个备份任务,让该任务与原始任务同时处理同一份数据,并最终选用最先成功运行完成任务的计算结果作为最终结果。
设置开启推测执行参数:Hadoop 的 mapred-site.xml 文件中进行配置:
<property>
<name>mapreduce.map.speculative</name>
<value>true</value>
<description>If true, then multiple instances of some map tasks
may be executed in parallel.</description>
</property>
<property>
<name>mapreduce.reduce.speculative</name>
<value>true</value>
<description>If true, then multiple instances of some reduce tasks
may be executed in parallel.</description>
</property>
不过 hive 本身也提供了配置项来控制 reduce-side 的推测执行:
<property>
<name>hive.mapred.reduce.tasks.speculative.execution</name>
<value>true</value>
<description>Whether speculative execution for reducers should
be turned on.
</description>
</property>