一、Spark SQL
1.概述
Spark SQL是Spark用于结构化数据(structured data)处理的Spark模块。
与基本的Spark RDD API不同,Spark SQL的抽象数据类型为Spark提供了关于数据结构和正在执行的计算的更多信息。
在内部,Spark SQL使用这些额外的信息去做一些额外的优化,有多种方式与Spark SQL进行交互,比如: SQL和DatasetAPI。
当计算结果的时候,使用的是相同的执行引擎,不依赖你正在使用哪种API或者语言。这种统一也就意味着开发者可以很容易在不同的API之间进行切换,这些API提供了最自然的方式来表达给定的转换。
Hive是将 HiveSQL转换成 MapReduce然后提交到集群上执行,大大简化了编写 MapReduce的程序的复杂性,由于MapReduce这种计算模型执行效率比较慢。所以Spark SQL的应运而生,它是将Spark SQL转换成RDD,然后提交到集群执行,执行效率非常快!
Spark SQL它提供了2个编程抽象,类似Spark Core中的RDD
- DataFrame
- DataSet
2.Spark SQL特点
1.易整合
无缝的整合了 SQL 查询和 Spark 编程
2.统一的数据访问方式
使用相同的方式连接不同的数据源
3.兼容Hive
在已有的仓库上直接运行 SQL 或者 HiveQL
4.标准的数据连接
通过 JDBC 或者 ODBC 来连接
3.DataFrame
在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。
同时,与Hive类似,DataFrame也支持嵌套数据类型(struct、array和map)。从 API 易用性的角度上看,DataFrame API提供的是一套高层的关系操作,比函数式的RDD API 要更加友好,门槛更低。
左侧的RDD[Person]虽然以Person为类型参数,但Spark框架本身不了解Person类的内部结构。而右侧的DataFrame却提供了详细的结构信息,使得 Spark SQL 可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么。
DataFrame是为数据提供了Schema的视图。可以把它当做数据库中的一张表来对待
DataFrame也是懒执行的,但性能上比RDD要高,主要原因:优化的执行计划,即查询计划通过Spark catalyst optimiser进行优化。比如下面一个例子:
为了说明查询优化,我们来看上图展示的人口数据分析的示例。图中构造了两个DataFrame,将它们join之后又做了一次filter操作。
如果原封不动地执行这个执行计划,最终的执行效率是不高的。因为join是一个代价较大的操作,也可能会产生一个较大的数据集。如果我们能将filter下推到 join下方,先对DataFrame进行过滤,再join过滤后的较小的结果集,便可以有效缩短执行时间。而Spark SQL的查询优化器正是这样做的。简而言之,逻辑查询计划优化就是一个利用基于关系代数的等价变换,将高成本的操作替换为低成本操作的过程。
4.DataSet
DataSet是分布式数据集合。DataSet是Spark 1.6中添加的一个新抽象,是DataFrame的一个扩展。它提供了RDD的优势(强类型,使用强大的lambda函数的能力)以及Spark SQL优化执行引擎的优点。DataSet也可以使用功能性的转换(操作map,flatMap,filter等等)。
- 是DataFrame API的一个扩展,是SparkSQL最新的数据抽象
- 用户友好的API风格,既具有类型安全检查也具有DataFrame的查询优化特性;
- 用样例类来定义DataSet中数据的结构信息,样例类中每个属性的名称直接映射到DataSet中的字段名称;
- DataSet是强类型的。比如可以有DataSet[Car],DataSet[Person]。
- DataFrame是DataSet的特列,DataFrame=DataSet[Row] ,所以可以通过as方法将DataFrame转换为DataSet。Row是一个类型,跟Car、Person这些的类型一样,所有的表结构信息都用Row来表示。
二、Spark SQL编程
1.SparkSession
在老的版本中,SparkSQL提供两种SQL查询起始点:一个叫SQLContext,用于Spark自己提供的SQL查询;一个叫HiveContext,用于连接Hive的查询。
SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContex和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext完成的。当我们使用 spark-shell 的时候, spark 会自动的创建一个叫做spark的SparkSession, 就像我们以前可以自动获取到一个sc来表示SparkContext
2.DataFrame
Spark SQL的DataFrame API 允许我们使用 DataFrame 而不用必须去注册临时表或者生成SQL表达式。DataFrame API 既有transformation操作也有action操作,DataFrame的转换从本质上来说更具有关系, 而 DataSet API 提供了更加函数式的 API
1.创建DataFrame
在Spark SQL中SparkSession是创建DataFrame和执行SQL的入口,创建DataFrame有三种方式:通过Spark的数据源进行创建;从一个存在的RDD进行转换;还可以从Hive Table进行查询返回。
1)从Spark数据源进行创建
查看Spark支持创建文件的数据源格式
scala> spark.read.
csv format jdbc json load option options orc parquet schema table text textFile
读取json文件创建DataFrame
scala> val df = spark.read.json("/opt/module/spark-local/test.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
df.show
2)从RDD进行转换
xxxxxxxxxxxxxxxxxxx
3)Hive Table进行查询返回
xxxxxxxxxxxxxxxxxxx
2.SQL风格语法
SQL语法风格是指我们查询数据的时候使用SQL语句来查询,这种风格的查询必须要有临时视图或者全局视图来辅助
createGlobalTempView
创建全局视图
createTempView
创建临时视图
createOrReplaceTempView
创建或替换临时视图
1)创建一个DataFrame
scala> var df = spark.read.json("/opt/module/spark-local/test.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
2)对DataFrame创建一个临时表
scala> df.createTempView("test")
3)通过SQL语句实现查询全表
scala> var sqlDF = spark.sql("select * from test")
4)结果show
普通临时表是Session范围内的,如果想应用范围内有效,可以使用全局临时表。使用全局临时表时需要全路径访问,如:global_temp.test
5)对于DataFrame创建一个全局表
scala> df.createGlobalTempView("test")
6)通过SQL语句实现查询全表
scala> spark.sql("SELECT * FROM global_temp.test").show()
scala> spark.newSession().sql("SELECT * FROM global_temp.test").show()
3.DSL风格语法
DataFrame提供一个特定领域语言(domain-specific language, DSL)去管理结构化的数据,可以在 Scala, Java, Python 和 R 中使用 DSL,使用 DSL 语法风格不必去创建临时视图了
1)创建一个DataFrame
scala> val df = spark.read.json("/opt/module/spark-local /test.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
2)查看DataFrame的Schema信息
scala> df.printSchema
3)只查看”name”列数据
scala> df.select("name").show()
4)查看所有列
scala> df.select("*").show
5)查看”name”列数据以及”age+1”数据
涉及到运算的时候, 每列都必须使用$
scala> df.select($"name",$"age" + 1).show
6)查看”age”大于”19”的数据
scala> df.filter($"age">19).show
7)按照”age”分组,查看数据条数
scala> df.groupBy("age").count.show
4.RDD转换为DataFrame
注意:如果需要RDD与DF或者DS之间操作,那么都需要引入 import spark.implicits._ (spark不是包名,而是sparkSession对象的名称,所以必须先创建SparkSession对象再导入. implicits是一个内部object)
前置条件
- 导入隐式转换并创建一个RDD
- 在/opt/module/spark-local/目录下准备test.txt
zhangsan,18
lisi,19
wangwu,20
scala> import spark.implicits._
scala> var testRDD = sc textFile("/opt/module/spark-local/test.txt")
输出
testRDD: org.apache.spark.rdd.RDD[String] = /opt/module/spark-local/test.txt MapPartitionsRDD[30] at textFile at <console>:27
1)通过手动确定转换
testRDD.map{x=> val fields=x.split(",");(fields(0),fields(1).trim.toInt)}.toDF("name","age").show
2)通过样例类反射转换(常用)
(1)创建一个样例类
scala> case class Test(name:String,age:Int)
(2)根据样例类将RDD转换为DataFrame
testRDD.map{x=> var fields=x.split(",");Test(fields(0),fields(1).toInt)}.toDF.show
5.DataFrame转换为RDD
直接调用rdd
1)创建一个DataFrame
scala> var df = spark.read.json("/opt/module/spark-local/test.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
2)将DataFrame转换为RDD注意:得到的RDD存储类型为Row
scala> var dfToRDD = df.rdd
dfToRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[43] at rdd at <console>:28
3)打印RDD
scala> dfToRDD.collect
res19: Array[org.apache.spark.sql.Row] = Array([18,zhangsan], [19,lisi], [20,wangwu])
3.DataSet
DataSet是具有强类型的数据集合,需要提供对应的类型信息。
1.创建DataSet
1)使用样例类序列创建DataSet
scala> case class Test(name:String,age:Int)
Seq(Test("wangwu",20)).toDS()
scala> Seq(Test("wangwu",20)).toDS().show
2)使用基本类型的序列创建DataSet
scala> val ds = Seq(1,2,3,4,5,6).toDS
2.RDD转换为DataSet
SparkSQL能够自动将包含有样例类的RDD转换成DataSet,样例类定义了table的结构,样例类属性通过反射变成了表的列名。样例类可以包含诸如Seq或者Array等复杂的结构。
1)创建一个RDD
val testRDD = sc.textFile("/opt/module/spark-local/test.txt")
2)创建一个样例类
case class Test(name:String,age:Int)
3)将RDD转化为DataSet
testRDD.map(line => {val fields = line.split(",");Test(fields(0),fields(1). toInt)}).toDS
3.DataSet转换为RDD
调用rdd方法
1)创建一个DataSet
scala> val DS = Seq(Test("yangbailao", 32)).toDS()
2)将DataSet转换为RDD
scala> DS.rdd
4.DataFrame与DataSet的互操作
1.DataFrame转为DataSet
1)创建一个DateFrame
scala> val df = spark.read.json("/opt/module/spark-local/test.json")
2)创建一个样例类
scala> case class Test(name: String,age: Long)
3)将DataFrame转化为DataSet
scala> df.as[Test]
这种方法就是在给出每一列的类型后,使用as方法,转成Dataset,这在数据类型是DataFrame又需要针对各个字段处理时极为方便。在使用一些特殊的操作时,一定要加上 import spark.implicits._ 不然toDF、toDS无法使用。
2.Dataset转为DataFrame
1)创建一个样例类
case class Test(name: String,age: Long)
2)创建DataSet
scala> val ds = Seq(Test("lvdongbin",32)).toDS()
3)将DataSet转化为DataFrame
ds.toDF
5.RDD、DataFrame和DataSet之间的关系
在SparkSQL中Spark为我们提供了两个新的抽象,分别是DataFrame和DataSet。他们和RDD有什么区别呢?首先从版本的产生上来看:
RDD (Spark1.0) —> Dataframe(Spark1.3) —> Dataset(Spark1.6)
如果同样的数据都给到这三个数据结构,他们分别计算之后,都会给出相同的结果。不同是的他们的执行效率和执行方式。在后期的Spark版本中,DataSet有可能会逐步取代RDD和DataFrame成为唯一的API接口。
1.三者的共性
1)RDD、DataFrame、DataSet全都是spark平台下的分布式弹性数据集,为处理超大型数据提供便利;
2)三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action如foreach时,三者才会开始遍历运算;
3)三者有许多共同的函数,如filter,排序等;
4)在对DataFrame和Dataset进行操作许多操作都需要这个包:import spark.implicits._(在创建好SparkSession对象后尽量直接导入)
5)三者都会根据 Spark 的内存情况自动缓存运算,这样即使数据量很大,也不用担心会内存溢出
6)三者都有partition的概念
DataFrame和Dataset均可使用模式匹配获取各个字段的值和类型
2.三者的区别
1)RDD
RDD一般和Spark MLib同时使用
RDD不支SparkSQL操作
2)DataFrame
- 与RDD和Dataset不同,DataFrame每一行的类型固定为Row,每一列的值没法直接访问,只有通过解析才能获取各个字段的值
- DataFrame与DataSet一般不与 Spark MLib 同时使用
- DataFrame与DataSet均支持 SparkSQL 的操作,比如select,groupby之类,还能注册临时表/视窗,进行 sql 语句操作
- DataFrame与DataSet支持一些特别方便的保存方式,比如保存成csv,可以带上表头,这样每一列的字段名一目了然
3)DataSet
Dataset和DataFrame拥有完全相同的成员函数,区别只是每一行的数据类型不同。 DataFrame其实就是DataSet的一个特例
type DataFrame = Dataset[Row]
DataFrame也可以叫Dataset[Row],每一行的类型是Row,不解析,每一行究竟有哪些字段,各个字段又是什么类型都无从得知,只能用上面提到的getAS方法或者共性中的第七条提到的模式匹配拿出特定字段。而Dataset中,每一行是什么类型是不一定的,在自定义了case class之后可以很自由的获得每一行的信息
3.三者的互相转化
6.IDEA创建SparkSQL程序
1)添加依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.1</version>
</dependency>
2)代码
package com.spark.day08
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
/**
* RDD与DataFrame与DataSet关系转换
*/
object SparkSQL01_Test {
def main(args: Array[String]): Unit = {
//创建上下文环境配置对象
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_Test")
//创建SparkSQL执行的入口点对象 SparkSession对象
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
//读取json文件 创建DataFrame
//val df: DataFrame = spark.read.json("D:\\mywork\\IDEAproject\\spark-11\\src\\main\\input\\test.json")
//spark不是包名,不是类名,是我们创建的入口对象SparkSession的对象名称
//不引入无法调用toDF toDs方法
import spark.implicits._
//查看df的数据显式20行
//df.show()
//SQL语法风格 创建临时表
//df.createOrReplaceTempView("user")
//spark.sql("select * from user").show()
//DSL风格
//df.select("name","age").show()
//RDD--->DataFrame--->DataSet
//创建RDD
val rdd: RDD[(Int, String, Int)] = spark.sparkContext.makeRDD(List((1, "lvdongbin", 80), (2, "hanzhongli", 70), (3, "caoguojiu", 65)))
//RDD--->DataFrame
val df: DataFrame = rdd.toDF("id","name","age")
//DataFrame--->DataSet
val ds: Dataset[user] = df.as[user]
ds.show()
//DataSet--->DataFrame--->RDD
val df1: DataFrame = ds.toDF()
val rdd1: RDD[Row] = df1.rdd
df1.show()
rdd1.collect().foreach(println)
//释放资源
spark.stop()
}
}
case class user(id:Int,name:String,age:Int)
7.用户自定义函数
1.UDF
输入一行,返回一个结果。在Shell窗口中可以通过spark.udf功能用户可以自定义函数。
package com.spark.day08
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* 自定义UDF函数,在每个查询到的名字前,添加问候语
*/
object SparkSQL2_UDF {
def main(args: Array[String]): Unit = {
//创建SparkConf配置文件
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL2_UDF")
//创建SparkSession对象
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
//创建DF
val df: DataFrame = spark.read.json("D:\\mywork\\IDEAproject\\spark-11\\src\\main\\input\\test.json")
//注册自定义函数
spark.udf.register("addSayHi",(name:String)=>{"Hello " + name})
//创建一个临时视图
df.createOrReplaceTempView("user")
//通过SQL语句,从临时视图查询数据
spark.sql("select addSayHi(name),age from user").show()
//释放资源
spark.stop()
}
}
2.UDAF
输入多行,返回一行。强类型的Dataset和弱类型的DataFrame都提供了相关的聚合函数, 如 count(),countDistinct(),avg(),max(),min()。除此之外,用户可以设定自己的自定义聚合函数。通过继承UserDefinedAggregateFunction来实现用户自定义聚合函数。
需求:实现求平均年龄
1)RDD算子方式实现
/**
*
* 需求:求平均年龄 --rdd算子实现
*/
object SparkSQL3_RDD {
def main(args: Array[String]): Unit = {
//创建Spark配置文件对象
val conf: SparkConf = new SparkConf().setAppName("Spark01_CreateRDD_mem").setMaster("local[*]")
//创建SparkContext对象,该对象时提交Spark App的入口
val sc = new SparkContext(conf)
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("zhangsan", 20), ("lisi", 30), ("wangw", 40)))
//结构转换
val mapRdd: RDD[(Int, Int)] = rdd.map {
case (name, age) => {
(age, 1)
}
}
//对年龄总人数进行聚合操作 (年龄,个数)
val res: (Int, Int) = mapRdd.reduce {
(t1, t2) => {
(t1._1 + t2._1, t1._2 + t2._2)
}
}
println(res._1 / res._2)
//释放资源
sc.stop()
}
}
2)自定义累加器方式实现
package com.spark.day08
import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}
/**
*
* 需求:求平均年龄 --累加器实现
*/
object SparkSQL4_UDAF {
def main(args: Array[String]): Unit = {
//创建Spark配置文件对象
val conf: SparkConf = new SparkConf().setAppName("Spark01_CreateRDD_mem").setMaster("local[*]")
//创建SparkContext对象,该对象时提交Spark App的入口
val sc = new SparkContext(conf)
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("zhangsan", 20), ("lisi", 30), ("wangw", 40)))
//创建累加器
val myAc = new MyAccumulator
//注册累加器
sc.register(myAc)
//使用累加器
rdd.foreach{
case (name,age)=>{
myAc.add(age)
}
}
//获取累加器的值
println(myAc.value)
//释放资源
sc.stop()
}
}
class MyAccumulator extends AccumulatorV2[Int, Double] {
//定义一个变量,汇总年龄次数
var ageSum: Int = 0
var countSum: Int = 0
//定义一个变量,汇总年龄总数
override def isZero: Boolean = {
ageSum == 0 && countSum == 0
}
override def copy(): AccumulatorV2[Int, Double] = {
val newMyAccumulator = new MyAccumulator
newMyAccumulator.ageSum = this.ageSum
newMyAccumulator.countSum = this.countSum
newMyAccumulator
}
override def reset(): Unit = {
ageSum = 0
countSum = 0
}
override def add(v: Int): Unit = {
ageSum += v
countSum += 1
}
override def merge(other: AccumulatorV2[Int, Double]): Unit = {
other match {
case mc: MyAccumulator => {
this.ageSum += mc.ageSum
this.countSum += mc.countSum
}
case _ =>
}
}
override def value: Double = {
ageSum / countSum
}
}
3)自定义聚合函数实现-弱类型(应用于SparkSQL更方便)
package com.spark.day08
import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
/**
* 自定义UDAF(弱类型主要应用于SQL风格的DF查询)
*/
object SparkSQL05_UDAF {
def main(args: Array[String]): Unit = {
//创建配置文件对象
val conf: SparkConf = new SparkConf().setAppName("SparkSQL05_UDAF").setMaster("local[*]")
//创建SparkSession
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
//读取Json文件, 创建df
val df: DataFrame = spark.read.json("D:\\mywork\\IDEAproject\\spark-11\\src\\main\\input\\test.json")
//创建自定义函数对象
val myAvg = new MyAvg
//注册自定义函数
spark.udf.register("myAvg",myAvg)
//创建临时视图
df.createOrReplaceTempView("user")
//使用聚合函数进行查询
spark.sql("select MyAvg(age) from user").show()
//
//释放资源
spark.stop()
}
}
/*
(弱类型)定义类继承UserDefinedAggregateFunction,并重写其中方法
*/
class MyAvg extends UserDefinedAggregateFunction{
//聚合函数的输入的数据类型
override def inputSchema: StructType = {
val structType: StructType = StructType(Array(StructField("age", IntegerType)))
structType
}
// 聚合函数缓冲区中值的数据类型(age,count)
override def bufferSchema: StructType = {
StructType(Array(StructField("sum",LongType),StructField("count",LongType)))
}
// 函数返回值的数据类型
override def dataType: DataType = DoubleType
// //稳定性 默认不处理,直接返回true 相同输入是否会得到相同的输出
override def deterministic: Boolean = true
// 函数缓冲区初始化
override def initialize(buffer: MutableAggregationBuffer): Unit = {
// 存年龄的总和
buffer(0) = 0L
// 存年龄的个数
buffer(1) = 0L
}
// 更新缓冲区中的数据
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
if (!buffer.isNullAt(0)){
buffer(0) = buffer.getLong(0) + input.getInt(0)
buffer(1) = buffer.getLong(1) + 1L
}
}
// 分区间数据合并
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
}
// 计算最终结果
override def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble/buffer.getLong(1)
}
4)自定义聚合函数实现-强类型(应用于DataSet的DSL更方便)
package com.spark.day08
import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Encoders, Row, SparkSession, TypedColumn}
/**
* 自定义UDAF(强类型应用于DataSet的DSL)
*/
object SparkSQL06_UDAF {
def main(args: Array[String]): Unit = {
//创建配置文件对象
val conf: SparkConf = new SparkConf().setAppName("SparkSQL05_UDAF").setMaster("local[*]")
//创建SparkSession
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
import spark.implicits._
//读取Json文件, 创建df
val df: DataFrame = spark.read.json("D:\\mywork\\IDEAproject\\spark-11\\src\\main\\input\\test.json")
//创建自定义函数对象
val myAvgNew = new MyAvgNew
//自定义UDTF强类型,无法应用与SQL风格的查询
//将df转为ds
val ds: Dataset[User01] = df.as[User01]
//将聚合函数对象转换为查询的列
val column: TypedColumn[User01, Double] = myAvgNew.toColumn
ds.select(column).show()
//释放资源
spark.stop()
}
}
//输入数据类型
case class User01(name: String, age: Long)
//缓存类型
case class AgeBuffer(var sum: Long, var count: Long)
/**
* 定义类继承org.apache.spark.sql.expressions.Aggregator
* 重写类中的方法
* 自定义UDAF函数 (强类型)
* IN 输入数据类型
* BUF 缓存数据类型
* OUT 输出结果数据类型
*
*/
class MyAvgNew extends Aggregator[User01, AgeBuffer, Double] {
//对缓存数据进行初始化
override def zero: AgeBuffer = {
AgeBuffer(0L, 0L)
}
//对当前分区内数据进行聚合
override def reduce(b: AgeBuffer, a: User01): AgeBuffer = {
b.sum += a.age
b.count +=1
b
}
//分区间合并
override def merge(b1: AgeBuffer, b2: AgeBuffer): AgeBuffer = {
b1.sum += b2.sum
b1.count += b2.count
b1
}
//返回计算结果
override def finish(reduction: AgeBuffer): Double = {
reduction.sum.toDouble/reduction.count
}
//DataSet的编码及解码器,用于序列化,写法固定
//用户自定义类型 product
override def bufferEncoder: Encoder[AgeBuffer] = {
Encoders.product
}
//系统值类型 scala根据具体值类型选择
override def outputEncoder: Encoder[Double] = {
Encoders.scalaDouble
}
}