一、DataFrame的两种编程风格
- DSL语法风格
- DSL称之为:领域特定语言
- 其实就是指DataFrame的特有API
- DSL风格意思就是以调用API的方式来处理Data
- 比如:df.where().limit()
- SQL语法风格
- SQL风格就是使用SQL语句处理DataFrame的数据
- 比如:spark.sql(“SELECT * FROM xxx)
二、DSL风格
-
show方法:
- 功能:展示DataFrame中的数据,默认20条
- df.show(参数1, 参数2)
- 参数1: 默认是20, 控制展示多少条
- 参数2: 是否阶段列, 默认只输出20个字符的长度, 过长不显示, 要显示的话 请填入 truncate = True
-
printSchema方法:
- 功能:打印输出df的schema信息
- 语法:df.printSchema()
-
select方法:
-
功能:选择DataFrame中的指定列(通过传入参数进行指定)
-
语法:
-
-
可传递:
- 可变参数的cols对象,cols对象可以是Column对象来指定列或者字符串列名来指定列
- List[Column]对象或者List[str]对象, 用来选择多个列
-
filter和where方法:
- 功能:过滤DataFrame内的数据,返回一个过滤后的DataFrame
- 语法:
- df.filter()
- df.where()
- where和filter功能上是等价的
-
groupBy 分组方法:
- 功能:按照指定的列进行数据的分组, 返回值是GroupedData对象
- 语法:
- df.groupBy()
- df.groupBy()
三、SQL风格
- 注册DataFrame成为表
-
使用SQL查询
- 通过sparksession.sql(sql语句)来执行sql查询,返回值是一个新的df
# 注册好表后,就可以写sql查询 df2 = spark.sql("""SELECT * FROM score WHERE score < 99""") df2.show()
四、DataFrame的全局表和临时表
- 全局表
- 跨SparkSession对象使用,在一个程序内的多个SparkSession中均可调用,查询前带上前缀:
global_temp
- 跨SparkSession对象使用,在一个程序内的多个SparkSession中均可调用,查询前带上前缀:
- 临时表
- 只在当前SparkSession中可用
五、pyspark.sql.functions包
- 这个包里面提供了一些列的计算函数供SparkSQL使用
- ```from pyspark.sql import functions as F`
- 返回值多数都是Column对象
六、SparkSQL Shuffle分区数目
在SparkSQL中当Job中产生Shuffle时,默认的分区数(spark.sql.shuffle.partitions)为200,在实际项目中要合理的设置
-
配置文件:
conf/spark-defaults.conf
:spark.sql.shuffle.partitions 100
-
在客户端提交参数中:
bin/spark-submit --conf "spark.sql.shuffle.partitions=100"
-
在代码中可以设置:
spark = SparkSession.builder.appName("create df").master("loacl[*]").config("spark.sql.shuffle.partitions","2").getOrCreate()
七、数据清洗API
-
去重方法:dropDuplicates
-
功能:对DF的数据进行去重,如果重复数据有多条,取第一条
-
# 去重API dropDuplicates,无参数是对数据进行整体去重 df.dropDuplicates().show() # API同样可以针对字段进行去重,如下传入age字段,表示只要年龄一样,就认为是重复数据 df.dropDuplicates(['age','job']).show()
-
-
删除缺失值的行方法 dropna
-
功能:如果数据中包含null,通过dropna来进行判断,符合条件就删除这一行数据
-
# 如果有缺失,进行数据删除 # 无参数,为how=any执行,只要有一个列是null,数据整行删除,如果填入how='all',表示全部列为空,才会删除,how参数默认是'any' df.dropna().show() # 指定阈值进行删除,thresh=3表示,有效的列最少有3个,这行数据才保留 # 设定thresh后,how参数无效了 df.dropna(thresh=3).show() # 可以指定阈值,以及配合指定列进行工作 # thresh=2,subset=['name','age']表示针对这2个列,有效列最少为2个才保留数据 df.dropna(thresh=2,subset=['name','age']).show()
-
-
填充缺失值数据 fillna
-
功能:根据参数的规则,来进行null的替换
-
# 将所有的空,按照指定的值进行填充,不理会列的,任何空都被填充 df.fillna('loss').show() # 指定列进行填充 df.fillna("loss",subset=['job']).show() # 给定字典,设定各个列的填充规则 df.fillna({'name':'未知姓名','age':1,'job':'worker'}).show()
-
八、SparkSQL统一API写出DataFrame数据
- 语法
df.write.mode().format().option(K,V).sava(PATH)
# mode,传入模式字符串,可选:append 追加,overwrite 覆盖,ignore 忽略,error 重复就报异常(默认)
# format,传入格式字符串,可选:text,csv,json,parquet,orc,avro,jdbc
# 注意text源只支持单列df写出
# option 设置属性,如:.option("sep",",")
# save 写出的路径,支持本地文件和HDFS
- 常见的源写出
# Write text写出,只能写出一个单列数据
df.select(F.concat_ws("---","user_id","movie_id","rank","ts")).\
write.\
mode("overwrite").\
format("text").\
save("file:///export/data/output/sql/text")
# Write CSV写出
df.write.mode("overwrite").\
format("csv").\
option("sep",",").\
option("header",True).\
save("file:///export/data/output/csv")
# Write Json写出
df.write.mode("overwrite").\
format("json").\
save("file:///export/data/output/json")
# Write Parquet 写出
df.write.mode("overwrite").\
format("parquet").\
save("file:///export/data/output/parquet")
# 不给format,默认以parquet写出
df.write.mode("overwrite").save("file:///export/data/output/parquet")
- 将数据写出到Hive表中(写出Hive表需要配置好Hive和Spark的集成)
- 就是写入到被Hive元数据MetaStore服务管理的地方
df.write.mode("append|overwrite|ignore|error").saveAsTable(参数1,参数2)
# 参数1:表名 如果指定数据库,可以写数据库.表名
# 参数2:格式,推荐使用parquet
九、DataFrame通过JDBC读写数据库(MySQL为例)
读取JDBC是需要有驱动的,我们读取的是jdbc:mysql://
这个协议,也就是读取的事mysql的数据,既然如此,就需要有mysql的驱动jar包给spark程序用
点击下载mysql5驱动文件,密码:xjb5
点击下载mysql8驱动文件,密码:j082
- windows系统(使用本地解释器)
- 将jar包放在:
Anaconda3的安装路径下\envs\虚拟环境\Lib\site-packages\pyspark\jars
- 将jar包放在:
- Linux系统(使用远程解释器执行)
- 将jar包放在:
Anaconda3的安装路径下/envs/虚拟环境/lib/python3.8/site-packages/pyspark/jars
- 将jar包放在:
# 写DF通过JDBC到数据库中
df.write.mode("overwrite").\
format("jdbc").\
option("url","jdbc:mysql://node1:3306/test?useSSL=false&useUnicode=true").\
option("dbtable","u_data").\
option("user","root").\
option("password","123456").\
save()
- 注意:
- jdbc连接字符串中,建议使用useSSL=false确保连接可以正常连接(不适用SSL安全协议进行连接)
- jdbc连接字符串中,建议使用useUnicode=true来确保传输中不出现乱码
- save()不啊哟填参数,没有路径,是写出数据库
- dbtable属性:指定写出的表名
# 从MySQL中读取数据
df = spark.read.format("jdbc").\
option("url","jdbc:mysql://node1:3306/test?useSSL=false&useUnicode=true").\
option("dbtable","u_data").\
option("user","root").\
option("password","123456").\
load()
- 注意:
- 读出来是自带schema,不需要设置schema,因为数据库就又schema
- load()不需要加参数,没有路径,从数据库中读取
- dbtable是指定读取的表名