Info
先生成DataFrame,再把数据储存在HDFS上。
import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
Intitializing Scala interpreter ...
SparkContext available as 'sc' (version = 2.4.4, master = local[*], app id = local-1577952043881)
SparkSession available as 'spark'
import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
val builder = SparkSession
.builder()
.appName("learningScala")
.config("spark.executor.heartbeatInterval","60s")
.config("spark.network.timeout","120s")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.kryoserializer.buffer.max","512m")
.config("spark.dynamicAllocation.enabled", false)
.config("spark.sql.inMemoryColumnarStorage.compressed", true)
.config("spark.sql.inMemoryColumnarStorage.batchSize", 10000)
.config("spark.sql.broadcastTimeout", 600)
.config("spark.sql.autoBroadcastJoinThreshold", -1)
.config("spark.sql.crossJoin.enabled", true)
.master("local[*]")
val spark = builder.appName("OperateHdfs").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
builder: org.apache.spark.sql.SparkSession.Builder = org.apache.spark.sql.SparkSession$Builder@679eb75a
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@551aa019
val df1 = Seq(
(1, "male", "18" ,"2019-01-01 11:45:50"),
(2, "female", "37" ,"2019-01-02 11:55:50"),
(3, "male", "21" ,"2019-01-21 11:45:50"),
(4, "female", "44" ,"2019-02-01 12:45:50"),
(5, "male", "39" ,"2019-01-15 10:40:50")
).toDF("id","sex","age", "createtime_str")
df1: org.apache.spark.sql.DataFrame = [id: int, sex: string ... 2 more fields]
val df=df1.withColumn("ds",date_format($"createtime_str","yyyyMMdd"))
df: org.apache.spark.sql.DataFrame = [id: int, sex: string ... 3 more fields]
df.show()
+---+------+---+-------------------+--------+
| id| sex|age| createtime_str| ds|
+---+------+---+-------------------+--------+
| 1| male| 18|2019-01-01 11:45:50|20190101|
| 2|female| 37|2019-01-02 11:55:50|20190102|
| 3| male| 21|2019-01-21 11:45:50|20190121|
| 4|female| 44|2019-02-01 12:45:50|20190201|
| 5| male| 39|2019-01-15 10:40:50|20190115|
+---+------+---+-------------------+--------+
查看hdfs文件
是否存在对应目录
import相关方法
import org.apache.hadoop.fs.{FileSystem, Path,FileStatus,FileUtil}
import org.apache.hadoop.fs.{FileSystem, Path, FileStatus, FileUtil}
获取配置信息
var path="../Data"
val hadoopConf = spark.sparkContext.hadoopConfiguration
path: String = ../Data
hadoopConf: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, __spark_hadoop_conf__.xml
val hdfs = FileSystem.get(hadoopConf)
hdfs: org.apache.hadoop.fs.FileSystem = org.apache.hadoop.fs.LocalFileSystem@25866b28
设置路径
val outputPath = new Path("../Data")
outputPath: org.apache.hadoop.fs.Path = ../Data
hdfs上是否存在这个路径
hdfs.exists(outputPath)
res2: Boolean = true
hdfs上是否存在这个文件
hdfs.exists(new Path("../Data/test.txt"))
res3: Boolean = true
判断该path是否为文件夹?
hdfs.getFileStatus(outputPath).isDirectory()
res4: Boolean = true
判断该path是否为文件?
hdfs.getFileStatus(outputPath).isFile()
res5: Boolean = false
hdfs.getFileStatus(new Path("../Data/test.txt")).isFile()
res6: Boolean = true
获取路径下所有文件
val allFiles = FileUtil.stat2Paths(hdfs.listStatus(outputPath))
allFiles: Array[org.apache.hadoop.fs.Path] = Array(file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsTest, file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.csv, file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.txt)
打印一级目录名和文件名
allFiles.foreach(println)
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsTest
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.csv
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.txt
打印一级目录名
逻辑:
- 获取路径下所有文件
- 循环遍历,判断每一个文件是否为Directory
- 是则打印出来
allFiles.filter(hdfs.getFileStatus(_).isDirectory()) //判断哪些是目录
.foreach(println)//打印对应一级目录名
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsTest
打印对应路径下文件
allFiles.filter(hdfs.getFileStatus(_).isFile()) //判断哪些是目录
.foreach(println)//打印对应一级目录名
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.csv
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.txt
封装成Object
object HdfsCheckPath {
import org.apache.hadoop.fs.{FileSystem, Path, FileStatus, FileUtil}
import org.apache.spark.sql.{SparkSession}
/**
*
* @param spark SparkSession
* @param path 字符串格式,指定的路径
*/
def isPathExist(spark: SparkSession, path: String) = {
// 获取hdfs配置信息
val hadoopConf = spark.sparkContext.hadoopConfiguration
val hdfs = FileSystem.get(hadoopConf)
// 设置输入的路径
val outputPath = new Path(path)
if (hdfs.exists(outputPath)) {
println(s"This path(${path}) Already exist!")
} else {
println(s"This path(${path}) don't exist!")
}
hdfs.exists(outputPath)
}
/**
*
* @param spark SparkSession
* @param path 对应路径
* @param printlevel 打印的级别,枚举:directory、file、total
*/
def printPathDetail(spark: SparkSession, path: String, printlevel: String="total"): Unit = {
val hadoopConf = spark.sparkContext.hadoopConfiguration
val hdfs = FileSystem.get(hadoopConf)
val isExists = hdfs.exists(new Path(path))
// 路径不存在无需继续,上一步中直接print出信息
if (isExists) {
println("This path Already exist!")
// 如果路径存在,打印出对应的一级目录和文件名
val allFiles = FileUtil.stat2Paths(hdfs.listStatus(new Path(path)))
if (printlevel == "directory") {
println("-----Directory:")
allFiles.filter(hdfs.getFileStatus(_).isDirectory()) //判断哪些是目录
.foreach(println) //打印对应一级目录名
} else if (printlevel == "file") {
println("-----File:")
allFiles.filter(hdfs.getFileStatus(_).isFile()) //判断哪些是目录
.foreach(println)//打印对应一级目录名
} else if (printlevel == "total") {
println("-----Total:")
allFiles.foreach(println)
}
}else{
println("This path don't exist!")
}
}
}
defined object HdfsCheckPath
HdfsCheckPath.isPathExist(spark,"../Data")
This path(../Data) Already exist!
res10: Boolean = true
HdfsCheckPath.printPathDetail(spark,"../Data","total")
This path Already exist!
-----Total:
HdfsCheckPath.printPathDetail(spark,"../Data","directory")
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsTest
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.csv
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.txt
This path Already exist!
-----Directory:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsTest
HdfsCheckPath.printPathDetail(spark,"../Data","file")
This path Already exist!
-----File:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.csv
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.txt
储存文件
储存暂时只介绍parquet格式
非分区储存
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
mode参数
Specifies the behavior when data or table already exists. Options include:
- overwrite: overwrite the existing data//覆盖写入
- append: append the data//追加写入
- ignore: ignore the operation (i.e. no-op)//忽略写入操作?那搞这个干啥?
- error or errorifexists: default option, throw an exception at runtime//如果写入目录存在则报错
一般选择errorifexists,防止错误地把数据插入在已经存在的目录。参数saveMode的值可以是String
或者SaveMode
(需要import spark.sql.SaveMode)
df1.write.mode(saveMode="errorifexists").parquet(path="../Data/hdfsSaveNoPartition/")
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-3zNCZ30J-1577952595406)(…/Picture/Pic01.jpg)]
df1.write.mode(saveMode=SaveMode.Overwrite).parquet("../Data/hdfsSaveNoPartition/")
分区储存
val partitionColNames = Array("ds")
df.write.partitionBy(partitionColNames:_*).mode(saveMode="errorifexists").parquet("../Data/hdfsSavePartition/")
partitionColNames: Array[String] = Array(ds)
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yu50KUcn-1577952595407)(…/Picture/Pic02.jpg)]
同样的,如果对应分区已经存在,也会报错。
读取
非分区读取
val df1New = spark.read.parquet("../Data/hdfsSaveNoPartition")
df1New: org.apache.spark.sql.DataFrame = [id: int, sex: string ... 2 more fields]
df1New.show()
+---+------+---+-------------------+
| id| sex|age| createtime_str|
+---+------+---+-------------------+
| 2|female| 37|2019-01-02 11:55:50|
| 4|female| 44|2019-02-01 12:45:50|
| 1| male| 18|2019-01-01 11:45:50|
| 3| male| 21|2019-01-21 11:45:50|
| 5| male| 39|2019-01-15 10:40:50|
+---+------+---+-------------------+
分区读取
指定某一个固定的分区
spark.read.option("basePath", "../Data/hdfsSavePartition")
.parquet("../Data/hdfsSavePartition/ds=20190101").show()
+---+----+---+-------------------+--------+
| id| sex|age| createtime_str| ds|
+---+----+---+-------------------+--------+
| 1|male| 18|2019-01-01 11:45:50|20190101|
+---+----+---+-------------------+--------+
指定多个分区
spark.read.option("basePath", "../Data/hdfsSavePartition")
.parquet("../Data/hdfsSavePartition/ds=20190101","../Data/hdfsSavePartition/ds=20190102").show()
+---+------+---+-------------------+--------+
| id| sex|age| createtime_str| ds|
+---+------+---+-------------------+--------+
| 2|female| 37|2019-01-02 11:55:50|20190102|
| 1| male| 18|2019-01-01 11:45:50|20190101|
+---+------+---+-------------------+--------+
指定多分区的简略写法
:*作为一个整体,告诉编译器你希望将某个参数当作参数序列处理!例如val s = sum(1 to 5:*)就是将1 to 5当作参数序列处理
val partitionPathArray = Array(20190101,20190102).map(x=>"../Data/hdfsSavePartition/" + s"ds=${x}")
partitionPathArray: Array[String] = Array(../Data/hdfsSavePartition/ds=20190101, ../Data/hdfsSavePartition/ds=20190102)
partitionPathArray.mkString("\n")
res20: String =
../Data/hdfsSavePartition/ds=20190101
../Data/hdfsSavePartition/ds=20190102
spark.read.option("basePath", "../Data/hdfsSavePartition")
.parquet(partitionPathArray:_*).show()
+---+------+---+-------------------+--------+
| id| sex|age| createtime_str| ds|
+---+------+---+-------------------+--------+
| 2|female| 37|2019-01-02 11:55:50|20190102|
| 1| male| 18|2019-01-01 11:45:50|20190101|
+---+------+---+-------------------+--------+
如果指定分区中部分缺失
val partitionPathArray1 = Array(20190101,20190102,20191231).map(x=>"../Data/hdfsSavePartition/" + s"ds=${x}")
partitionPathArray1: Array[String] = Array(../Data/hdfsSavePartition/ds=20190101, ../Data/hdfsSavePartition/ds=20190102, ../Data/hdfsSavePartition/ds=20191231)
partitionPathArray1.mkString("\n")
res22: String =
../Data/hdfsSavePartition/ds=20190101
../Data/hdfsSavePartition/ds=20190102
../Data/hdfsSavePartition/ds=20191231
partitionPathArray1.map(x=>HdfsCheckPath.isPathExist(spark,x))
This path(../Data/hdfsSavePartition/ds=20190101) Already exist!
This path(../Data/hdfsSavePartition/ds=20190102) Already exist!
This path(../Data/hdfsSavePartition/ds=20191231) don't exist!
res23: Array[Boolean] = Array(true, true, false)
spark.read.option("basePath", "../Data/hdfsSavePartition")
.parquet(partitionPathArray1.filter(x=>HdfsCheckPath.isPathExist(spark,x)):_*).show()
This path(../Data/hdfsSavePartition/ds=20190101) Already exist!
This path(../Data/hdfsSavePartition/ds=20190102) Already exist!
This path(../Data/hdfsSavePartition/ds=20191231) don't exist!
+---+------+---+-------------------+--------+
| id| sex|age| createtime_str| ds|
+---+------+---+-------------------+--------+
| 2|female| 37|2019-01-02 11:55:50|20190102|
| 1| male| 18|2019-01-01 11:45:50|20190101|
+---+------+---+-------------------+--------+
删除文件
删除与文件本身无关,是对整个目录的删除操作
删除非分区数据
第二个参数表示递归删除,即删除当前路径以及其子文件和子文件夹
路径不存在时,返回false
hdfs.delete(new Path("../Data/hdfsSaveNoPartition555/"),true)
res25: Boolean = false
路径存在返回true
hdfs.delete(new Path("../Data/hdfsSaveNoPartition/"),true)
res26: Boolean = true
删除分区数据
删除分区的逻辑比较简单,即把需删除分区的完整路径循环删除即可
查看分区数据
HdfsCheckPath.printPathDetail(spark,"../Data/hdfsSavePartition","directory")
This path Already exist!
-----Directory:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190101
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190102
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190115
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190121
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190201
删除指定某个分区
hdfs.delete(new Path("../Data/hdfsSavePartition/ds=20190101"),true)
res28: Boolean = true
HdfsCheckPath.printPathDetail(spark,"../Data/hdfsSavePartition","directory")
This path Already exist!
-----Directory:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190102
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190115
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190121
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190201
删除多个分区
val partitionPathArray2 = Array("20190102","20200101").map(x=>"../Data/hdfsSavePartition"+"/"+x)
partitionPathArray2: Array[String] = Array(../Data/hdfsSavePartition/20190102, ../Data/hdfsSavePartition/20200101)
partitionPathArray2.mkString("\n")
res30: String =
../Data/hdfsSavePartition/20190102
../Data/hdfsSavePartition/20200101
partitionPathArray.foreach(x=>hdfs.delete(new Path(x),true))
查看当前目录,上述两个分区已经被删除
HdfsCheckPath.printPathDetail(spark,"../Data/hdfsSavePartition/","total")
This path Already exist!
-----Total:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190115
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190121
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190201
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/_SUCCESS
如上所示,如果有的分区不存在,程序并不报错,可以在代码里加上print语句,作为提示。
hdfs.delete(new Path("../Data/hdfsSavePartition"),true)
res33: Boolean = true
封装
object封装
目的:
- 封装常用的hdfs操作,方便调用
- object方式组织,方便函数的内部调用
- obejct scala2Hdfs
object scala2Hdfs {
import org.apache.hadoop.fs.{FileSystem, Path, FileStatus, FileUtil}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession,Dataset}
/**
* 获取文件系统对象
*
* @param spark
* @return
*/
def getFileSystem(spark: SparkSession): FileSystem = {
val hadoopConf = spark.sparkContext.hadoopConfiguration
FileSystem.get(hadoopConf)
}
/**
*
* @param spark SparkSession
* @param path 字符串格式,指定的路径
*/
def isPathExist(spark: SparkSession, path: String) = {
// 获取hdfs配置信息
val hdfs = getFileSystem(spark)
// 设置输入的路径
val outputPath = new Path(path)
hdfs.exists(outputPath)
// if (hdfs.exists(outputPath)) {
// println("This path Already exist!")
// } else {
// println("This path don't exist!")
// }
}
/**
* 判断文件夹是否存在,以及是否为文件夹
*
* @param spark SparkSession
* @param path 字符串格式,指定的路径
* @return
*/
def isExistsDirectory(spark: SparkSession, path: String): Boolean = {
val hdfs = getFileSystem(spark)
val outputPath = new Path(path)
if(!(hdfs.exists(outputPath) && hdfs.getFileStatus(outputPath).isDirectory)){
println(s"This path(${path}) don't exist!")
}
hdfs.exists(outputPath) && hdfs.getFileStatus(outputPath).isDirectory
}
/**
*
* @param spark SparkSession
* @param path 字符串格式,指定的路径
* @return
*/
def isExistsFile(spark: SparkSession, path: String): Boolean = {
val hdfs = getFileSystem(spark)
val outputPath = new Path(path)
hdfs.exists(outputPath) && hdfs.getFileStatus(outputPath).isFile
}
/**
*
* @param spark SparkSession
* @param path 对应路径
* @param printlevel 打印的级别,枚举:directory、file、total
*/
def printPathDetail(spark: SparkSession, path: String, printlevel: String = "total"): Unit = {
val hdfs = getFileSystem(spark)
val isExists = isPathExist(spark, path)
// 路径不存在无需继续,上一步中直接print出信息
if (isExists) {
println("This path Already exist!")
// 如果路径存在,打印出对应的一级目录和文件名
val allFiles = FileUtil.stat2Paths(hdfs.listStatus(new Path(path)))
if (printlevel == "directory") {
println("-----Directory:")
allFiles.filter(hdfs.getFileStatus(_).isDirectory()) //判断哪些是目录
.foreach(println) //打印对应一级目录名
} else if (printlevel == "file") {
println("-----File:")
allFiles.filter(hdfs.getFileStatus(_).isFile()) //判断哪些是目录
.foreach(println) //打印对应一级目录名
} else if (printlevel == "total") {
println("-----Total:")
allFiles.foreach(println)
}
} else {
println("This path don't exist!")
}
}
/**
*
* @param spark SparkSession
* @param path 对应路径
* path存在,检查是否有子文件夹,有break,没有delete
*/
def deleteSinglePath(spark: SparkSession, path: String): Unit = {
val hdfs = getFileSystem(spark)
val outputPath = new Path(path)
// 先判断是否有对应路径,没有直接写,有删除
if (hdfs.exists(outputPath)) {
// 判断对应路径下是否有子文件夹
val isDirectoryExists = hdfs.listStatus(outputPath).exists(_.isDirectory)
// 没有子文件夹,删除数据
if (!isDirectoryExists) {
// 递归删除
hdfs.delete(outputPath, true)
println("Clean this path: " + path)
} else {
// 如果有子文件,防止错删,跳过
println("Contains sub path, Skip clean")
}
} else {
println(s"This path(${path}) don't exist!")
}
}
/**
*
* @param spark SparkSession
* @param path 对应路径,要求最后不用带/
* @param partitionsArray 对应分区列表,格式和表中实际分区字段一致
* @param isPartition 是否分区数据
*/
def deletePartitionPath(spark: SparkSession, path: String, partitionsArray: Array[String], isPartition: Boolean = true): Unit = {
if (!isPartition) {
// 如果非分区,直接删除
deleteSinglePath(spark, path)
} else {
if (partitionsArray.length == 0) {
println("partitionsArray has no items")
} else {
// 拼接分区path
val partitionPathArray = partitionsArray.map(x => path + "/" + s"ds=${x}")
// 循环删除分区,如果路径不存在,报错
partitionPathArray.foreach(x => deleteSinglePath(spark, x))
}
}
}
/**
* 存储
*
* @param df 存储的df
* @param path 路径
* @param coalesceNum 合并后分区数,为了提高关联的效率,具体用法暂不清楚
* @param saveType 存储类型
* @param saveMode 模式
*/
def saveSinglePath(df: DataFrame, path: String, coalesceNum: Int = 0
, saveType: String = "parquet", saveMode: String = "errorifexists"
): Unit = {
var tempDf = df
if (coalesceNum >= 1) {
tempDf = df.coalesce(coalesceNum)
}
val write = tempDf.write.mode(saveMode)
saveType match {
case "csv" => write.option("header", "true").csv(path);println("Save this path: "+ path)
case "parquet" => write.parquet(path);println("Save this path: "+ path)
case _ => println(s"Not Support this savetype:${saveType}")
}
}
/**
*
* @param dataFrameWithDs 待分区字段的dataframe
* @param path 储存的path
* @param saveMode 保存方式,默认为append
* @param coalesceNum 合并后分区数,为了提高关联的效率,具体用法暂不清楚
* @param partitionColNames 分区列名,array形式
* 得到分区列数据,存储即可
*/
def savePartitionPath(dataFrameWithDs: DataFrame, path: String, saveType: String = "parquet"
, saveMode: String = "append", coalesceNum: Int = 0
, partitionColNames: Array[String] = Array("ds")): Unit = {
var tempDf = dataFrameWithDs
if (coalesceNum >= 1) {
tempDf = dataFrameWithDs.coalesce(coalesceNum)
}
val write = tempDf.write.partitionBy(partitionColNames: _*).mode(saveMode)
saveType match {
case "csv" => write.option("header", "true").csv(path);println("Save this path: "+ path)
case _ => write.parquet(path);println("Save this path: "+ path)
}
}
/**
* 清空并保存
*
* @param df 存储的df
* @param path 路径
* @param coalesceNum 合并后分区数
* @param saveType 存储类型
* @param saveMode 模式
*/
def cleanAndSaveSinglePath(spark: SparkSession, df: DataFrame, path: String, coalesceNum: Int = 0
, saveType: String = "parquet", saveMode: String = "errorifexists"): Unit = {
// 先删除对应路径
deleteSinglePath(spark, path)
// 再保存数据
saveSinglePath(df, path, coalesceNum, saveType, saveMode)
}
/**
*
* @param dataFrameWithDs 存储的df
* @param path 路径
* @param partitionsArray 分区list
* @param coalesceNum 合并后分区数
* @param partitionColNames 分区列
*/
def cleanAndSavePartitionPath(dataFrameWithDs: DataFrame, path: String, saveMode: String = "append"
, partitionsArray: Array[String], coalesceNum: Int = 0
, partitionColNames: Array[String] = Array("ds")): Unit = {
val spark = dataFrameWithDs.sparkSession
// 先删除对应分区的数据
deletePartitionPath(spark, path, partitionsArray)
// 保存对应分区的数据
savePartitionPath(dataFrameWithDs = dataFrameWithDs, path = path
,saveMode=saveMode, partitionColNames = partitionColNames)
}
def readSinglePath(spark: SparkSession, path: String): DataFrame = {
if (isExistsDirectory (spark, path) ) {
spark.read.parquet(path)
}else{
println ("This path don't exist!")
// 返回个空数据框,不知道有没有别的方式!!
spark.emptyDataFrame
}
}
/**
*
* @param spark SparkSession
* @param path 路径
* @param readType 文件类型,暂只支持parquet
* @param partitionsArray 所读的分区
* @return
*/
def readPartitionPath(spark: SparkSession, path: String, readType: String = "parquet"
, partitionsArray: Array[String])={
if(readType!="parquet"){
println(s"Not Support this readType:${readType}")
spark.emptyDataFrame
}else{
if(partitionsArray.length==0){
println("PartitionsArray is null")
spark.emptyDataFrame
}else{
val partitionPathArray = partitionsArray.map(x=>path +"/"+ s"ds=${x}")
//过滤掉不存在的分区目录
spark.read.option("basePath", path).parquet(partitionPathArray.filter(x=> isExistsDirectory(spark, x)):_*)
}
}
}
}
defined object scala2Hdfs
查看操作
查看路径是否存在,是否为文件、为目录
var path ="../Data"
path: String = ../Data
scala2Hdfs.isExistsDirectory(spark,path)
res34: Boolean = true
scala2Hdfs.isPathExist(spark,path)
res35: Boolean = true
scala2Hdfs.isExistsFile(spark,path)
res36: Boolean = false
scala2Hdfs.isExistsFile(spark,"../Data/test.txt")
res37: Boolean = true
scala2Hdfs.isPathExist(spark,"../Data/test.txt")
res38: Boolean = true
打印路径信息
scala2Hdfs.printPathDetail(spark,path,"total")
This path Already exist!
-----Total:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsTest
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.csv
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.txt
scala2Hdfs.printPathDetail(spark,path,"directory")
This path Already exist!
-----Directory:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsTest
scala2Hdfs.printPathDetail(spark,path,"file")
This path Already exist!
-----File:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.csv
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.txt
保存操作
非分区保存
saveMode="errorifexists"相当于直接新建,再保存。如果路径存在,则会报错。
此时可以采用覆盖写入的方式或者用saveSinglePath方法,先删除再写入
scala2Hdfs.saveSinglePath(df=df1,path="../Data/hdfsSaveNoPartition",saveMode="errorifexists")
Save this path: ../Data/hdfsSaveNoPartition
scala2Hdfs.cleanAndSaveSinglePath(spark=spark,df=df1,path="../Data/hdfsSaveNoPartition",saveMode="overwrite")
Clean this path: ../Data/hdfsSaveNoPartition
Save this path: ../Data/hdfsSaveNoPartition
错误的储存格式
scala2Hdfs.saveSinglePath(df=df,path="../Data/hdfsSaveTest",saveMode="append",saveType="dd")
Not Support this savetype:dd
分区储存
scala2Hdfs.savePartitionPath(dataFrameWithDs=df,path="../Data/hdfsSavePartition",partitionColNames=Array("ds"))
Save this path: ../Data/hdfsSavePartition
查看分区保存的结果
scala2Hdfs.printPathDetail(spark=spark,path="../Data/hdfsSavePartition")
This path Already exist!
-----Total:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190101
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190102
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190115
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190121
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190201
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/_SUCCESS
cleanAndSavePartitionPath,可以先对分区删除,再保存。此时,非20190101分区会有重复数据
常规的应用场景,都是往新分区里查数据,可具体问题具体分析
scala2Hdfs.cleanAndSavePartitionPath(dataFrameWithDs=df,path="../Data/hdfsSavePartition"
,partitionsArray=Array("20190101"),partitionColNames=Array("ds"))
Clean this path: ../Data/hdfsSavePartition/ds=20190101
Save this path: ../Data/hdfsSavePartition
scala2Hdfs.printPathDetail(spark=spark,path="../Data/hdfsSavePartition/ds=20190101")
This path Already exist!
-----Total:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190101/part-00000-5771f2d8-7713-4fc3-9b5d-06df065bd3b8.c000.snappy.parquet
读取操作
非分区数据读取
spark.read.parquet("../Data/hdfsSaveNoPartition").show()
+---+------+---+-------------------+
| id| sex|age| createtime_str|
+---+------+---+-------------------+
| 2|female| 37|2019-01-02 11:55:50|
| 4|female| 44|2019-02-01 12:45:50|
| 1| male| 18|2019-01-01 11:45:50|
| 3| male| 21|2019-01-21 11:45:50|
| 5| male| 39|2019-01-15 10:40:50|
+---+------+---+-------------------+
scala2Hdfs.readSinglePath(spark, "../Data/hdfsSaveNoPartition").show()
+---+------+---+-------------------+
| id| sex|age| createtime_str|
+---+------+---+-------------------+
| 2|female| 37|2019-01-02 11:55:50|
| 4|female| 44|2019-02-01 12:45:50|
| 1| male| 18|2019-01-01 11:45:50|
| 3| male| 21|2019-01-21 11:45:50|
| 5| male| 39|2019-01-15 10:40:50|
+---+------+---+-------------------+
读取分区数据
scala2Hdfs.readPartitionPath(spark=spark, path="../Data/hdfsSavePartition",partitionsArray=Array("20191101","20190101","20190102")).show()
This path(../Data/hdfsSavePartition/ds=20191101) don't exist!
+---+------+---+-------------------+--------+
| id| sex|age| createtime_str| ds|
+---+------+---+-------------------+--------+
| 2|female| 37|2019-01-02 11:55:50|20190102|
| 2|female| 37|2019-01-02 11:55:50|20190102|
| 1| male| 18|2019-01-01 11:45:50|20190101|
+---+------+---+-------------------+--------+
删除操作
删除操作错误路径
scala2Hdfs.deleteSinglePath(spark,"../data1")
This path(../data1) don't exist!
删除非分区数据
scala2Hdfs.deleteSinglePath(spark,"../Data/hdfsSaveNoPartition")
Clean this path: ../Data/hdfsSaveNoPartition
scala2Hdfs.printPathDetail(spark,"../Data/hdfsSaveNoPartition")
This path don't exist!
删除分区数据
scala2Hdfs.printPathDetail(spark,"../Data/hdfsSavePartition")
This path Already exist!
-----Total:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190101
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190102
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190115
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190121
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190201
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/_SUCCESS
scala2Hdfs.deletePartitionPath(spark=spark,path="../Data/hdfsSavePartition",partitionsArray=Array("20190101","20190102","20200101"))
Clean this path: ../Data/hdfsSavePartition/ds=20190101
Clean this path: ../Data/hdfsSavePartition/ds=20190102
This path(../Data/hdfsSavePartition/ds=20200101) don't exist!
scala2Hdfs.printPathDetail(spark,"../Data/hdfsSavePartition")
This path Already exist!
-----Total:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190115
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190121
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190201
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/_SUCCESS
清空所有数据
hdfs.delete(new Path("../Data/hdfsSavePartition"),true)
res58: Boolean = true
scala2Hdfs.printPathDetail(spark,"../Data")
This path Already exist!
-----Total:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsTest
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.csv
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.txt
2020-01-02 于南京江宁区九龙湖