Scala: quickly loading a config class, reading parquet log files, and writing per-province/city record counts to JSON and MySQL

栖桐 2022-02-17



Contents


  • Converting the result to JSON
  • Writing the result to MySQL

  • Adding the pom.xml dependency
  • Main class
  • application.properties
  • ProCityRptMysql


Converting the result to JSON

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object ProCityRpt {
  def main(args: Array[String]): Unit = {
    // 0 Validate the number of arguments
    if (args.length != 2) {
      println(
        """
          |cn.dmp.report.ProCityRpt
          |Arguments:
          |  logInputPath
          |  resultOutputPath
        """.stripMargin)
      sys.exit()
    }

    // 1 Accept the program arguments
    val Array(logInputPath, resultOutputPath) = args

    // 2 Create SparkConf -> SparkContext
    val sparkConf = new SparkConf()
    sparkConf.setAppName(s"${this.getClass.getSimpleName}")
    sparkConf.setMaster("local[*]")
    // Kryo serialization for RDDs spilled to disk and shuffled between workers
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val sc = new SparkContext(sparkConf)

    val sqlc = new SQLContext(sc)
    val df = sqlc.read.parquet(logInputPath)
    df.registerTempTable("log")
    // Group by province and city, then count the log records per group
    df.show()
    val result = sqlc.sql("select provincename,cityname,count(*) ct from log group by provincename,cityname")

    val hadoopConf = sc.hadoopConfiguration
    val fs = FileSystem.get(hadoopConf)
    val resultPath = new Path(resultOutputPath)
    if (fs.exists(resultPath)) {
      fs.delete(resultPath, true)
    }

    // Write the result as JSON
    result.coalesce(1).write.json(resultOutputPath)
    sc.stop()
  }
}
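For reference, the same per-province/city count can also be expressed without registering a temporary table, using the DataFrame API. This is only a sketch and assumes the same provincename/cityname columns as the SQL above:

// Sketch: equivalent aggregation via the DataFrame API
val resultDf = df.groupBy("provincename", "cityname").count()
  .withColumnRenamed("count", "ct")
resultDf.coalesce(1).write.json(resultOutputPath)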

Writing the result to MySQL

Adding the pom.xml dependency

<dependency>
    <groupId>com.typesafe</groupId>
    <artifactId>config</artifactId>
    <version>1.3.1</version>
</dependency>
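The JDBC write further down also needs the MySQL driver on the runtime classpath. If it is not already in the project, a dependency along these lines can be added (the version shown here is only an example):

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
</dependency>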

Main class

// Load configuration; resolution order: application.conf -> application.json -> application.properties
val load = ConfigFactory.load()
val props = new Properties()
props.setProperty("user", load.getString("jdbc.user"))
props.setProperty("password", load.getString("jdbc.password"))

application.properties

jdbc.url=jdbc:mysql://localhost:3306/spark?characterEncoding=utf-8
jdbc.tableName=rpt_pc_count
jdbc.user=root
jdbc.password=123456

jdbc.arearpt.table=rpt_area_analyse
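Because java.util.Properties takes values verbatim, the values above are written without surrounding quotes. As a quick sanity check (a minimal sketch, assuming the file sits on the classpath, e.g. under src/main/resources), the keys can be read back directly:

import com.typesafe.config.ConfigFactory

// Sketch: verify that Typesafe Config picks up application.properties
val conf = ConfigFactory.load()
println(conf.getString("jdbc.url"))       // jdbc:mysql://localhost:3306/spark?characterEncoding=utf-8
println(conf.getString("jdbc.tableName")) // rpt_pc_count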

ProCityRptMysql

import java.util.Properties
import com.typesafe.config.ConfigFactory
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.{SparkConf, SparkContext}

object ProCityRptMysql {
  def main(args: Array[String]): Unit = {
    // 0 Validate the number of arguments
    if (args.length != 1) {
      println(
        """
          |cn.dmp.report.ProCityRptMysql
          |Arguments:
          |  logInputPath
        """.stripMargin)
      sys.exit()
    }

    // 1 Accept the program argument
    val Array(logInputPath) = args

    // 2 Create SparkConf -> SparkContext
    val sparkConf = new SparkConf()
    sparkConf.setAppName(s"${this.getClass.getSimpleName}")
    sparkConf.setMaster("local[*]")
    // Kryo serialization for RDDs spilled to disk and shuffled between workers
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val sc = new SparkContext(sparkConf)

    val sqlc = new SQLContext(sc)
    val df = sqlc.read.parquet(logInputPath)
    df.registerTempTable("log")
    // Group by province and city, then count the log records per group
    df.show()
    val result = sqlc.sql("select provincename,cityname,count(*) ct from log group by provincename,cityname")

    /*
      Load configuration; resolution order: application.conf -> application.json -> application.properties
    */
    val load = ConfigFactory.load()
    val props = new Properties()
    props.setProperty("user", load.getString("jdbc.user"))
    props.setProperty("password", load.getString("jdbc.password"))

    // Write the result to MySQL via JDBC
    result.write/*.mode(SaveMode.Append)*/.jdbc(load.getString("jdbc.url"), load.getString("jdbc.tableName"), props)
    sc.stop()
  }
}
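The commented-out SaveMode.Append hints at the write-mode choice: by default, DataFrameWriter.jdbc fails if the target table already exists. A minimal sketch of making the mode explicit (SaveMode.Overwrite recreates the table on each run, SaveMode.Append adds rows to it):

// Sketch: write to MySQL with an explicit save mode
result.write
  .mode(SaveMode.Overwrite) // or SaveMode.Append to keep accumulating rows
  .jdbc(load.getString("jdbc.url"), load.getString("jdbc.tableName"), props)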


