一.Pandas简介
1、基本介绍
-
Pandas是Python的一个第三方包,也是商业和工程领域最流行的结构化数据工具集,用于数据清洗、处理以及分析
-
Pandas和Spark SQL中很多功能都类似,甚至使用方法都是相同的
-
Pandas适用场景
-
Pandas用于处理单机数据
-
可以在数据ETL、查询分析、报表输出等环节使用
-
2.数据结构
Python中的Pandas的DataFrame数据结构:
DataFrame:表示一个二维表对象,就是表示整个表
字段,列,索引;Series表示一行或者一列
二.Spark SQL函数定义
1.窗口函数
2.SQL函数分类
SQL函数,主要分为以下三大类:
①UDF函数:用户自定义函数
特点:一对一,输入一个得到一个
例如:split() substr()
②UDAF函数:用户自定义聚合函数
特点:多对一,输入多个得到一个
例如:sum()
③UDTF函数:用户自定义表数据生成函数
特点:一对多,输入一个得到多个
例如:explode()
在SQL中提供的所有的内置函数,都是属于以上三类中某一类函数
思考:有这么多的内置函数,为啥还需要自定义函数呢?
在Spark SQL中,针对Python语言,对于自定义函数,原生支持的并不是特别好。目前原生仅支持自定义UDF函数,而无法自定义UDAF函数和UDTF函数。
在1.6版本后,Java 和scala语言支持自定义UDAF函数,但Python并不支持。
Spark SQL原生存在的问题:大量的序列化和反序列
3.Spark原生自定义UDF函数
自定义函数流程:
4.Pandas的UDF函数
4.1 Apache Arrow框架基本介绍
Apache Arrow是Apache旗下的一款顶级的项目。是一个跨平台的在内存中以列式存储的数据层,它的设计目标就是作为一个跨平台的数据层,来加快大数据分析项目的运行效率
Pandas 与 Spark SQL 进行交互的时候,建立在Apache Arrow上,带来低开销 高性能的UDF函数
Arrow并不会自动使用,在某些情况下,需要配置 以及在代码中需要进行小的更改才可以使用
4.2 基于Arrow完成Pandas DataFrame和Spark DataFrame互转
使用场景:
1- Spark的DataFrame -> Pandas的DataFrame:当大数据处理到后期的时候,可能数据量会越来越少,这样可以考虑使用单机版的Pandas来做后续数据的分析
2- Pandas的DataFrame -> Spark的DataFrame:当数据量达到单机无法高效处理的时候,或者需要和其他大数据框架集成的时候,可以转成Spark中的DataFrame
4.3 基于Pandas完成UDF函数
基于Pandas的UDF函数来转换为Spark SQL的UDF函数进行使用。底层是基于Arrow框架来完成数据传输,允许向量化(可以充分利用计算机CPU性能)操作。
Pandas的UDF函数其实本质上就是Python的函数,只不过函数的传入数据类型为Pandas的类型
基于Pandas的UDF可以使用自定义UDF函数和自定义UDAF函数
三.Spark on Hive
1.集成原理
2.在代码中集成Hive
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
# 1- 创建SparkSession对象
"""
spark.sql.warehouse.dir:告诉Spark数据存储在什么地方。默认使用本地磁盘进行存储。推荐使用HDFS
hive.metastore.uris:告诉Spark元数据信息去什么地方找MetaStore
enableHiveSupport():开启SparkSQL和Hive的集成
"""
spark = SparkSession.builder\
.config("spark.sql.warehouse.dir","hdfs://node1:8020/user/hive/warehouse")\
.config("hive.metastore.uris","thrift://node1.itcast.cn:9083")\
.appName('sparksql_hive')\
.master('local[*]')\
.enableHiveSupport()\
.getOrCreate()
# 2- 数据输入
# 3- 数据处理
# 4- 数据输出
spark.sql("show databases").show()
spark.sql("""
select
*
from day07.student
where id>=2
""").show()
# 5- 释放资源
spark.stop()