0
点赞
收藏
分享

微信扫一扫

spark保存到外部数据源

杏花疏影1 2022-02-16 阅读 86



文章目录


  • ​​保存为sequenceFile​​
  • ​​保存到HDFS​​
  • ​​保存到mysql​​


保存为sequenceFile

package write

import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.{SparkConf, SparkContext}

object saveToSeq {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]")
.setAppName("saveToSeq")
val sc = new SparkContext(conf)

val data = List(("name", "xiaoming"), ("age", "18"))
val rddData = sc.parallelize(data, 1)
rddData.saveAsSequenceFile("D:\\studyplace\\sparkBook\\chapter4\\result\\1",Some(classOf[GzipCodec]))
}
}

其中saveAsSequenceFile的api第一个参数是保存文件路径,第二个参数是设置压缩方式

对于ClassOf[xxxCodec]对象必须封装在Option集合中再传入SequenceFile方法中,在scala中Option的两个实例为Some集合和None集合,后者代表没有任何元素

在压缩方式中,GzipCodec的压缩比率较高,磁盘不足可以使用这个方式,虽然Bzip压缩率更高,但对于频繁读写场景不适用

保存到HDFS


  • saveAsTextFile
    本质上调用了saveAsHadoopFile方法
  • saveAsHadoopFile
    对URI进行判断,以file:/// 将数据保存到本地文件系统中,如果schema是hdfs://将数据写到hdfs文件中

saveAsHadoopFile方法中,默认调用的是TextOutputFormat实现类作为输出数据的格式化工具



​import org.apache.hadoop.io.{IntWritable, Text} import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat import org.apache.spark.{SparkConf, SparkContext} object saveTohadoop { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("saveTohadoop").setMaster("local[*]") val sc = new SparkContext(conf) val rddData = sc.parallelize(List(("cat",20),("dog",29),("pig",11)),1) rddData.saveAsNewAPIHadoopFile("路径",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]]) sc.stop() } } ​

保存到mysql

package write

import java.sql.DriverManager

import org.apache.spark.{SparkConf, SparkContext}

object saveToMySQL {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("saveToMySQL")
val sc = new SparkContext(conf)

Class.forName("com.mysql.jdbc.Driver")
val rddData = sc.parallelize(List(("tom",11),("jettty",19)))
rddData.foreachPartition((iter:Iterator[(String,Int)]) => {
val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/spark?useUnicode=true&characterEncoding=utf-8","root","123456")
conn.setAutoCommit(false)
val statement = conn.prepareStatement("insert into spark.person (name,age) VALUES (?,?);")
iter.foreach( t => {
statement.setString(1,t._1)
statement.setInt(2,t._2)
statement.addBatch()
})
statement.executeBatch()
conn.commit()
conn.close()
})
sc.stop()
}
}

保存数据的时候使用foreachPartition方法遍历RDD的每一个分区

​注意:​​DriverManager.getConnection 需要移到foreaPartition内部

conn.setAutoCommit(false) 关闭自动提交,对于大数据量批量操作更合适



举报

相关推荐

0 条评论