目录
PySpark SQL
基础
PySpark SQL与Hive的异同
这里的重点是:Spark SQL能支持SQL和其他代码混合执行,自由度更高,且其是内存计算,更快。但是其没有元数据管理,然而它最终还是会作用到Hive层面,可以调用Hive的Metasotre
SparkSQL的基本对象是DataFrame,其特点及与其他对象的区别为:
SparkSession对象
在RDD阶段,程序的执行入口对象是: SparkContext
在Spark 2.0后,推出了SparkSession对象,作为Spark编码的统一入口对象
SparkSession对象可以:
-用于SparkSQL编程作为入口对象
- 用于SparkCore编程,可以通过SparkSession对象中获取到SparkContext
from pyspark.sql import SparkSession
if __name__ == '__main__':
spark = SparkSession.builder.appName('lmx').master('local[*]').getOrCreate()
sc = spark.sparkContext
DataFrame入门
DataFrame构建
1、用RDD进行构建
rdd的结构要求为:[[xx,xx],[xx,xx]]
spark.createDataFrame(rdd,schema=[])
spark = SparkSession.builder.appName('lmx').master('local[*]').getOrCreate()
sc = spark.sparkContext
rdd = sc.textFile('data/input/sql/people.txt').map(lambda x:x.split(',')).map(lambda x:[x[0],int(x[1])])
print(rdd.collect())
# [['Michael', 29], ['Andy', 30], ['Justin', 19]]
df = spark.createDataFrame(rdd,schema=['name','age'])
df.printSchema()#打印表结构
df.show()#打印表
# root
# | -- name: string(nullable=true)
# | -- age: long(nullable=true)
#
# +-------+---+
# | name | age |
# +-------+---+
# | Michael | 29 |
# | Andy | 30 |
# | Justin | 19 |
# +-------+---+
2、利用StructType进行创建
需要先引入StructType,StringType,IntegerType等构建schema
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StringType,IntegerType
if __name__ == '__main__':
spark = SparkSession.builder.appName('lmx').master('local[*]').getOrCreate()
sc = spark.sparkContext
rdd = sc.textFile('data/input/sql/people.txt').map(lambda x:x.split(',')).map(lambda x:[x[0],int(x[1])])
#构建schema
schema =StructType().add("name",StringType(),nullable=False).\
add('age',IntegerType(),nullable=True)
df = spark.createDataFrame(rdd,schema=schema)
df.printSchema()
df.show()
3、toDF将rdd转换为df
下面展示了两种方式
# 只设定列名,列的数据结构则是内部自己判断
df = rdd.toDF(['name','age'])
df.printSchema()
# root
# | -- name: string(nullable=true)
# | -- age: long(nullable=true)
# 设定列名和数据类型
schema =StructType().add("name",StringType(),nullable=False).\
add('age',IntegerType(),nullable=True)
df = rdd.toDF(schema=schema)
df.printSchema()
# root
# | -- name: string(nullable=false)
# | -- age: integer(nullable=true)
4、基于pandas构建
dfp = pd.DataFrame({
"id":[1,2,3],
'score':[99,98,100]
})
df = spark.createDataFrame(dfp)
df.printSchema()
df.show()
# root
# | -- id: long(nullable=true)
# | -- score: long(nullable=true)
#
# +---+-----+
# | id | score |
# +---+-----+
# | 1 | 99 |
# | 2 | 98 |
# | 3 | 100 |
# +---+-----+
5、通过文件读取创造
在读取json和parquet文件时不需要设定schema,因为文件已经自带
而读取csv时,还需要使用.option设定 header等参数
这里说一下parquet文件
DataFrame代码风格
DSL
其实就是用其内置的API处理数据,举例:
df.select('id','subject').show()
df.where('subject="语文"').show()
df.select('id','subject').where('subject="语文"').show()
df.groupBy('subject').count().show()
API其实跟SQL类似,这里不详细说明了,个人感觉不如直接写SQL语句
SQL
DataFrame的一个强大之处就是我们可以将它看作是一个关系型数据表,然后可以通过在程序中使用spark.sgl0来执行SQL语句查询,结果返回一个DataFrame。如果想使用SQL风格的语法,需要将DataFrame注册成表采用如下的方式:
df.createTempView('tmp') #创建临时视图
df.createGlobalTempView('global_tmp')#创建全局试图
# 全局表: 跨SparkSession对象使用在一个程序内的多个SparkSession中均可调用查询前带上前缀:global_tmp
df.createOrReplaceTempView('repalce_tmp')#创建临时表,如果存在则替换
然后使用spark.sql的形式书写sql代码
spark.sql('select * from tmp where subject = "语文"').show()
spark.sql('select id,score from repalce_tmp where score>90').show()
spark.sql('select subject,max(score) from global_temp.global_tmp group by subject').show()
SparkSQL Shuffle 分区数目
原因: 在SparkSQL中当Job中产生Shufle时,默认的分区数 spark.sql.shufle,partitions 为200,在实际项目中要合理的设置。
在代码中可以设置:
spark = SparkSession.builder.appName('lmx').\
master('local[*]').config('spark.sql.shufle,partitions',2).\
getOrCreate()
DataFrame数据写出
统一API:
下面提供两种方法,分别写出为json和csv
spark.sql(
'select user_id,avg(score) avg_score from tmp group by user_id order by avg_score desc'
).write.mode('overwrite').format('json').save('data/output/1t')
spark.sql(
'select user_id,avg(score) avg_score from tmp group by user_id order by avg_score desc'
).write.mode('overwrite').format('csv')\
.option('header',True)\
.option('sep',';')\
.save('data/output/csv')
其他的一些方法:
SparkSQL中读取数据和写出数据 - 知乎
不过这里似乎不能自己命名导出的数据文件
Spark UDF
在SparkSQL中,目前仅仅支持UDF函数和UDAF函数,目前Python仅支持UDF
UDF有两种定义方式
举例:
def double_score(num):
return 2*num
udf1 = spark.udf.register('udf_1',double_score,IntegerType())
# dsl风格
df.select(udf1(df['score'])).show()
# sql风格
df.selectExpr('udf_1(score)').show()
# sql风格2
df.createTempView('tmp')
spark.sql("select udf_1(score) from tmp").show()
udf2 = F.udf(double_score,IntegerType())
df.select(udf2(df['score'])).show()
当返回值是数组时,需要定义数组内部数据的数据类型:ArrayType(StringType())
spark = SparkSession.builder.appName('lmx').master('local[*]').config('spark.sql.shufle,partitions',2).getOrCreate()
sc = spark.sparkContext
rdd=sc.parallelize([['i love you'],['i like you']])
df = rdd.toDF(['ifo'])
def func(num):
return num.split(' ')
udf = spark.udf.register('udf_sql',func,ArrayType(StringType()))
# dsl风格
df.select(udf(df['ifo'])).show()
当返回值是字典时,需要使用StructType(),且定义每个列的名字(需要跟函数返回值的列名一样)和数据类型
rdd=sc.parallelize([[1],[2],[3],[4],[5]])
df = rdd.toDF(['ifo'])
df.show()
def func(num):
return {'num':num,'num1':num+10}
udf = spark.udf.register('udf_sql',func,StructType().\
add('num',IntegerType(),nullable=False).\
add('num1',IntegerType(),nullable=False))
df.select(udf(df['ifo'])).show()
Catalyst优化器
SparkSQL会对写完的代码,执行“自动优化”,既Catalyst优化器,以提升代码运行效率,避免开发者水平影响到代码执行效率。 (RDD代码不会,是因为RDD的数据对象太过复杂,无法被针对性的优化)
加入优化的SparkSQL大致架构为:
Catalyst优化器主要分为四个步骤
1、解析sql,生成AST(抽象语法树)
2、在 AST 中加入元数据信息,做这一步主要是为了一些优化,例如 col=col 这样的条件
以上面的图为例:
- score.id → id#1#L 为 score.id 生成 id 为1,类型是 Long
- score.math_score→math_score#2#L为 score.math_score 生成 id 为 2,类型为 Long
- people.id→id#3#L为 people.id 生成 id 为3,类型为 Long
- people.age→age#4#L为 people.age 生成 id 为 4,类型为 Long
3、对已经加入元数据的 AST,输入优化器,进行优化,主要包含两种常见的优化:
4、上面的过程生成的 AST 其实最终还没办法直接运行,这个 AST 叫做 逻辑计划,结束后,需要生成 物理计划,从而生成 RDD 来运行
Spark SQL的执行流程
如此,Spark SQL的执行流程为:
Spark新特性
自适应查询(SparkSQL)(AQE)
即:Adaptive Query Execution
由于缺乏或者不准确的数据统计信息(元数据)和对成本的错误估算(执行计划调度)导致生成的初始执行计划不理想
在Spark3.x版本提供Adaptive Query Execution自适应查询技术 通过在”运行时”对查询执行计划进行优化, 允许Planner在运行时执行可选计划,这些可选计划将会基于运行时数据统 计进行动态优化, 从而提高性能,其开启方式为:
set spark.sql.adaptive.enabled = true;
Adaptive Query Execution AQE主要提供了三个自适应优化:
动态合并shuffle分区
即:Dynamically coalescing shuffle partitions
可以动态调整shuffle分区的数量。用户可以在开始时设置相对较多的shuffle分区数,AQE会在运行时将相邻的小分区 合并为较大的分区。
动态调整Join策略
即:Dynamically switching join strategies
此优化可以在一定程度上避免由于缺少统计信息或着错误估计大小(当然也可能两种情况同时存在),而导致执行计 划性能不佳的情况。这种自适应优化可以在运行时sort merge join转换成broadcast hash join,从而进一步提升性能
其实就是将小的表设置为 广播表,使得所有大的表都能获得全部的小表,减少了后续的网络传输
动态优化倾斜Join
shuffle时将过于大的数据分成与其他数据分区大小相似的n个分区,已实现数据分区均衡
skew joins可能导致负载的极端不平衡,并严重降低性能。在AQE从shuffle文件统计信息中检测到任何倾斜后,它可 以将倾斜的分区分割成更小的分区,并将它们与另一侧的相应分区连接起来。这种优化可以并行化倾斜处理,获得更 好的整体性能。
触发条件: 1. 分区大小> spark.sql.adaptive.skewJoin.skewedPartitionFactor (default=10) * "median partition size(中位数分区大小)"
2. 分区大小> spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (default = 256MB )
动态分区裁剪(SparkSQL)
即:Dynamic Partition Pruning
当优化器在编译时无法识别可跳过的分区时,可以使用"动态分区裁剪",即基于运行时推断的信息来进一步进行分区 裁剪。这在星型模型中很常见,星型模型是由一个或多个并且引用了任意数量的维度表的事实表组成。在这种连接操 作中,我们可以通过识别维度表过滤之后的分区来裁剪从事实表中读取的分区。
Spark SQL深入分析之动态分区裁剪(Dynamic Partition Pruning) - 知乎