0
点赞
收藏
分享

微信扫一扫

Windows7+Spark基础|本地读写MySQL

Separes 2022-02-04 阅读 76

【提示】本文给出的是仅在基于Windows7本地平台+Idea中的测试参考代码,并没有实现分布式部署。因此,供初学者在本地测试学习使用。

读操作

package com.jdbc.mysql57
import java.util.Properties
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object ReadMySQL {
def main(args: Array[String]): Unit = {
offLog

val ssc=newSparkSession("SparkSQL_From_ODBC")

val p=new Properties()
p.put("user","root")
p.put("password","admin123")

try {
val df:DataFrame=ssc.read.jdbc(
"jdbc:mysql://localhost:3306/takeout",
"takeout_food",
p)
df.show
} catch {
case e: Exception =>
println("MySQL Error:")
e.printStackTrace()
}
}
def newSparkSession(appName: String, isHive: Boolean = false) = {
val builder = SparkSession
.builder()
.appName(appName)
.master("local")
.config("spark.sql.warehouse.dir", "warehouse")
if (isHive) {
builder.enableHiveSupport()
}
builder.getOrCreate()
}

def offLog={
Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
Logger.getLogger("org.spark_project").setLevel(Level.OFF)
}
}

写操作

package com.jdbc.mysql57

import java.util.Properties
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object P97 {
def main(args: Array[String]): Unit = {
offLog

val ssc=newSparkSession("SparkSQL_From_ODBC")
//writeDataFrameToMySQL
import ssc.implicits._
val lines = ssc.sparkContext.textFile("file:/D:/sparksqllearn/spark02/src/com/rdd/examples/words.txt")
val df3=lines.toDF()

writeDataFrameToMySQL(df3)
}
def writeDataFrameToMySQL(df:DataFrame): Unit ={
val p=new Properties()
p.put("user","root")
p.put("password","admin123")

try {
df.write.mode(SaveMode.Overwrite).jdbc(
"jdbc:mysql://localhost:3306/takeout",
"test_table",
p)

} catch {
case e: Exception =>
println("MySQL Error:")
e.printStackTrace()
}

}
def newSparkSession(appName: String, isHive: Boolean = false) = {
val builder = SparkSession
.builder()
.appName(appName)
.master("local")
.config("spark.sql.warehouse.dir", "warehouse")
if (isHive) {
builder.enableHiveSupport()
}
builder.getOrCreate()
}

def offLog={
Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
Logger.getLogger("org.spark_project").setLevel(Level.OFF)
}

}

上述代码运行成功后,打开MySQL数据库takeout即可观察到新表test_table与新添加的记录数据!

几点说明

Idea工具下的基础准备需要自己设置,在此不赘述。

上述代码编写过程中可能遇到1-2个错误,可参考引文中的第2、3篇。

主要引用

  • 《Spark SQL入门与数据分析实践》,杨虹 谢显中 周前能 张安文,人民邮电出版社。
  • ​​Spark:将DataFrame写入Mysql​​
  • ​​java.sql.SQLException: No suitable driver的几种解决办法​​
  • ​​Apache Spark™ examples​​


举报

相关推荐

0 条评论