0
点赞
收藏
分享

微信扫一扫

SparkSQL

玉字璧 2022-04-26 阅读 66
spark

一、Spark SQL

1.概述

        Spark SQL是Spark用于结构化数据(structured data)处理的Spark模块。

        与基本的Spark RDD API不同,Spark SQL的抽象数据类型为Spark提供了关于数据结构和正在执行的计算的更多信息。

        在内部,Spark SQL使用这些额外的信息去做一些额外的优化,有多种方式与Spark SQL进行交互,比如: SQL和DatasetAPI。

        当计算结果的时候,使用的是相同的执行引擎,不依赖你正在使用哪种API或者语言。这种统一也就意味着开发者可以很容易在不同的API之间进行切换,这些API提供了最自然的方式来表达给定的转换。

        Hive是将 HiveSQL转换成 MapReduce然后提交到集群上执行,大大简化了编写 MapReduce的程序的复杂性,由于MapReduce这种计算模型执行效率比较慢。所以Spark SQL的应运而生,它是将Spark SQL转换成RDD,然后提交到集群执行,执行效率非常快!

Spark SQL它提供了2个编程抽象,类似Spark Core中的RDD

  • DataFrame
  • DataSet

2.Spark SQL特点

1.易整合

无缝的整合了 SQL 查询和 Spark 编程

2.统一的数据访问方式

使用相同的方式连接不同的数据源

3.兼容Hive

在已有的仓库上直接运行 SQL 或者 HiveQL

4.标准的数据连接

通过 JDBC 或者 ODBC 来连接

3.DataFrame

        在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。

        同时,与Hive类似,DataFrame也支持嵌套数据类型(struct、array和map)。从 API 易用性的角度上看,DataFrame API提供的是一套高层的关系操作,比函数式的RDD API 要更加友好,门槛更低。

        左侧的RDD[Person]虽然以Person为类型参数,但Spark框架本身不了解Person类的内部结构。而右侧的DataFrame却提供了详细的结构信息,使得 Spark SQL 可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么。

DataFrame是为数据提供了Schema的视图。可以把它当做数据库中的一张表来对待

DataFrame也是懒执行的,性能上比RDD,主要原因:优化的执行计划,即查询计划通过Spark catalyst optimiser进行优化。比如下面一个例子:

        为了说明查询优化,我们来看上图展示的人口数据分析的示例。图中构造了两个DataFrame,将它们join之后又做了一次filter操作。

如果原封不动地执行这个执行计划,最终的执行效率是不高的。因为join是一个代价较大的操作,也可能会产生一个较大的数据集。如果我们能将filter下推到 join下方,先对DataFrame进行过滤,再join过滤后的较小的结果集,便可以有效缩短执行时间。而Spark SQL的查询优化器正是这样做的。简而言之,逻辑查询计划优化就是一个利用基于关系代数的等价变换,将高成本的操作替换为低成本操作的过程。 

 

4.DataSet

        DataSet是分布式数据集合。DataSet是Spark 1.6中添加的一个新抽象,是DataFrame的一个扩展。它提供了RDD的优势(强类型,使用强大的lambda函数的能力)以及Spark SQL优化执行引擎的优点。DataSet也可以使用功能性的转换(操作map,flatMap,filter等等)。

  • 是DataFrame API的一个扩展,是SparkSQL最新的数据抽象
  • 用户友好的API风格,既具有类型安全检查也具有DataFrame的查询优化特性;
  • 用样例类来定义DataSet中数据的结构信息,样例类中每个属性的名称直接映射到DataSet中的字段名称;
  • DataSet是强类型的。比如可以有DataSet[Car],DataSet[Person]。
  • DataFrame是DataSet的特列,DataFrame=DataSet[Row] ,所以可以通过as方法将DataFrame转换为DataSet。Row是一个类型,跟Car、Person这些的类型一样,所有的表结构信息都用Row来表示。

二、Spark SQL编程

1.SparkSession

        在老的版本中,SparkSQL提供两种SQL查询起始点:一个叫SQLContext,用于Spark自己提供的SQL查询;一个叫HiveContext,用于连接Hive的查询。

        SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContex和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext完成的。当我们使用 spark-shell 的时候, spark 会自动的创建一个叫做spark的SparkSession, 就像我们以前可以自动获取到一个sc来表示SparkContext

 

2.DataFrame

         Spark SQL的DataFrame API 允许我们使用 DataFrame 而不用必须去注册临时表或者生成SQL表达式。DataFrame API 既有transformation操作也有action操作,DataFrame的转换从本质上来说更具有关系, 而 DataSet API 提供了更加函数式的 API

1.建DataFrame 

在Spark SQL中SparkSession是创建DataFrame和执行SQL的入口,创建DataFrame有三种方式:通过Spark的数据源进行创建;从一个存在的RDD进行转换;还可以从Hive Table进行查询返回。

1)从Spark数据源进行创建

查看Spark支持创建文件的数据源格式

scala> spark.read.
csv   format   jdbc   json   load   option   options   orc   parquet   schema   table   text   textFile

读取json文件创建DataFrame

scala> val df = spark.read.json("/opt/module/spark-local/test.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

df.show

 

2)从RDD进行转换

xxxxxxxxxxxxxxxxxxx

3)Hive Table进行查询返回

xxxxxxxxxxxxxxxxxxx

2.SQL风格语法

        SQL语法风格是指我们查询数据的时候使用SQL语句来查询,这种风格的查询必须要有临时视图或者全局视图来辅助

createGlobalTempView

创建全局视图   

createTempView

创建临时视图

createOrReplaceTempView                                        

创建或替换临时视图

1)创建一个DataFrame

scala> var df = spark.read.json("/opt/module/spark-local/test.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

2)对DataFrame创建一个临时表

scala> df.createTempView("test")

 3)通过SQL语句实现查询全表

scala> var sqlDF = spark.sql("select * from test")

4)结果show

普通临时表是Session范围内的,如果想应用范围内有效,可以使用全局临时表。使用全局临时表时需要全路径访问,如:global_temp.test

5)对于DataFrame创建一个全局表

scala> df.createGlobalTempView("test")

6)通过SQL语句实现查询全表

scala> spark.sql("SELECT * FROM global_temp.test").show()

scala> spark.newSession().sql("SELECT * FROM global_temp.test").show()

3.DSL风格语法

        DataFrame提供一个特定领域语言(domain-specific language, DSL)去管理结构化的数据,可以在 Scala, Java, Python 和 R 中使用 DSL,使用 DSL 语法风格不必去创建临时视图了

1)创建一个DataFrame

scala> val df = spark.read.json("/opt/module/spark-local /test.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

2)查看DataFrame的Schema信息

scala> df.printSchema

3)只查看”name”列数据

scala> df.select("name").show()

4)查看所有列

scala> df.select("*").show

5)查看”name”列数据以及”age+1”数据

涉及到运算的时候, 每列都必须使用$

scala> df.select($"name",$"age" + 1).show

6)查看”age”大于”19”的数据

scala> df.filter($"age">19).show

7)按照”age”分组,查看数据条数

scala> df.groupBy("age").count.show

4.RDD转换为DataFrame

        注意:如果需要RDDDF或者DS之间操作,那么都需要引入 import spark.implicits._  spark不是包名,而是sparkSession对象的名称,所以必须先创建SparkSession对象再导入. implicits是一个内部object

前置条件

  • 导入隐式转换并创建一个RDD
  • 在/opt/module/spark-local/目录下准备test.txt
zhangsan,18
lisi,19
wangwu,20
scala> import spark.implicits._
scala> var testRDD = sc textFile("/opt/module/spark-local/test.txt")
输出
testRDD: org.apache.spark.rdd.RDD[String] = /opt/module/spark-local/test.txt MapPartitionsRDD[30] at textFile at <console>:27

1)通过手动确定转换

testRDD.map{x=> val fields=x.split(",");(fields(0),fields(1).trim.toInt)}.toDF("name","age").show

 

 2)通过样例类反射转换(常用)

(1)创建一个样例类

scala> case class Test(name:String,age:Int)

(2)根据样例类将RDD转换为DataFrame

testRDD.map{x=> var fields=x.split(",");Test(fields(0),fields(1).toInt)}.toDF.show

5.DataFrame转换为RDD

直接调用rdd

1)创建一个DataFrame

scala> var df = spark.read.json("/opt/module/spark-local/test.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

2)将DataFrame转换为RDD注意:得到的RDD存储类型为Row

scala> var dfToRDD = df.rdd
dfToRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[43] at rdd at <console>:28

3)打印RDD

scala> dfToRDD.collect
res19: Array[org.apache.spark.sql.Row] = Array([18,zhangsan], [19,lisi], [20,wangwu])

3.DataSet

DataSet是具有强类型的数据集合,需要提供对应的类型信息。

1.创建DataSet

1)使用样例类序列创建DataSet

scala> case class Test(name:String,age:Int)

Seq(Test("wangwu",20)).toDS()

scala> Seq(Test("wangwu",20)).toDS().show

 

2)使用基本类型的序列创建DataSet

scala> val ds = Seq(1,2,3,4,5,6).toDS

2.RDD转换为DataSet

        SparkSQL能够自动将包含有样例类的RDD转换成DataSet,样例类定义了table的结构,样例类属性通过反射变成了表的列名。样例类可以包含诸如Seq或者Array等复杂的结构。

1)创建一个RDD

val testRDD = sc.textFile("/opt/module/spark-local/test.txt")

2)创建一个样例类

case class Test(name:String,age:Int)

3)将RDD转化为DataSet

testRDD.map(line => {val fields = line.split(",");Test(fields(0),fields(1). toInt)}).toDS

3.DataSet转换为RDD 

调用rdd方法

1)创建一个DataSet

scala> val DS = Seq(Test("yangbailao", 32)).toDS()


2)将DataSet转换为RDD

scala> DS.rdd

4.DataFrame与DataSet的互操作

1.DataFrame转为DataSet

1)创建一个DateFrame

scala> val df = spark.read.json("/opt/module/spark-local/test.json")


2)创建一个样例类

scala> case class Test(name: String,age: Long)


3)将DataFrame转化为DataSet   

scala> df.as[Test]


这种方法就是在给出每一列的类型后,使用as方法,转成Dataset,这在数据类型是DataFrame又需要针对各个字段处理时极为方便。在使用一些特殊的操作时,一定要加上 import spark.implicits._ 不然toDF、toDS无法使用。

2.Dataset转为DataFrame


1)创建一个样例类

case class Test(name: String,age: Long)



2)创建DataSet
 

scala> val ds = Seq(Test("lvdongbin",32)).toDS()


 

 3)将DataSet转化为DataFrame

ds.toDF


5.RDD、DataFrame和DataSet之间的关系

        在SparkSQL中Spark为我们提供了两个新的抽象,分别是DataFrame和DataSet。他们和RDD有什么区别呢?首先从版本的产生上来看:

RDD (Spark1.0) —> Dataframe(Spark1.3) —> Dataset(Spark1.6)

如果同样的数据都给到这三个数据结构,他们分别计算之后,都会给出相同的结果。不同是的他们的执行效率和执行方式。在后期的Spark版本中,DataSet有可能会逐步取代RDD和DataFrame成为唯一的API接口

1.三者的共性

1)RDD、DataFrame、DataSet全都是spark平台下的分布式弹性数据集,为处理超大型数据提供便利;

2)三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action如foreach时,三者才会开始遍历运算;

3)三者有许多共同的函数,如filter,排序等;

4)在对DataFrame和Dataset进行操作许多操作都需要这个包:import spark.implicits._(在创建好SparkSession对象后尽量直接导入)

5)三者都会根据 Spark 的内存情况自动缓存运算,这样即使数据量很大,也不用担心会内存溢出

6)三者都有partition的概念

DataFrame和Dataset均可使用模式匹配获取各个字段的值和类型

2.三者的区别

1)RDD                              
RDD一般和Spark MLib同时使用
RDD不支SparkSQL操作

2)DataFrame

- 与RDD和Dataset不同,DataFrame每一行的类型固定为Row,每一列的值没法直接访问,只有通过解析才能获取各个字段的值
- DataFrame与DataSet一般不与 Spark MLib 同时使用
- DataFrame与DataSet均支持 SparkSQL 的操作,比如select,groupby之类,还能注册临时表/视窗,进行 sql 语句操作
- DataFrame与DataSet支持一些特别方便的保存方式,比如保存成csv,可以带上表头,这样每一列的字段名一目了然

3)DataSet

        Dataset和DataFrame拥有完全相同的成员函数,区别只是每一行的数据类型不同。 DataFrame其实就是DataSet的一个特例  
type DataFrame = Dataset[Row]
        DataFrame也可以叫Dataset[Row],每一行的类型是Row,不解析,每一行究竟有哪些字段,各个字段又是什么类型都无从得知,只能用上面提到的getAS方法或者共性中的第七条提到的模式匹配拿出特定字段。而Dataset中,每一行是什么类型是不一定的,在自定义了case class之后可以很自由的获得每一行的信息

3.三者的互相转化

 6.IDEA创建SparkSQL程序

1)添加依赖

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-sql_2.11</artifactId>

<version>2.1.1</version>

</dependency>

2)代码

package com.spark.day08

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

/**
 * RDD与DataFrame与DataSet关系转换
 */

object SparkSQL01_Test {
  def main(args: Array[String]): Unit = {

    //创建上下文环境配置对象
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_Test")

    //创建SparkSQL执行的入口点对象 SparkSession对象
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()

    //读取json文件 创建DataFrame
    //val df: DataFrame = spark.read.json("D:\\mywork\\IDEAproject\\spark-11\\src\\main\\input\\test.json")

    //spark不是包名,不是类名,是我们创建的入口对象SparkSession的对象名称
    //不引入无法调用toDF toDs方法
    import spark.implicits._

    //查看df的数据显式20行
    //df.show()

    //SQL语法风格 创建临时表
    //df.createOrReplaceTempView("user")
    //spark.sql("select * from user").show()

    //DSL风格
    //df.select("name","age").show()

    //RDD--->DataFrame--->DataSet
    //创建RDD
    val rdd: RDD[(Int, String, Int)] = spark.sparkContext.makeRDD(List((1, "lvdongbin", 80), (2, "hanzhongli", 70), (3, "caoguojiu", 65)))
    //RDD--->DataFrame

    val df: DataFrame = rdd.toDF("id","name","age")

    //DataFrame--->DataSet
    val ds: Dataset[user] = df.as[user]
    ds.show()

    //DataSet--->DataFrame--->RDD

    val df1: DataFrame = ds.toDF()

    val rdd1: RDD[Row] = df1.rdd

    df1.show()

    rdd1.collect().foreach(println)

    //释放资源
    spark.stop()
  }
}
case class user(id:Int,name:String,age:Int)

7.用户自定义函数

1.UDF

输入一行,返回一个结果。在Shell窗口中可以通过spark.udf功能用户可以自定义函数。

package com.spark.day08

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * 自定义UDF函数,在每个查询到的名字前,添加问候语
 */
object SparkSQL2_UDF {
  def main(args: Array[String]): Unit = {
    //创建SparkConf配置文件
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL2_UDF")
    //创建SparkSession对象
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()

    //创建DF
    val df: DataFrame = spark.read.json("D:\\mywork\\IDEAproject\\spark-11\\src\\main\\input\\test.json")

    //注册自定义函数
    spark.udf.register("addSayHi",(name:String)=>{"Hello " + name})
    //创建一个临时视图
    df.createOrReplaceTempView("user")

    //通过SQL语句,从临时视图查询数据
    spark.sql("select addSayHi(name),age from user").show()


    //释放资源
    spark.stop()
  }

}

2.UDAF

        输入多行,返回一行。强类型的Dataset和弱类型的DataFrame都提供了相关的聚合函数, 如 count(),countDistinct(),avg(),max(),min()。除此之外,用户可以设定自己的自定义聚合函数。通过继承UserDefinedAggregateFunction来实现用户自定义聚合函数。

需求:实现求平均年龄

1)RDD算子方式实现

/**
 *
 * 需求:求平均年龄  --rdd算子实现
 */
object SparkSQL3_RDD {
  def main(args: Array[String]): Unit = {
    //创建Spark配置文件对象
    val conf: SparkConf = new SparkConf().setAppName("Spark01_CreateRDD_mem").setMaster("local[*]")
    //创建SparkContext对象,该对象时提交Spark App的入口
    val sc = new SparkContext(conf)

    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("zhangsan", 20), ("lisi", 30), ("wangw", 40)))
    //结构转换
    val mapRdd: RDD[(Int, Int)] = rdd.map {
      case (name, age) => {
        (age, 1)
      }
    }
    //对年龄总人数进行聚合操作 (年龄,个数)
    val res: (Int, Int) = mapRdd.reduce {
      (t1, t2) => {
        (t1._1 + t2._1, t1._2 + t2._2)
      }
    }
    println(res._1 / res._2)

    //释放资源
    sc.stop()
  }

}

2)自定义累加器方式实现

package com.spark.day08

import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}

/**
 *
 * 需求:求平均年龄  --累加器实现
 */
object SparkSQL4_UDAF {
  def main(args: Array[String]): Unit = {
    //创建Spark配置文件对象
    val conf: SparkConf = new SparkConf().setAppName("Spark01_CreateRDD_mem").setMaster("local[*]")
    //创建SparkContext对象,该对象时提交Spark App的入口
    val sc = new SparkContext(conf)

    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("zhangsan", 20), ("lisi", 30), ("wangw", 40)))

    //创建累加器
    val myAc = new MyAccumulator

    //注册累加器
    sc.register(myAc)

    //使用累加器
    rdd.foreach{
      case (name,age)=>{
        myAc.add(age)
      }
    }

    //获取累加器的值
    println(myAc.value)


    //释放资源
    sc.stop()
  }

}

class MyAccumulator extends AccumulatorV2[Int, Double] {
  //定义一个变量,汇总年龄次数
  var ageSum: Int = 0
  var countSum: Int = 0

  //定义一个变量,汇总年龄总数
  override def isZero: Boolean = {
    ageSum == 0 && countSum == 0
  }

  override def copy(): AccumulatorV2[Int, Double] = {
    val newMyAccumulator = new MyAccumulator
    newMyAccumulator.ageSum = this.ageSum
    newMyAccumulator.countSum = this.countSum
    newMyAccumulator
  }

  override def reset(): Unit = {
    ageSum = 0
    countSum = 0
  }

  override def add(v: Int): Unit = {
    ageSum += v
    countSum += 1
  }

  override def merge(other: AccumulatorV2[Int, Double]): Unit = {
    other match {
      case mc: MyAccumulator => {
        this.ageSum += mc.ageSum
        this.countSum += mc.countSum
      }
      case _ =>
    }
  }

  override def value: Double = {
    ageSum / countSum
  }
}

3)自定义聚合函数实现-弱类型(应用于SparkSQL更方便)

package com.spark.day08

import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

/**
 * 自定义UDAF(弱类型主要应用于SQL风格的DF查询)
 */
object SparkSQL05_UDAF {
  def main(args: Array[String]): Unit = {
    //创建配置文件对象
    val conf: SparkConf = new SparkConf().setAppName("SparkSQL05_UDAF").setMaster("local[*]")

    //创建SparkSession
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()

    //读取Json文件, 创建df
    val df: DataFrame = spark.read.json("D:\\mywork\\IDEAproject\\spark-11\\src\\main\\input\\test.json")

    //创建自定义函数对象
    val myAvg = new MyAvg
    //注册自定义函数
    spark.udf.register("myAvg",myAvg)
    //创建临时视图
    df.createOrReplaceTempView("user")

    //使用聚合函数进行查询
    spark.sql("select MyAvg(age) from user").show()

    //

    //释放资源
    spark.stop()
  }
}


/*
(弱类型)定义类继承UserDefinedAggregateFunction,并重写其中方法
*/
class MyAvg extends UserDefinedAggregateFunction{

  //聚合函数的输入的数据类型
  override def inputSchema: StructType = {
    val structType: StructType = StructType(Array(StructField("age", IntegerType)))
    structType
  }

  // 聚合函数缓冲区中值的数据类型(age,count)
  override def bufferSchema: StructType = {
    StructType(Array(StructField("sum",LongType),StructField("count",LongType)))
  }

  // 函数返回值的数据类型
  override def dataType: DataType = DoubleType

  // //稳定性  默认不处理,直接返回true    相同输入是否会得到相同的输出
  override def deterministic: Boolean = true

  // 函数缓冲区初始化
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    // 存年龄的总和
    buffer(0) = 0L
    // 存年龄的个数
    buffer(1) = 0L

  }

  // 更新缓冲区中的数据
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!buffer.isNullAt(0)){
      buffer(0) = buffer.getLong(0) + input.getInt(0)
      buffer(1) = buffer.getLong(1) + 1L
    }

  }
  // 分区间数据合并
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }

  // 计算最终结果
  override def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble/buffer.getLong(1)
}

4)自定义聚合函数实现-强类型(应用于DataSet的DSL更方便)

package com.spark.day08


import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Encoders, Row, SparkSession, TypedColumn}

/**
 * 自定义UDAF(强类型应用于DataSet的DSL)
 */
object SparkSQL06_UDAF {
  def main(args: Array[String]): Unit = {
    //创建配置文件对象
    val conf: SparkConf = new SparkConf().setAppName("SparkSQL05_UDAF").setMaster("local[*]")

    //创建SparkSession
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()

    import spark.implicits._

    //读取Json文件, 创建df
    val df: DataFrame = spark.read.json("D:\\mywork\\IDEAproject\\spark-11\\src\\main\\input\\test.json")

    //创建自定义函数对象
    val myAvgNew = new MyAvgNew

    //自定义UDTF强类型,无法应用与SQL风格的查询

    //将df转为ds
    val ds: Dataset[User01] = df.as[User01]

    //将聚合函数对象转换为查询的列
    val column: TypedColumn[User01, Double] = myAvgNew.toColumn

    ds.select(column).show()

    //释放资源
    spark.stop()
  }
}

//输入数据类型
case class User01(name: String, age: Long)

//缓存类型
case class AgeBuffer(var sum: Long, var count: Long)

/**
 * 定义类继承org.apache.spark.sql.expressions.Aggregator
 * 重写类中的方法
 * 自定义UDAF函数 (强类型)
 * IN 输入数据类型
 * BUF 缓存数据类型
 * OUT 输出结果数据类型
 *
 */
class MyAvgNew extends Aggregator[User01, AgeBuffer, Double] {
  //对缓存数据进行初始化
  override def zero: AgeBuffer = {
    AgeBuffer(0L, 0L)
  }

  //对当前分区内数据进行聚合
  override def reduce(b: AgeBuffer, a: User01): AgeBuffer = {
    b.sum += a.age
    b.count +=1
    b
  }

  //分区间合并
  override def merge(b1: AgeBuffer, b2: AgeBuffer): AgeBuffer = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }

  //返回计算结果
  override def finish(reduction: AgeBuffer): Double = {
    reduction.sum.toDouble/reduction.count
  }

  //DataSet的编码及解码器,用于序列化,写法固定
  //用户自定义类型 product
  override def bufferEncoder: Encoder[AgeBuffer] = {
    Encoders.product
  }

  //系统值类型 scala根据具体值类型选择
  override def outputEncoder: Encoder[Double] = {
    Encoders.scalaDouble
  }
}
举报

相关推荐

0 条评论