Table of Contents
- Saving as a text file
- Saving as a JSON file
- Saving as TSV/CSV files
- Saving as a SequenceFile
- Saving as an object file
- Explicitly calling the Hadoop API to write to HDFS
- Writing to MySQL
Saving as a text file
import org.apache.spark.{SparkConf, SparkContext}

object WriteSpark {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("WriteSpark")
    val sc = new SparkContext(conf)
    // 10 is the number of partitions; each partition is written as one part-XXXXX file
    val realDate = sc.parallelize(Array(("one", 1), ("two", 2), ("three", 3)), 10)
    realDate.saveAsTextFile("D://test")
    sc.stop()
  }
}
In parallelize, the 10 is the number of partitions; setting it larger than the amount of data produces empty output files. Note that parallelize simply slices the input collection evenly across its partitions; the "hash of the key modulo the number of partitions" rule applies when a HashPartitioner is used (for example after a shuffle on a pair RDD), not to parallelize itself.
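To avoid a pile of empty part files, you can check the partition count and coalesce before writing. A minimal sketch, reusing sc and the realDate RDD from the example above (the output path D://test_single is illustrative):
println(realDate.getNumPartitions)                       // 10, as created above
realDate.coalesce(1).saveAsTextFile("D://test_single")   // merge to one partition -> one part file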
Saving as a JSON file
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.parsing.json.JSONObject

object WriteSpark {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("WriteSpark")
    val sc = new SparkContext(conf)
    val map = Map("name" -> "zs", "age" -> 4)
    // JSONObject.toString() renders the map as a JSON string, one object per output line
    val realDate = sc.parallelize(List(JSONObject(map)), 1)
    realDate.saveAsTextFile("D://test")
    sc.stop()
  }
}
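For a round trip, the file can be read back and parsed with JSON.parseFull from the same scala.util.parsing.json package; a small sketch continuing in the same main method (parseFull returns Option[Any] and parses numbers as Double):
import scala.util.parsing.json.JSON

val parsed = sc.textFile("D://test").map(JSON.parseFull)   // Option[Any] per line
parsed.collect().foreach(println)                          // e.g. Some(Map(name -> zs, age -> 4.0))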
Saving as TSV/CSV files
import org.apache.spark.{SparkConf, SparkContext}

object WriteSpark {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("WriteSpark")
    val sc = new SparkContext(conf)
    val arr = Array("tom", "sd", 12, 2)
    // CSV: join the fields with commas
    val realDate = sc.parallelize(Array(arr.mkString(",")))
    realDate.saveAsTextFile("D://test")
    // TSV: join the fields with tabs
    val realDate2 = sc.parallelize(Array(arr.mkString("\t")))
    realDate2.saveAsTextFile("D://1")
    sc.stop()
  }
}
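Reading a CSV written this way is just a textFile plus a split; a minimal sketch, reusing sc and the path from the example above:
val fields = sc.textFile("D://test").map(_.split(","))     // Array(tom, sd, 12, 2) per line
fields.collect().foreach(f => println(f.mkString(" | ")))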
Saving as a SequenceFile
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.{SparkConf, SparkContext}

object WriteSpark {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("WriteSpark")
    val sc = new SparkContext(conf)
    val arr = List(("tom", 2), ("sd", 12))
    // saveAsSequenceFile requires a key-value RDD; the codec argument enables Gzip compression
    val realDate = sc.parallelize(arr)
    realDate.saveAsSequenceFile("D:/test", Some(classOf[GzipCodec]))
    sc.stop()
  }
}
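The file can be read back with sc.sequenceFile, which converts the Hadoop Writable types back to Scala types; a small sketch continuing the example above:
val seq = sc.sequenceFile[String, Int]("D:/test")
seq.collect().foreach(println)    // (tom,2), (sd,12)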
Saving as an object file
import org.apache.spark.{SparkConf, SparkContext}

object WriteSpark {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("WriteSpark")
    val sc = new SparkContext(conf)
    val s1 = new Student("zs", 12)
    val s2 = new Student("zs", 12)
    // saveAsObjectFile serializes the elements with Java serialization
    val rdd = sc.parallelize(List(s1, s2))
    rdd.saveAsObjectFile("D:/test")
    sc.stop()
  }
}

case class Student(name: String, age: Int)
Under the hood, saveAsObjectFile calls saveAsSequenceFile, but it does not expose a compression codec:
/**
 * Save this RDD as a SequenceFile of serialized objects.
 */
def saveAsObjectFile(path: String): Unit = withScope {
  this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
    .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
    .saveAsSequenceFile(path)
}
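Reading the data back is done with objectFile, which deserializes into the original element type; a small sketch, assuming the Student case class and sc from the example above:
val students = sc.objectFile[Student]("D:/test")
students.collect().foreach(println)    // Student(zs,12) twice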
Explicitly calling the Hadoop API to write to HDFS
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.spark.{SparkConf, SparkContext}

object WriteSpark {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("WriteSpark")
    val sc = new SparkContext(conf)
    val realDate = sc.parallelize(List(("sz", 1), ("si", 3)))
    // old (mapred) API: key class, value class and OutputFormat are passed explicitly;
    // an hdfs://... URI can be used in place of the local path
    realDate.saveAsHadoopFile("D:/test", classOf[Text], classOf[IntWritable], classOf[TextOutputFormat[Text, IntWritable]])
    sc.stop()
  }
}
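The same write can also go through the new Hadoop API (org.apache.hadoop.mapreduce) via saveAsNewAPIHadoopFile; a minimal sketch continuing the example above, with an illustrative output path:
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}

realDate.saveAsNewAPIHadoopFile("D:/test_new", classOf[Text], classOf[IntWritable],
  classOf[NewTextOutputFormat[Text, IntWritable]])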
Writing to MySQL
MySQL does not cope well with highly concurrent writes of large data volumes, so this approach only suits cases where the processed result to insert is small. If the data volume is large and you still want a SQL-style solution, consider TiDB or Phoenix. A sketch that caps the number of concurrent connections follows the example below.
import java.sql.DriverManager
import org.apache.spark.{SparkConf, SparkContext}

object WriteSpark {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("WriteSpark")
    val sc = new SparkContext(conf)
    val realDate = sc.parallelize(List(("sz", 1), ("si", 3)))
    // open one connection per partition instead of one per record
    realDate.foreachPartition((iter: Iterator[(String, Int)]) => {
      val connection = DriverManager.getConnection("jdbc:mysql://note01/test", "root", "123456")
      connection.setAutoCommit(false) // required so that the explicit commit() below is legal
      val statement = connection.prepareStatement("insert into test.person (name, age) values (?, ?)")
      iter.foreach(t => {
        statement.setString(1, t._1)
        statement.setInt(2, t._2) // age is an Int, so use setInt
        statement.addBatch()
      })
      statement.executeBatch()
      connection.commit()
      statement.close()
      connection.close()
    })
    sc.stop()
  }
}
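Each partition opens its own connection, so an RDD with many partitions hammers MySQL with that many parallel writers. A hedged sketch of one way to cap this, coalescing first and enabling the driver's batch rewriting (the partition count 4 and the rewriteBatchedStatements flag are illustrative choices, not part of the original example):
// fewer partitions = fewer simultaneous JDBC connections;
// rewriteBatchedStatements lets Connector/J collapse the batch into multi-row INSERTs
realDate.coalesce(4).foreachPartition { iter =>
  val connection = DriverManager.getConnection(
    "jdbc:mysql://note01/test?rewriteBatchedStatements=true", "root", "123456")
  connection.setAutoCommit(false)
  val statement = connection.prepareStatement("insert into test.person (name, age) values (?, ?)")
  iter.foreach { case (name, age) =>
    statement.setString(1, name)
    statement.setInt(2, age)
    statement.addBatch()
  }
  statement.executeBatch()
  connection.commit()
  statement.close()
  connection.close()
}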