package com.fh.spark
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Small collection of Spark demos: RDD/DataFrame/Dataset conversions,
 * word count, Hive SQL, and JDBC read/write. Each method creates and
 * tears down its own Spark runtime, so they are independent.
 */
object TestDemo1 {

  /** Entry point — currently runs only the JDBC demo. */
  def main(args: Array[String]): Unit = {
    method4()
  }

  /** Record type used to demonstrate RDD -> typed Dataset conversion. */
  case class Player(name: String, age: Int, gender: String)

  /** Demonstrates conversions between RDD, DataFrame and Dataset. */
  def method1(): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("test1")
      .master("local[*]")
      .getOrCreate()
    try {
      val sparkContext: SparkContext = spark.sparkContext
      val rdd1: RDD[(String, Int, String)] =
        sparkContext.makeRDD(List(("jack", 11, "male"), ("lisa", 12, "female")))
      // NOTE: foreach(println) prints on the executors; output is visible here
      // only because the master is local[*].
      rdd1.foreach(println)

      import spark.implicits._

      // RDD -> DataFrame with explicit column names.
      val df1: DataFrame = rdd1.toDF("name", "age", "gender")
      df1.show()

      // RDD -> strongly typed Dataset via the Player case class.
      val ds1: Dataset[Player] = rdd1.map(x => Player(x._1, x._2, x._3)).toDS()
      ds1.show()

      // DataFrame/Dataset -> RDD round trips.
      df1.rdd.foreach(println)
      ds1.rdd.foreach(println)

      // Dataset -> Dataset (re-typing with as[...]).
      val ds2: Dataset[Player] = ds1.as[Player]
      ds2.show()

      // Dataset -> DataFrame with renamed columns.
      val df2: DataFrame = ds1.toDF("nm", "ag", "sex")
      df2.show()
    } finally {
      spark.stop() // release local Spark resources even on failure
    }
  }

  /** Classic word count over a local text file, result saved back to disk. */
  def method2(): Unit = {
    val sparkConf: SparkConf = new SparkConf().setAppName("test2").setMaster("local[*]")
    val sparkContext: SparkContext = new SparkContext(sparkConf)
    try {
      val rddFile: RDD[String] = sparkContext.textFile("src/main/resources/myfile/goodjob.txt")
      val rddResult: RDD[(String, Int)] =
        rddFile.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
      rddResult.foreach(println)
      // NOTE(review): despite the ".txt" name, saveAsTextFile treats this path as
      // an output *directory* and throws if it already exists — rerunning this
      // method without cleaning up first will fail.
      rddResult.saveAsTextFile("src/main/resources/myfile/result.txt")
    } finally {
      sparkContext.stop()
    }
  }

  /** Reads rows from Hive table meta.player, filters on age, and inserts into meta.player2. */
  def method3(): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .appName("test3")
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()
    try {
      // Rename columns so the temp view exposes a/b/c for the insert-select below.
      val df: DataFrame = sparkSession
        .sql("select * from meta.player where age < 12")
        .toDF("a", "b", "c")
      df.createOrReplaceTempView("tv1")
      sparkSession.sql("insert into meta.player2(name,age,gender) select a,b,c from tv1")
    } finally {
      sparkSession.stop()
    }
  }

  /** Reads table `ui` over JDBC, keeps rows with age < 20, appends them to `ui2`. */
  def method4(): Unit = {
    // Fixed: appName was "test3" — a copy-paste from method3.
    val sparkSession: SparkSession = SparkSession.builder()
      .appName("test4")
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()
    try {
      // NOTE(review): credentials are hard-coded; move to configuration/secrets.
      // NOTE(review): the read URL lacks the charset/SSL options used on the
      // write URL below — confirm whether both should match.
      val df1: DataFrame = sparkSession.read
        .format("jdbc")
        .option("url", "jdbc:mysql://localhost:3306/my_test")
        .option("user", "root")
        .option("password", "123456")
        .option("dbtable", "ui")
        .load()
      df1.createOrReplaceTempView("tv1")

      val df2: DataFrame = sparkSession.sql("select * from tv1 where age <20")

      df2.write
        .mode("append")
        .format("jdbc")
        .option("url", "jdbc:mysql://localhost:3306/my_test?useUnicode=true&characterEncoding=utf8&useSSL=false&zeroDateTimeBehavior=convertToNull&jdbcCompliantTruncation=false")
        .option("user", "root")
        .option("password", "123456")
        .option("dbtable", "ui2")
        .save()
    } finally {
      sparkSession.stop()
    }
  }
}