文章目录
- RDD, DataFrame ,DataSet 比较
- 案例
- DSL 与 SQL 语法风格
- 视图
- 会话临时视图
- 全局临时视图
RDD, DataFrame ,DataSet 比较
dataFrame在内存中映射为一张表,RDD相当于表中一行数据,dataset是具备RDD和dataFrame所有优点,强数据类型,保证编译时数据类型安全,符合面向对象编程,便于使用lamba函数
在spark2 版本中 dataFram源码已被移除,但是约定 DataFrame-DataSet[Row]
主要区别
RDD可以知道每个元素具体类型,不知道元素具体属性
DataFrame数据类Row类型,数据列以及名称
DataSet 数据类型,字段名,字段类型
案例
统计年龄小于25的女生
import org.apache.spark.sql.SparkSession
object TestSQL {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local[*]")
.appName("test")
.getOrCreate()
import spark.implicits._
val df = spark.read.json("d:/test/a.json")
df.show()
df.count()
df.filter($"sex" === "F").filter($"age" < 25).show()
val ds = spark.read.json("d:/test/a.json").as[User]
ds.filter(_.sex == "F").filter(_.age < 25).show()
spark.stop()
}
}
case class User(name: String, age: Int, sex: String, addr: Array[String])
$“sex” === “F” 是SQLContext中特殊的判断表达式,其中 三=为Column类中方法名,用于判断符号两边是否相等,不等为=!=,在引入此表达式时,需要引入 import spark.implicits._ 类包
DSL 与 SQL 语法风格
DSL 风格:意为领域专用语言,开发者通过方法实现SQL功能
import org.apache.spark.sql.SparkSession
object TestSQL2 {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local[*]")
.appName("test")
.getOrCreate()
import spark.implicits._
val df = spark.read.json("d:/test/a.json")
df.show()
df.select($"name", $"age", $"sex").filter($"sex" === "F" && $"age" < 25)
df.groupBy("sex")
.agg(Map(
"age" -> "max",
"name" -> "count"
))
df.select($"name").map(row => row.getAs[String]("name").toLowerCase).show()
spark.stop()
}
}
SQL 风格
import org.apache.spark.sql.SparkSession
object TestSQL2 {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local[*]")
.appName("test")
.getOrCreate()
import spark.implicits._
val df = spark.read.json("d:/test/a.json")
df.createTempView("t_user")
spark.sql("select count(1) from t_user").show()
spark.stop()
}
}
注意: SQL末尾不应有;号
视图
会话临时视图
import org.apache.spark.sql.SparkSession
object TestSQL2 {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local[*]")
.appName("test")
.getOrCreate()
import spark.implicits._
val df = spark.read.json("d:/test/a.json")
df.createTempView("t_user")
spark.sql("select count(1) from t_user").show()
spark.newSession().sql("select count(1) from t_user").show()
spark.stop()
}
}
createTempView只针对当前会话有效,且当前视图中不允许出现重复的视图,如果出现同名视图 使用createOrReplaceTempView
spark.newSession().sql(“select count(1) from t_user”).show() 是开启新会话,原视图不可用
全局临时视图
import org.apache.spark.sql.SparkSession
object TestSQL2 {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local[*]")
.appName("test")
.getOrCreate()
import spark.implicits._
val df = spark.read.json("d:/test/a.json")
df.createGlobalTempView("t_user")
spark.sql("select count(1) from t_user").show()
spark.newSession().sql("select count(1) from t_user").show()
spark.stop()
}
}
全局视图无会话间视图无法共享的概念多个会话之间共享
删除临时视图
spark.sqlContext.dropTempTable("t_user")
清楚视图中缓存的数据
spark.sqlContext.uncacheTable("t_user")