Spark Big Data Analysis: Spark Core (3) Saving RDD Data to External Storage Systems

老牛走世界 2022-02-15



Contents

  • Saving as a Text File
  • Saving as a JSON File
  • Saving as TSV/CSV Files
  • Saving as a SequenceFile
  • Saving as an Object File
  • Explicitly Calling the Hadoop API to Store on HDFS
  • Writing to MySQL


Saving as a Text File

import org.apache.spark.{SparkConf, SparkContext}

object WriteSpark {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("WriteSpark")
    val sc = new SparkContext(conf)
    // 10 partitions -> 10 part-files in the output directory
    val realDate = sc.parallelize(Array(("one", 1), ("two", 2), ("three", 3)), 10)
    realDate.saveAsTextFile("D://test")
    sc.stop()
  }
}

The 10 passed to parallelize is the number of partitions, and each partition becomes one part-file, so setting it larger than the number of records produces empty output files. Note that parallelize itself simply slices the input collection evenly across the partitions; the rule that a record lands in partition hash(key) % numPartitions applies to pair RDDs distributed by a HashPartitioner (for example after a shuffle).
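If you do hand parallelize an oversized partition count, the empty part-files can be avoided by coalescing before the write. A minimal sketch (the output path D:/test_coalesced and the target of 2 partitions are purely illustrative):

import org.apache.spark.{SparkConf, SparkContext}

object WriteFewerFiles {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("WriteFewerFiles")
    val sc = new SparkContext(conf)
    val data = sc.parallelize(Array(("one", 1), ("two", 2), ("three", 3)), 10)
    // Collapse the 10 slices down to 2 before writing, so only 2 part-files appear
    data.coalesce(2).saveAsTextFile("D:/test_coalesced")
    sc.stop()
  }
}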

Saving as a JSON File

import org.apache.spark.{SparkConf, SparkContext}

import scala.util.parsing.json.JSONObject

object WriteSpark {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("WriteSpark")
    val sc = new SparkContext(conf)
    val map = Map("name" -> "zs", "age" -> 4)
    // saveAsTextFile writes JSONObject.toString(), one JSON object per line
    val realDate = sc.parallelize(List(JSONObject(map)), 1)
    realDate.saveAsTextFile("D://test")
    sc.stop()
  }
}
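Since the output is just text, the JSON can be read back with textFile and parsed line by line. A minimal sketch using scala.util.parsing.json.JSON.parseFull, reusing the path from the example above:

import org.apache.spark.{SparkConf, SparkContext}

import scala.util.parsing.json.JSON

object ReadJson {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("ReadJson")
    val sc = new SparkContext(conf)
    // Each line holds one JSON object; parseFull returns Some(Map(...)) on success
    val parsed = sc.textFile("D://test").map(JSON.parseFull)
    parsed.collect().foreach(println)
    sc.stop()
  }
}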

Saving as TSV/CSV Files

import org.apache.spark.{SparkConf, SparkContext}

object WriteSpark {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("WriteSpark")
    val sc = new SparkContext(conf)
    val arr = Array("tom", "sd", 12, 2)
    // CSV: join the fields with commas before saving
    val realDate = sc.parallelize(Array(arr.mkString(",")))
    realDate.saveAsTextFile("D://test")
    // TSV: join the fields with tabs before saving
    val realDate2 = sc.parallelize(Array(arr.mkString("\t")))
    realDate2.saveAsTextFile("D://1")
    sc.stop()
  }
}
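Reading these files back is symmetric: load them as text and split on the chosen delimiter. A minimal sketch for the CSV output (paths reused from the example above; use "\t" instead of the comma for the TSV directory):

import org.apache.spark.{SparkConf, SparkContext}

object ReadCsv {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("ReadCsv")
    val sc = new SparkContext(conf)
    // Split each CSV line on the comma to recover the fields
    val fields = sc.textFile("D://test").map(_.split(","))
    fields.collect().foreach(arr => println(arr.mkString("|")))
    sc.stop()
  }
}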

Saving as a SequenceFile

import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.{SparkConf, SparkContext}

object WriteSpark {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("WriteSpark")
    val sc = new SparkContext(conf)
    val arr = List(("tom", 2), ("sd", 12))
    // saveAsSequenceFile needs a pair RDD; the optional codec enables Gzip compression
    val realDate = sc.parallelize(arr)
    realDate.saveAsSequenceFile("D:/test", Some(classOf[GzipCodec]))
    sc.stop()
  }
}
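A SequenceFile written this way can be loaded back with sc.sequenceFile; the codec is recorded in the file header, so nothing extra is needed on the read side. A minimal sketch:

import org.apache.spark.{SparkConf, SparkContext}

object ReadSeq {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("ReadSeq")
    val sc = new SparkContext(conf)
    // The Writable types are converted back to String/Int via Spark's implicit converters
    val pairs = sc.sequenceFile[String, Int]("D:/test")
    pairs.collect().foreach(println)
    sc.stop()
  }
}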

Saving as an Object File

import org.apache.spark.{SparkConf, SparkContext}

object WriteSpark {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("WriteSpark")
    val sc = new SparkContext(conf)
    val s1 = Student("zs", 12)
    val s2 = Student("zs", 12)
    // Records must be serializable; case classes are Serializable by default
    val rdd = sc.parallelize(List(s1, s2))
    rdd.saveAsObjectFile("D:/test")
    sc.stop()
  }
}

case class Student(name: String, age: Int)

Under the hood, saveAsObjectFile calls saveAsSequenceFile, but it does not expose a compression option:

/**
 * Save this RDD as a SequenceFile of serialized objects.
 */
def saveAsObjectFile(path: String): Unit = withScope {
  this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
    .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
    .saveAsSequenceFile(path)
}
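The object file can be loaded back with sc.objectFile, which deserializes the batched byte arrays into the original type. A minimal sketch reusing the Student case class from above:

import org.apache.spark.{SparkConf, SparkContext}

object ReadObject {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("ReadObject")
    val sc = new SparkContext(conf)
    // Deserializes the stored records back into Student instances
    val students = sc.objectFile[Student]("D:/test")
    students.collect().foreach(println)
    sc.stop()
  }
}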

Explicitly Calling the Hadoop API to Store on HDFS

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.spark.{SparkConf, SparkContext}

object WriteSpark {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("WriteSpark")
    val sc = new SparkContext(conf)
    val realDate = sc.parallelize(List(("sz", 1), ("si", 3)))
    // Old (mapred) Hadoop API: pass the key class, value class and OutputFormat explicitly
    realDate.saveAsHadoopFile("D:/test", classOf[Text], classOf[IntWritable],
      classOf[TextOutputFormat[Text, IntWritable]])
    sc.stop()
  }
}
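The new (mapreduce) Hadoop API works the same way through saveAsNewAPIHadoopFile, and an hdfs:// URI sends the output to HDFS instead of the local disk. A minimal sketch; the NameNode address hdfs://note01:8020 is only an assumed placeholder for your cluster:

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.{SparkConf, SparkContext}

object WriteNewApi {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("WriteNewApi")
    val sc = new SparkContext(conf)
    val realDate = sc.parallelize(List(("sz", 1), ("si", 3)))
    // hdfs://note01:8020 is an assumed NameNode address; replace it with your own
    realDate.saveAsNewAPIHadoopFile(
      "hdfs://note01:8020/test",
      classOf[Text],
      classOf[IntWritable],
      classOf[TextOutputFormat[Text, IntWritable]])
    sc.stop()
  }
}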

Writing to MySQL

MySQL does not handle highly concurrent writes of large data volumes well, so this approach only suits cases where the processed result set is small. If you need to handle large data volumes and still want a SQL interface, consider TiDB or Phoenix.

import java.sql.DriverManager

import org.apache.spark.{SparkConf, SparkContext}

object WriteSpark {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("WriteSpark")
    val sc = new SparkContext(conf)
    val realDate = sc.parallelize(List(("sz", 1), ("si", 3)))
    // Open one connection per partition: JDBC connections are not serializable,
    // so they cannot be created on the driver and shipped to the executors
    realDate.foreachPartition((iter: Iterator[(String, Int)]) => {
      val connection = DriverManager.getConnection("jdbc:mysql://note01/test", "root", "123456")
      connection.setAutoCommit(false)
      val statement = connection.prepareStatement("insert into test.person (name, age) values (?, ?)")
      iter.foreach(t => {
        statement.setString(1, t._1)
        statement.setInt(2, t._2)
        statement.addBatch()
      })
      // Send the whole partition as one batch, then commit
      statement.executeBatch()
      connection.commit()
      statement.close()
      connection.close()
    })
    sc.stop()
  }
}
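For completeness, the table can be read back into an RDD with org.apache.spark.rdd.JdbcRDD. A minimal sketch; the age bounds (1 to 100) and partition count are illustrative assumptions, and the SQL must contain the two ? placeholders JdbcRDD uses to split the range across partitions:

import java.sql.DriverManager

import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}

object ReadMysql {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("ReadMysql")
    val sc = new SparkContext(conf)
    // JdbcRDD fills the two ? placeholders with the lower/upper bounds of each split
    val rows = new JdbcRDD(
      sc,
      () => DriverManager.getConnection("jdbc:mysql://note01/test", "root", "123456"),
      "select name, age from test.person where age >= ? and age <= ?",
      1, 100, 2,
      rs => (rs.getString("name"), rs.getInt("age")))
    rows.collect().foreach(println)
    sc.stop()
  }
}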


