Ways to Create a DataFrame in SparkSQL

胡桑_b06e · 2022-07-04 · 91 reads

1. Creating a DataFrame by reading a JSON file

Notes:

  • A JSON file can be read in either of two ways (both are shown in the code below).
  • df.show() displays the first 20 rows by default.
  • The DataFrame native API can also be used to operate on the DataFrame directly.
  • When the DataFrame is registered as a temporary view, its columns are displayed in ASCII order by default. The view-creation APIs are listed here and sketched right after this list:
    df.createTempView("mytable")
    df.createOrReplaceTempView("mytable")
    df.createGlobalTempView("mytable")
    df.createOrReplaceGlobalTempView("mytable")
    session.sql("select * from global_temp.mytable").show()
  • A DataFrame is essentially an RDD of Row objects.
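The temp-view APIs above differ in scope: a plain temp view is bound to the current SparkSession, while a global temp view lives in the reserved global_temp database and is visible to every session in the application. A minimal sketch, assuming the `session` and `frame` variables created in the example below ("mytable" is an illustrative name):

// plain temp view: visible only in this SparkSession
frame.createOrReplaceTempView("mytable")
session.sql("select * from mytable").show()
// global temp view: must be qualified with the reserved global_temp database
frame.createOrReplaceGlobalTempView("mytable")
session.sql("select * from global_temp.mytable").show()
// a new session of the same application still sees the global view, but not the plain one
session.newSession().sql("select * from global_temp.mytable").show()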

JSON data source (data/jsondata):

{"name":"科比","age":24}
{"name":"詹姆斯","age":23}
{"name":"杜兰特","age":25}
{"name":"保罗","age":26}
{"name":"库里","age":27}
{"name":"加索尔","age":28}
{"name":"朗多","age":29}
{"name":"皮尔斯"}
{"name":"雷阿伦"}
{"name":"奥多姆"}
{"name":"拜纳姆","age":24}
{"name":"科比","age":24}
{"name":"詹姆斯","age":23}
{"name":"杜兰特","age":25}
{"name":"保罗","age":26}
{"name":"库里","age":27}
{"name":"加索尔","age":28}
{"name":"朗多","age":29}
{"name":"皮尔斯"}
{"name":"雷阿伦"}
{"name":"奥多姆"}
{"name":"拜纳姆","age":24}
package sparkSql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}

/**
 * @Author yqq
 * @Date 2021/12/13 17:00
 * @Version 1.0
 */
object ReadJsonDataToDF2 {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().master("local").appName("name").getOrCreate()
    session.sparkContext.setLogLevel("Error")
    // First way: read the JSON file directly
    // val frame = session.read.json("data/jsondata")
    // Second way: use the generic reader with an explicit format
    val frame = session.read.format("json").load("data/jsondata")
    frame.createTempView("t")
    val rdd: RDD[Row] = session.sql("select name,age from t where age is not null").rdd
    rdd.foreach(row => {
      val name = row.getAs[String]("name")
      val age = row.getAs[Long]("age")
      println(s"name:$name,age:$age")
    })
    // rdd.foreach(println)
    // frame.show()
  }
}
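As noted above, the same query can be expressed with the DataFrame native API instead of registering a temp view and writing SQL. A minimal sketch, assuming the `frame` built in the example above:

import org.apache.spark.sql.functions.col

// select and filter directly on the DataFrame
frame.select("name", "age")
  .filter(col("age").isNotNull)
  .show()
// show() prints the first 20 rows by default; pass a number to print more
frame.show(30)
frame.printSchema()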


2. Creating a DataFrame from an RDD of JSON strings

package sparkSql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
 * @Author yqq
 * @Date 2021/12/13 17:17
 * @Version 1.0
 */
object ReadJsonRDDToDF {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().master("local").appName("name").getOrCreate()
    session.sparkContext.setLogLevel("Error")
    val jsonArr = Array[String](
      "{\"name\":\"科比\",\"age\":24}",
      "{\"name\":\"詹姆斯\",\"age\":23}",
      "{\"name\":\"杜兰特\",\"age\":35}",
      "{\"name\":\"保罗\",\"age\":3}"
    )
    import session.implicits._
    val jsonDataSet: Dataset[String] = jsonArr.toList.toDS()
    val frame = session.read.json(jsonDataSet)
    frame.createTempView("t")
    session.sql("select name,age from t").show()
    /**
     * Approach used before Spark 2.0: read.json(RDD[String])
     */
    // val context = session.sparkContext
    // val jsonRDD: RDD[String] = context.parallelize(jsonArr)
    // val frame: DataFrame = session.read.json(jsonRDD)
    // frame.show()
  }
}
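The read.json(RDD[String]) overload shown in the commented-out block is deprecated in later 2.x releases, but an existing RDD of JSON strings can still be wrapped into a Dataset[String] first. A minimal sketch, assuming `session` and `jsonArr` from the example above:

import session.implicits._

val jsonRDD: RDD[String] = session.sparkContext.parallelize(jsonArr)
// createDataset needs an implicit Encoder[String], provided by session.implicits._
val jsonDS: Dataset[String] = session.createDataset(jsonRDD)
session.read.json(jsonDS).show()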


3. Creating a DataFrame from a non-JSON RDD

  1. Converting a non-JSON RDD to a DataFrame via reflection (not recommended)
    Data source (data/person):
1,科比,24,99
2,詹姆斯,6,100
3,杜兰特,35,100
4,哈登,13,80
5,乔丹,23,90
package sparkSql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * @Author yqq
 * @Date 2021/12/13 19:06
 * @Version 1.0
 * Converts a plain-text RDD to a DataFrame via reflection on a case class
 */
case class PersonInfo(id: Int, name: String, num: Int, score: Int)

object ReadRDDToDF {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = SparkSession.builder().appName("test").master("local").getOrCreate()
    val context: SparkContext = session.sparkContext
    context.setLogLevel("Error")
    val person: RDD[String] = context.textFile("data/person")
    // Map each comma-separated line to a PersonInfo instance
    val personRDD: RDD[PersonInfo] = person.map(line => {
      val arr: Array[String] = line.split(",")
      val id: Int = arr(0).toInt
      val name: String = arr(1)
      val num: Int = arr(2).toInt
      val score: Int = arr(3).toInt
      PersonInfo(id, name, num, score)
    })
    import session.implicits._
    val frame: DataFrame = personRDD.toDF()
    frame.createTempView("t")
    val frame1: DataFrame = session.sql("select id,name,num,score from t")
    frame1.show()
    frame1.printSchema()
  }
}
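One convenience of the reflection approach: because the schema comes from a case class, the DataFrame can be turned back into a strongly typed Dataset with `.as`. A minimal sketch, assuming `frame` and the `PersonInfo` case class from the example above (it relies on the `import session.implicits._` already present there):

// back to a typed Dataset of PersonInfo instances
val personDS = frame.as[PersonInfo]
personDS.filter(p => p.score >= 90).show()
// and down to an RDD of PersonInfo when plain RDD operations are needed
val personInfoRDD: RDD[PersonInfo] = personDS.rdd
personInfoRDD.foreach(println)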

  2. Converting a non-JSON RDD to a DataFrame by building a schema dynamically

package sparkSql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

/**
 * @Author yqq
 * @Date 2021/12/13 19:27
 * @Version 1.0
 */
object ReadRDDtoDF2 {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = SparkSession.builder().master("local").appName("test1").getOrCreate()
    val context: SparkContext = session.sparkContext
    context.setLogLevel("Error")
    val personRDD: RDD[String] = context.textFile("data/person")
    val rowRDD: RDD[Row] = personRDD.map(line => {
      val arr: Array[String] = line.split(",")
      val id: Int = arr(0).toInt
      val name: String = arr(1)
      val num: Int = arr(2).toInt
      val score: Double = arr(3).toDouble
      Row(id, name, num, score)
    })
    val struct = StructType(List[StructField](
      StructField("id", DataTypes.IntegerType, true),
      StructField("name", DataTypes.StringType, true),
      StructField("num", DataTypes.IntegerType, true),
      StructField("score", DataTypes.DoubleType, true)
    ))
    val frame: DataFrame = session.createDataFrame(rowRDD, struct)
    frame.show()
    frame.printSchema()
    frame.createTempView("t")
    val frame1: DataFrame = session.sql(
      """
        |select id,name,num,score from t
        |""".stripMargin)
    frame1.show()
  }
}
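The same schema can also be assembled field by field with StructType.add, which is handy when the column list is only known at runtime. A minimal sketch equivalent to the `struct` built above, reusing `session` and `rowRDD`:

// build the schema incrementally; fields are nullable by default
val struct2 = new StructType()
  .add("id", DataTypes.IntegerType)
  .add("name", DataTypes.StringType)
  .add("num", DataTypes.IntegerType)
  .add("score", DataTypes.DoubleType)
val frame2: DataFrame = session.createDataFrame(rowRDD, struct2)
frame2.printSchema()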


4. Creating a DataFrame by reading a Parquet file

package sparkSql

import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

/**
 * @Author yqq
 * @Date 2021/12/13 19:55
 * @Version 1.0
 */
object ReadParquetFileToDF {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = SparkSession.builder().appName("ttt").master("local").getOrCreate()
    val context: SparkContext = session.sparkContext
    context.setLogLevel("Error")
    val frame: DataFrame = session.read.json("data/jsondata")
    /**
     * SaveMode options:
     * Append:        append to the existing data
     * ErrorIfExists: throw an error if the path already exists
     * Overwrite:     overwrite the existing data
     * Ignore:        silently skip the write if the path already exists
     */
    frame.write.mode(SaveMode.Overwrite).parquet("data/parquet")
    val frame1: DataFrame = session.read.parquet("data/parquet")
    frame1.show(22)
    println(frame1.count())
    // frame1.write.json("data/json")
  }
}
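The other save modes behave as described in the comment above, and the generic format(...).load(...) reader also works for Parquet. A minimal sketch, assuming the `session` and `frame` from the example above (data/parquet is the same illustrative path):

// Append adds new files next to the existing ones; Ignore silently does nothing
// when the target path already exists
frame.write.mode(SaveMode.Append).parquet("data/parquet")
frame.write.mode(SaveMode.Ignore).parquet("data/parquet")
// Parquet is the default data source, so the generic reader needs no extra options
val frame2: DataFrame = session.read.format("parquet").load("data/parquet")
frame2.printSchema()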


5. Creating a DataFrame by reading CSV data

CSV data source (data/data.csv):

id,name,age,score
1,科比,40,100
2,詹姆斯,37,100
3,乔丹,55,100
4,杜兰特,33,99
5,库里,34,99
package sparkSql

import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * @Author yqq
 * @Date 2021/12/13 20:35
 * @Version 1.0
 */
object ReadCSVDataToDF {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = SparkSession.builder().master("local").appName("eee").getOrCreate()
    session.sparkContext.setLogLevel("Error")
    val frame: DataFrame = session.read.option("header", true).csv("data/data.csv")
    frame.show()
  }
}
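With only the header option every CSV column is read as a string; adding inferSchema makes Spark scan the data and pick numeric types. A minimal sketch, assuming the same data/data.csv file (the output path below is illustrative):

val typed: DataFrame = session.read
  .option("header", true)        // use the first line as column names
  .option("inferSchema", true)   // scan the data to infer int/double columns
  .csv("data/data.csv")
typed.printSchema()
// write the result back out as CSV with a header row, overwriting previous runs
typed.write.mode("overwrite").option("header", true).csv("data/csv_out")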


6. Creating a DataFrame from a Dataset of tuples

The data source is fairly large, so it is not pasted here.

package sparkSql

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
 * @Author yqq
 * @Date 2021/12/13 20:52
 * @Version 1.0
 */
object ReadTupleDatasetToDF {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().appName("rrr").master("local").getOrCreate()
    session.sparkContext.setLogLevel("Error")
    val dt: Dataset[String] = session.read.textFile("data/pvuvdata")
    import session.implicits._
    val value: Dataset[(String, String, String, String, String, String, String)] = dt.map(line => {
      val arr: Array[String] = line.split("\t")
      val ip = arr(0)
      val local = arr(1)
      val date = arr(2)
      val ts = arr(3)
      val uid = arr(4)
      val site = arr(5)
      val operator = arr(6)
      (ip, local, date, ts, uid, site, operator)
    })
    val frame: DataFrame = value.toDF("ip", "local", "date", "ts", "uid", "site", "operator")
    frame.createTempView("t")
    session.sql(
      """
        |select site,count(*) as site_count from t group by site order by site_count
        |""".stripMargin).show()
    session.sql(
      """
        |select site,count(*) uv from (select distinct ip,site from t) t1 group by site order by uv
        |""".stripMargin).show()
  }
}
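The two queries above compute PV (total visits per site) and UV (distinct visitors per site). The same statistics can be computed with the DataFrame API instead of SQL; a minimal sketch, assuming the `frame` built in the example above:

import org.apache.spark.sql.functions.countDistinct

// PV: total visits per site, ascending like the SQL version
frame.groupBy("site").count().orderBy("count").show()
// UV: distinct visitors (by ip) per site
frame.groupBy("site").agg(countDistinct("ip").as("uv")).orderBy("uv").show()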


