Spark Big Data Analysis: Spark Core (2) RDD Data Loading

骨灰级搬砖工 2022-02-10



Table of Contents

  • Reading plain text data
  • Reading JSON files
  • Reading CSV and TSV
  • Reading SequenceFile
  • Reading object data
  • Explicitly calling the Hadoop API
  • Reading MySQL data


Reading plain text data

import org.apache.spark.{SparkConf, SparkContext}

object TestSpark {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("TestSpark")
    val context = new SparkContext(conf)
    // textFile reads a plain text file; each line becomes one element of the RDD
    val input = context.textFile("F:\\spark-book\\test-data\\1.txt")
    println(input.collect().mkString(","))
    context.stop()
  }
}

If you need to read a large number of small files from HDFS, context.wholeTextFiles() can significantly improve read efficiency. The path argument also supports wildcard (glob) patterns, for example reading all files ending in .txt under a directory:

val input = context.textFile("F:\\spark-book\\test-data\\*.txt")
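
For comparison, here is a minimal sketch of wholeTextFiles (the small-files directory below is illustrative, and the code reuses the SparkContext context from the example above). It returns an RDD[(String, String)] of (file path, entire file content), one pair per file, instead of one element per line:

// Sketch: read many small files at once; the directory path is illustrative
val files = context.wholeTextFiles("F:\\spark-book\\test-data\\small-files")
// Each element is (file path, whole file content)
files.map { case (path, content) => (path, content.split("\n").length) }
  .collect()
  .foreach { case (path, lineCount) => println(s"$path -> $lineCount lines") }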

Reading JSON files


{"name":"Thomas","age":"20","address":["通信地址1","通信地址2"]}
{"name":"Alice","age":"18","address":["通信地址1","通信地址2","通信地址3"]}


import org.apache.spark.{SparkConf, SparkContext}

import scala.util.parsing.json.JSON

object TestSpark {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("TestSpark")
    val sc = new SparkContext(conf)
    val input = sc.textFile("F:\\spark-book\\test-data\\test-json.txt")
    // Parse each line independently; parseFull returns Some(value) for valid JSON, None otherwise
    val parsed = input.map(JSON.parseFull)
    println(parsed.collect.mkString(","))
    parsed.foreach {
      case Some(map: Map[String, Any] @unchecked) => println(map)
      case None => println("bad json")
      case _ => println("other error !")
    }
    sc.stop()
  }
}

Note that each JSON record must be complete on a single line, because textFile splits the input line by line.


Some(Map(name -> Thomas, age -> 20, address -> List(通信地址1, 通信地址2))),Some(Map(name -> Alice, age -> 18, address -> List(通信地址1, 通信地址2, 通信地址3)))
Map(name -> Thomas, age -> 20, address -> List(通信地址1, 通信地址2))
Map(name -> Alice, age -> 18, address -> List(通信地址1, 通信地址2, 通信地址3))
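
As a small follow-on sketch (reusing input and JSON from the example above), malformed lines can be dropped and individual fields extracted from the parsed maps:

// Sketch: keep only lines that parse, then pull out the "name" field
val names = input
  .flatMap(line => JSON.parseFull(line).toList)          // bad JSON parses to None and is dropped
  .collect { case m: Map[String, Any] @unchecked => m }   // keep only JSON objects
  .flatMap(_.get("name").toList)
names.collect().foreach(println)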


Reading CSV and TSV

CSV fields are separated by commas; TSV fields are separated by tab characters.

import org.apache.spark.{SparkConf, SparkContext}

object TestSpark {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("TestSpark")
    val sc = new SparkContext(conf)
    val input = sc.textFile("F:\\spark-book\\test-data\\1.txt")
    // CSV: split each line on commas
    input.flatMap(_.split(",")).collect.foreach(println)
    sc.stop()
  }
}

import org.apache.spark.{SparkConf, SparkContext}

object TestSpark {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("TestSpark")
    val sc = new SparkContext(conf)
    val input = sc.textFile("F:\\spark-book\\test-data\\1.txt")
    // TSV: split each line on tab characters
    input.flatMap(_.split("\t")).collect.foreach(println)
    sc.stop()
  }
}
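
If the columns should stay together per record rather than being flattened, each line can be split once and mapped to a case class. A sketch, assuming a hypothetical file person.csv with lines such as Thomas,20 and the same SparkContext sc as above:

// Sketch: parse CSV lines into structured records (person.csv is hypothetical)
case class Person(name: String, age: Int)   // define at the top level, like Student later in this post

val people = sc.textFile("F:\\spark-book\\test-data\\person.csv")
  .map(_.split(","))
  .filter(_.length == 2)                    // skip malformed lines
  .map(fields => Person(fields(0).trim, fields(1).trim.toInt))
people.collect().foreach(println)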

Reading SequenceFile

A SequenceFile stores data only as key-value pairs.

import org.apache.spark.{SparkConf, SparkContext}

object TestSpark {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("TestSpark")
    val sc = new SparkContext(conf)
    // Read a SequenceFile of (String, String) key-value pairs
    val input = sc.sequenceFile[String, String]("F:\\spark-book\\test-data\\1.txt")
    println(input.collect.mkString(","))
    sc.stop()
  }
}
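
For completeness, a sketch of producing such a file first (the output path is illustrative, same sc as above): saveAsSequenceFile is available on key-value RDDs and writes records that sequenceFile can read back.

// Sketch: write a SequenceFile of (String, String) pairs, then read it back
val pairs = sc.parallelize(Seq(("Thomas", "20"), ("Alice", "18")))
pairs.saveAsSequenceFile("F:\\spark-book\\test-data\\seq-out")   // illustrative output path

val reloaded = sc.sequenceFile[String, String]("F:\\spark-book\\test-data\\seq-out")
println(reloaded.collect().mkString(","))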

Reading object data

import org.apache.spark.{SparkConf, SparkContext}

object TestSpark {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("TestSpark")
    val sc = new SparkContext(conf)
    // objectFile reads Java-serialized objects back as instances of Student
    val input = sc.objectFile[Student]("F:\\spark-book\\test-data\\1.txt")
    println(input.collect.mkString(","))
    sc.stop()
  }
}

case class Student(name: String, age: Int)
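
A sketch of producing the object file in the first place (output path illustrative, same sc as above): saveAsObjectFile serializes each element with Java serialization, so the same Student case class can be read back with objectFile[Student].

// Sketch: write Student objects, then read them back (path is illustrative)
val students = sc.parallelize(Seq(Student("Thomas", 20), Student("Alice", 18)))
students.saveAsObjectFile("F:\\spark-book\\test-data\\students-obj")

val reloaded = sc.objectFile[Student]("F:\\spark-book\\test-data\\students-obj")
println(reloaded.collect().mkString(","))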

Explicitly calling the Hadoop API

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.{SparkConf, SparkContext}

object TestSpark {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("TestSpark")
    val sc = new SparkContext(conf)
    // newAPIHadoopFile requires the new Hadoop (mapreduce) InputFormat classes
    val result = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat]("hdfs://test/tmp/1.txt",
      classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
    val rs = result.map(_._2.toString).collect()
    println(rs.mkString(","))
    sc.stop()
  }
}

The LongWritable key is the offset of each line in the file (often loosely described as the line number), the Text value is the line's content, and the InputFormat determines how the input data is parsed.
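
For comparison, a sketch of the same read through the old (mapred) API, reusing sc and the HDFS path from above: hadoopFile takes org.apache.hadoop.mapred.TextInputFormat, whereas newAPIHadoopFile requires the mapreduce one.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat   // old (mapred) API

// Sketch: same HDFS path as above, read through the old Hadoop API
val oldApi = sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs://test/tmp/1.txt")
println(oldApi.map(_._2.toString).collect().mkString(","))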

Reading MySQL data

Add the MySQL JDBC driver dependency:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
</dependency>

import java.sql.DriverManager

import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}

object TestSpark {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("TestSpark")
    val sc = new SparkContext(conf)
    val input = new JdbcRDD(
      sc,
      () => {
        // Load the driver and open a connection on each executor
        Class.forName("com.mysql.jdbc.Driver")
        DriverManager.getConnection("jdbc:mysql://note01:3306/test", "root", "123456")
      },
      // JdbcRDD requires exactly two '?' placeholders, bound to each partition's lower and upper bound
      "select * from person where id >= ? and id <= ?",
      1, 2, 1,
      r => (r.getInt(1), r.getString(2), r.getInt(3)))

    input.foreach(println)
    sc.stop()
  }
}
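
To see how JdbcRDD parallelizes the read, here is a sketch with illustrative bounds (same sc and connection settings as above): the two '?' placeholders are bound to each partition's lower and upper id, so ids 1 to 300 spread over 3 partitions become three range queries executed in parallel.

// Sketch: id range 1..300 split across 3 partitions (columns and bounds are illustrative)
val partitioned = new JdbcRDD(
  sc,
  () => {
    Class.forName("com.mysql.jdbc.Driver")
    DriverManager.getConnection("jdbc:mysql://note01:3306/test", "root", "123456")
  },
  "select id, name, age from person where id >= ? and id <= ?",
  1, 300, 3,
  r => (r.getInt("id"), r.getString("name"), r.getInt("age")))
println(partitioned.count())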


