Getting Started with Spark

君之言之 · 2022-01-30 · 42 reads
package com.fh.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}


object TestDemo1 {
  def main(args: Array[String]): Unit = {
    method4()  // switch to method1()/method2()/method3() to run the other demos
  }
  // Case class used by the conversion examples below
  case class Player(name:String,age:Int,gender:String)

  // Build an RDD by hand and convert back and forth between RDD, DataFrame and Dataset
  def method1():Unit={
    // master is local[*] for local runs; on a production cluster it should be yarn
    val spark: SparkSession = SparkSession.builder().appName("test1").master("local[*]").getOrCreate()
    val sparkContext: SparkContext = spark.sparkContext
    // Build an RDD from a List (one element per row) of tuples (one field per column); makeRDD is just a wrapper around parallelize
    val rdd1: RDD[(String, Int, String)] = sparkContext.makeRDD(List(("jack", 11, "male"), ("lisa", 12, "female")))
    rdd1.foreach(println)
    // Implicit conversions required for toDF/toDS
    import spark.implicits._
    // RDD -> DataFrame; toDF can assign column names
    val df1: DataFrame = rdd1.toDF("name","age","gender")
    df1.show()
    // Map each tuple onto the case class, then convert to a Dataset
    val ds1: Dataset[Player] = rdd1.map(x => Player(x._1, x._2, x._3)).toDS()
    ds1.show()
    // Convert the DataFrame and Dataset back to RDDs and print row by row
    df1.rdd.foreach(println)
    ds1.rdd.foreach(println)
    // DataFrame -> Dataset; as[Player] requires the column names to match the case class fields
    val ds2: Dataset[Player] = df1.as[Player]
    ds2.show()
    // Dataset -> DataFrame, assigning new column names via toDF
    val df2: DataFrame = ds1.toDF("nm", "ag", "sex")
    df2.show()
  }
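  // For reference, with the two rows above df1.show() prints a table like:
  // +----+---+------+
  // |name|age|gender|
  // +----+---+------+
  // |jack| 11|  male|
  // |lisa| 12|female|
  // +----+---+------+
  // while the foreach calls on the raw RDDs print tuples such as (jack,11,male).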
  // Read a local file into an RDD and implement WordCount
  def method2():Unit={
    val sparkConf: SparkConf = new SparkConf().setAppName("test2").setMaster("local[*]")
    val sparkContext: SparkContext = new SparkContext(sparkConf)
    // textFile reads the file as an RDD with one element per line
    val rddFile: RDD[String] = sparkContext.textFile("src/main/resources/myfile/goodjob.txt")
    // Split each line on spaces, flatMap into a flat RDD of words, map each word to a (word, 1) tuple, then reduceByKey to sum the counts of identical words
    val rddResult: RDD[(String, Int)] = rddFile.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    rddResult.foreach(println)
    // saveAsTextFile writes the result to disk
    rddResult.saveAsTextFile("src/main/resources/myfile/result.txt")
  }
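  // Sanity check: if goodjob.txt contained the two (hypothetical) lines
  //   good job
  //   good luck
  // then rddResult would hold (good,2), (job,1), (luck,1), printed in no
  // guaranteed order. Also note that saveAsTextFile treats the given path as a
  // directory of part files and fails if that directory already exists.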

  // Using Spark with Hive
  // First create the Hive tables: create table player(name string,age int,gender string); create table player2(name string,age int,gender string);
  // insert into table player values('wangming',11,'male');
  // insert into table player values('yuki',12,'female');

  def method3():Unit={
    // Note: enableHiveSupport is required when Spark talks to Hive
    val sparkSession: SparkSession = SparkSession.builder().appName("test3").master("local[*]").enableHiveSupport().getOrCreate()
    // Deliberately rename the columns to verify the mapping
    val df: DataFrame = sparkSession.sql("select * from meta.player where age < 12").toDF("a","b","c")
    // Register a session-scoped temporary view
    df.createOrReplaceTempView("tv1")
    sparkSession.sql("insert into meta.player2(name,age,gender) select a,b,c from tv1")
  }
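  // method3 assumes Spark can reach the Hive metastore that holds the meta
  // database, typically via a hive-site.xml on the classpath (e.g. under
  // src/main/resources). A minimal alternative sketch that points Spark at a
  // Thrift metastore directly; the localhost:9083 address is an assumption:
  //   SparkSession.builder().appName("test3").master("local[*]")
  //     .config("hive.metastore.uris", "thrift://localhost:9083")
  //     .enableHiveSupport().getOrCreate()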

  // Using Spark with MySQL over JDBC
  def method4():Unit={
    // A plain SparkSession is enough here; Hive support is not needed for JDBC
    val sparkSession: SparkSession = SparkSession.builder().appName("test4").master("local[*]").getOrCreate()
    // Read table ui from MySQL into a DataFrame
    val df1: DataFrame = sparkSession.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/my_test")
      .option("user", "root")
      .option("password", "123456")
      .option("dbtable", "ui")
      .load()
    df1.createOrReplaceTempView("tv1")
    val df2: DataFrame = sparkSession.sql("select * from tv1 where age <20")
    // Append the filtered rows into table ui2
    df2.write.mode("append").format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/my_test?useUnicode=true&characterEncoding=utf8&useSSL=false&zeroDateTimeBehavior=convertToNull&jdbcCompliantTruncation=false")
      .option("user", "root")
      .option("password", "123456")
      .option("dbtable", "ui2")
      .save()
  }
}
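To build and run these demos you need spark-sql on the classpath (which also pulls in spark-core), spark-hive for the Hive demo, and a MySQL JDBC driver for the JDBC demo. A minimal sbt sketch; the version numbers below are assumptions and should be matched to your cluster:

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"  % "3.1.2",
  "org.apache.spark" %% "spark-hive" % "3.1.2",
  "mysql" % "mysql-connector-java" % "8.0.28"
)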
