0
点赞
收藏
分享

微信扫一扫

SparkSQL指南-快速开始(Python版)——(一)

伽马星系 2023-03-07 阅读 68

快速开始

创建环境

Spark程序主要分为三个阶段:

  • 创建环境
  • 数据操作
  • 关闭环境(在Streaming程序中为执行环境)
    下面是批处理的Spark SQL API的创建环境的类:SparkSession,其目的是为了创建基本的Spark SQL的环境。

from pyspark.sql import SparkSession
spark=SparkSession.builder\
.appName("Spark SQL basic test")\
.getOrCreate()

注意:一定要有“\”
其中还可以指定操作,比如连接Mongodb的操作,支持Hive的操作,具体的写法类似于如下:

from pyspark.sql import SparkSession

if __name__ == '__main__':
spark=SparkSession.builder\
.appName("Spark SQL basic test")\
.config("spark.some.config.option", "some-value")\
.getOrCreate()

数据操作

创建DataFrames

创建DataFrames其实和python中的Datarame类似,也是一种表的表达方式。在Saprk中创建DataFrame的来源有很多,很多中API都是通过RDD来创建的,或者读取本地文件;但是在实际的任务中最常见的还是从数据库(Hive,Mongodb,MySQL)中创建DataFrame。由于这里是快速上手所以,该例子只使用read.json()
文件内容为:

{"name":"Kone"}
{"name":"Alices", "age":30}
{"name":"Bob", "age":19}

代码:

from pyspark.sql import SparkSession

if __name__ == '__main__':
spark=SparkSession.builder\
.appName("Spark SQL basic test")\
.config("spark.some.config.option", "some-value")\
.getOrCreate()

df=spark.read.json("your json file path")
df.show()

输出为:

+----+------+
| age| name|
+----+------+
|null| Kone|
| 30|Alices|
| 19| Bob|
+----+------+

在DataFrames上的操作

上文讲过DataFrame相当于一个表,则对表有很多结构化的操作,那么其实MySQL的相关操作都可在Spark DataFrame上实现。下面是几种常见的操作:

  • 访问数据
    在访问数据中存在两种访问:属性(df.age)和索引(df["age"])。在Spark中更加建议后一种访问方式:

df=spark.read.json(jsonPath)
df.select(df.age).show() # 使用属性来访问数据
df.select(df["age"]).show() # 使用索引来访问数据
df.select("name").show() # 使用select来访问数据

输出:

+----+
| age|
+----+
|null|
| 30|
| 19|
+----+

+----+
| age|
+----+
|null|
| 30|
| 19|
+----+

+------+
| name|
+------+
| Kone|
|Alices|
| Bob|
+------+

  • 输出DataFrame的结构

df.printSchema()

输出:

root
|-- age: long (nullable = true)
|-- name: string (nullable = true)

  • 对DataFrame的一列进行操作

df.select(df["name"], df["age"]+1).show()

输出:

+------+---------+
| name|(age + 1)|
+------+---------+
| Kone| null|
|Alices| 31|
| Bob| 20|
+------+---------+

  • 对表进行筛选
    下面筛选出name的以“K”开头的字符串。

df.filter(df["name"].startswith("K")).show()

输出:

+----+----+
| age|name|
+----+----+
|null|Kone|
+----+----+

  • 聚集操作
    原数据Json文件为:

{"name":"Kone"}
{"name":"Alices", "age":30}
{"name":"Bob", "age":19}
{"name":"Kven", "age":19}
{"name":"Dven", "age":18}

代码为:

df.groupBy(df["age"]).count().show()

输出:

+----+-----+
| age|count|
+----+-----+
| 19| 2|
|null| 1|
| 18| 1|
| 30| 1|
+----+-----+

使用反射推断模式

Spark SQL可以将Row对象的RDD转化为DataFrame,从而推断数据类型。行是通过将键值对列表作为kwargs传递给Row类来构造。



举报

相关推荐

0 条评论