Table of Contents
- Reading plain text data
- Reading JSON files
- Reading CSV and TSV
- Reading SequenceFile
- Reading object data
- Calling the Hadoop API explicitly
- Reading MySQL data
Reading plain text data
import org.apache.spark.{SparkConf, SparkContext}

object TestSpark {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("TestSpark")
    val context = new SparkContext(conf)
    val input = context.textFile("F:\\spark-book\\test-data\\1.txt")
    println(input.collect().mkString(","))
    context.stop()
  }
}
If you need to read a large number of small files from HDFS, context.wholeTextFiles() can significantly improve read efficiency. Path arguments also support wildcard patterns, for example reading every file in the directory that ends in .txt:
val input = context.textFile("F:\\spark-book\\test-data\\*.txt")
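For reference, here is a minimal sketch of the wholeTextFiles variant (reusing the SparkContext named context and the sample directory from the example above; the printed summary is just for illustration). It returns one (file path, file content) pair per file instead of one element per line:
// wholeTextFiles yields an RDD of (filePath, entireFileContent) pairs,
// which is cheaper than textFile when there are many small files
val whole = context.wholeTextFiles("F:\\spark-book\\test-data")
whole.collect().foreach { case (path, content) =>
  println(s"$path -> ${content.length} chars")
}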
Reading JSON files
{"name":"Thomas","age":"20","address":["通信地址1","通信地址2"]}
{"name":"Alice","age":"18","address":["通信地址1","通信地址2","通信地址3"]}
import org.apache.spark.{SparkConf, SparkContext}

import scala.util.parsing.json.JSON

object TestSpark {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("TestSpark")
    val sc = new SparkContext(conf)
    val input = sc.textFile("F:\\spark-book\\test-data\\test-json.txt")
    // JSON.parseFull returns Option[Any]: Some(...) on success, None on failure
    val parsed = input.map(JSON.parseFull)
    println(parsed.collect.mkString(","))
    parsed.foreach {
      case Some(map: Map[String, Any]) => println(map)
      case None => println("bad json")
      case _ => println("other error !")
    }
    sc.stop()
  }
}
Note that each JSON record must be a complete document on its own line (one JSON object per line).
The collect output shows the parsed records wrapped in Option:
Some(Map(name -> Thomas, age -> 20, address -> List(通信地址1, 通信地址2))),Some(Map(name -> Alice, age -> 18, address -> List(通信地址1, 通信地址2, 通信地址3)))
and the foreach prints each map that parsed successfully:
Map(name -> Thomas, age -> 20, address -> List(通信地址1, 通信地址2))
Map(name -> Alice, age -> 18, address -> List(通信地址1, 通信地址2, 通信地址3))
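As a small follow-up sketch (reusing the parsed RDD from the example above; the "name" field comes from the sample data), individual fields can be pulled out of the successfully parsed maps:
// keep only records that parsed into a Map, then extract the "name" field
val names = parsed.collect { case Some(m: Map[String, Any]) => m("name") }
println(names.collect().mkString(","))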
Reading CSV and TSV
CSV fields are separated by commas, while TSV fields are separated by tab characters. Reading a CSV file:
import org.apache.spark.{SparkConf, SparkContext}

object TestSpark {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("TestSpark")
    val sc = new SparkContext(conf)
    val input = sc.textFile("F:\\spark-book\\test-data\\1.txt")
    // CSV: split each line on commas
    input.flatMap(_.split(",")).collect.foreach(println)
    sc.stop()
  }
}
Reading a TSV file:
import org.apache.spark.{SparkConf, SparkContext}

object TestSpark {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("TestSpark")
    val sc = new SparkContext(conf)
    val input = sc.textFile("F:\\spark-book\\test-data\\1.txt")
    // TSV: split each line on tab characters
    input.flatMap(_.split("\t")).collect.foreach(println)
    sc.stop()
  }
}
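flatMap flattens every field into a single RDD of strings; if the column layout matters, a minimal sketch (assuming, purely as an example, that each line of 1.txt is a name,age pair and reusing the sc from above) that keeps the per-line structure looks like this:
// hypothetical layout: each line is "name,age"
val people = sc.textFile("F:\\spark-book\\test-data\\1.txt")
  .map(_.split(","))
  .map(fields => (fields(0), fields(1).trim.toInt)) // (name, age) tuples
people.collect().foreach(println)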
Reading SequenceFile
SequenceFiles hold only key-value pair data.
import org.apache.spark.{SparkConf, SparkContext}

object TestSpark {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("TestSpark")
    val sc = new SparkContext(conf)
    // the Writable key/value types are converted to String automatically
    val input = sc.sequenceFile[String, String]("F:\\spark-book\\test-data\\1.txt")
    println(input.collect.mkString(","))
    sc.stop()
  }
}
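If there is no SequenceFile to test with yet, here is a minimal sketch (reusing the sc above; the seq-out directory is an assumed scratch path that must not already exist) that writes a pair RDD as a SequenceFile and reads it back:
// assumption: seq-out is a scratch output directory
val pairs = sc.parallelize(Seq(("k1", "v1"), ("k2", "v2")))
pairs.saveAsSequenceFile("F:\\spark-book\\test-data\\seq-out")
val readBack = sc.sequenceFile[String, String]("F:\\spark-book\\test-data\\seq-out")
println(readBack.collect().mkString(","))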
Reading object data
import org.apache.spark.{SparkConf, SparkContext}

object TestSpark {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("TestSpark")
    val sc = new SparkContext(conf)
    // objectFile deserializes records that were written with saveAsObjectFile
    val input = sc.objectFile[Student]("F:\\spark-book\\test-data\\1.txt")
    println(input.collect.mkString(","))
    sc.stop()
  }
}

case class Student(name: String, age: Int)
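objectFile can only read data that was written with saveAsObjectFile. A minimal sketch of producing such a file and reading it back (reusing the sc above; the obj-out directory is an assumed scratch path):
// assumption: obj-out is a scratch output directory that does not exist yet
val students = sc.parallelize(Seq(Student("Thomas", 20), Student("Alice", 18)))
students.saveAsObjectFile("F:\\spark-book\\test-data\\obj-out")
val readBack = sc.objectFile[Student]("F:\\spark-book\\test-data\\obj-out")
println(readBack.collect().mkString(","))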
Calling the Hadoop API explicitly
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.{SparkConf, SparkContext}

object TestSpark {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("TestSpark")
    val sc = new SparkContext(conf)
    // newAPIHadoopFile requires the new (mapreduce) TextInputFormat, not the old mapred one
    val result = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat](
      "hdfs://test/tmp/1.txt", classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
    val rs = result.map(_._2.toString).collect()
    println(rs.mkString(","))
    sc.stop()
  }
}
Here LongWritable is the byte offset of each line within the file (often treated as a line key), Text is the content of that line, and the InputFormat class determines how the input is split and parsed.
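For comparison, the old mapred API goes through sc.hadoopFile instead; a minimal sketch under the same HDFS path assumption as above (self-contained imports, since the old and new TextInputFormat classes share a name):
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat // old-style mapred InputFormat

// hadoopFile works with the old InputFormat hierarchy, unlike newAPIHadoopFile
val oldApi = sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs://test/tmp/1.txt")
println(oldApi.map(_._2.toString).collect().mkString(","))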
Reading MySQL data
Add the dependency:
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
</dependency>
import java.sql.DriverManager

import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}

object TestSpark {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("TestSpark")
    val sc = new SparkContext(conf)
    val input = new JdbcRDD(
      sc,
      () => {
        Class.forName("com.mysql.jdbc.Driver")
        DriverManager.getConnection("jdbc:mysql://note01:3306/test", "root", "123456")
      },
      // the query must contain exactly two ? placeholders, bound to lowerBound and upperBound
      "select * from person where id >= ? and id <= ?",
      1, // lowerBound
      2, // upperBound
      1, // numPartitions
      r => (r.getInt(1), r.getString(2), r.getInt(3))
    )
    input.foreach(println)
    sc.stop()
  }
}
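To show how JdbcRDD splits the work, here is a hedged sketch (reusing the sc above and assuming, hypothetically, that the person table has ids up to 100): with three partitions the bound range is divided into roughly [1,33], [34,66] and [67,100], and each slice is bound into the two ? placeholders.
// hypothetical: ids 1..100 spread across 3 partitions
val partitioned = new JdbcRDD(
  sc,
  () => {
    Class.forName("com.mysql.jdbc.Driver")
    DriverManager.getConnection("jdbc:mysql://note01:3306/test", "root", "123456")
  },
  "select * from person where id >= ? and id <= ?",
  1,   // lowerBound
  100, // upperBound
  3,   // numPartitions
  r => (r.getInt(1), r.getString(2), r.getInt(3))
)
println(partitioned.count())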