[Note] The code in this article is reference code tested only on a local Windows 7 platform with IDEA; it does not implement a distributed deployment. It is intended for beginners to study and test locally.
Read operation
package com.jdbc.mysql57

import java.util.Properties

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadMySQL {

  def main(args: Array[String]): Unit = {
    offLog()
    val ssc = newSparkSession("SparkSQL_From_JDBC")
    val p = new Properties()
    p.put("user", "root")
    p.put("password", "admin123")
    try {
      // Load the takeout_food table from the local MySQL database "takeout"
      val df: DataFrame = ssc.read.jdbc(
        "jdbc:mysql://localhost:3306/takeout",
        "takeout_food",
        p)
      df.show()
    } catch {
      case e: Exception =>
        println("MySQL Error:")
        e.printStackTrace()
    }
  }

  // Build a local SparkSession; Hive support is enabled only on request
  def newSparkSession(appName: String, isHive: Boolean = false): SparkSession = {
    val builder = SparkSession
      .builder()
      .appName(appName)
      .master("local")
      .config("spark.sql.warehouse.dir", "warehouse")
    if (isHive) {
      builder.enableHiveSupport()
    }
    builder.getOrCreate()
  }

  // Silence the noisy INFO-level logs so console output stays readable
  def offLog(): Unit = {
    Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    Logger.getLogger("org.spark_project").setLevel(Level.OFF)
  }
}
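Before either program will compile and run, the Spark SQL and MySQL JDBC driver jars must be on the project classpath. Below is a minimal build.sbt sketch; the version numbers are assumptions and should be matched to your local Spark installation and MySQL 5.7 server:

// build.sbt -- a minimal sketch; the versions below are assumptions,
// adjust them to your local Spark and MySQL 5.7 setup
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.3.0",
  // Connector/J 5.1.x provides com.mysql.jdbc.Driver, which works with MySQL 5.7
  "mysql" % "mysql-connector-java" % "5.1.47"
)

If you manage the project without sbt, the same two jars can be added in IDEA under Project Structure → Modules → Dependencies.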
Write operation
package com.jdbc.mysql57

import java.util.Properties

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object P97 {

  def main(args: Array[String]): Unit = {
    offLog()
    val ssc = newSparkSession("SparkSQL_From_JDBC")
    import ssc.implicits._
    // Read a local text file as an RDD[String] and convert it to a
    // single-column DataFrame (the column is named "value" by default)
    val lines = ssc.sparkContext.textFile("file:/D:/sparksqllearn/spark02/src/com/rdd/examples/words.txt")
    val df3 = lines.toDF()
    writeDataFrameToMySQL(df3)
  }

  def writeDataFrameToMySQL(df: DataFrame): Unit = {
    val p = new Properties()
    p.put("user", "root")
    p.put("password", "admin123")
    try {
      // Overwrite mode drops and recreates test_table on every run
      df.write.mode(SaveMode.Overwrite).jdbc(
        "jdbc:mysql://localhost:3306/takeout",
        "test_table",
        p)
    } catch {
      case e: Exception =>
        println("MySQL Error:")
        e.printStackTrace()
    }
  }

  // Same helpers as in ReadMySQL above
  def newSparkSession(appName: String, isHive: Boolean = false): SparkSession = {
    val builder = SparkSession
      .builder()
      .appName(appName)
      .master("local")
      .config("spark.sql.warehouse.dir", "warehouse")
    if (isHive) {
      builder.enableHiveSupport()
    }
    builder.getOrCreate()
  }

  def offLog(): Unit = {
    Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    Logger.getLogger("org.spark_project").setLevel(Level.OFF)
  }
}
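One detail worth noting: lines.toDF() produces a DataFrame with a single column named value, so the MySQL table inherits that column name. If you want a friendlier name, pass it to toDF before writing; a small sketch (the name "word" is an arbitrary choice):

// Same data, but the column (and the MySQL field) is named "word"
val df3 = lines.toDF("word")
writeDataFrameToMySQL(df3)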
After the code above runs successfully, open the takeout database in MySQL and you will see the new table test_table with the newly written records.
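As a quick programmatic check, you can also read the table back in the same session instead of opening the MySQL client; a sketch reusing ssc and a Properties object p configured as in the code above:

// Read test_table back to confirm the write succeeded
val check = ssc.read.jdbc("jdbc:mysql://localhost:3306/takeout", "test_table", p)
check.show()
println(s"rows written: ${check.count()}")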
A few notes
Basic project setup in IDEA is left to the reader and not covered here.
You may hit one or two errors while getting the code above to run; the second and third references below cover them, and a sketch of the most common fix follows.
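For example, java.sql.SQLException: No suitable driver (the third reference) usually means Spark cannot locate the MySQL driver class. A sketch of the common fix, assuming Connector/J 5.1.x is on the classpath:

// Name the driver class explicitly via the "driver" option that Spark's
// JDBC source supports; for Connector/J 8.x use com.mysql.cj.jdbc.Driver
p.put("driver", "com.mysql.jdbc.Driver")
// Disabling SSL in the URL avoids the warning newer connectors print
// when talking to a default MySQL 5.7 server
val url = "jdbc:mysql://localhost:3306/takeout?useSSL=false&characterEncoding=utf8"
val df = ssc.read.jdbc(url, "takeout_food", p)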
Main references
- 杨虹, 谢显中, 周前能, 张安文. 《Spark SQL入门与数据分析实践》. 人民邮电出版社.
- Spark: 将DataFrame写入MySQL
- java.sql.SQLException: No suitable driver 的几种解决办法
- Apache Spark™ examples