Apache Spark Exercise 3: Analyzing COVID-19 Data with Spark SQL


This article continues with the same dataset used in "Apache Spark Exercise 1: Analyzing COVID-19 Data with RDDs".

0. Data Preprocessing

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Coronavirus")
  .getOrCreate()
import spark.implicits._

// Read the source data and cast the count columns from string to int
val df = spark.read
  .option("header", value = true)
  .csv("hdfs:///SparkLearning/us-counties.csv")
  .withColumns(
    Map(
      "cases" -> $"cases".cast("int"),
      "deaths" -> $"deaths".cast("int")
    )
  )
// Register the DataFrame as a temporary view for Spark SQL
df.createOrReplaceTempView("us_covid_19")
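
Before running the queries, it helps to sanity-check what was loaded; a minimal check using the standard DataFrame API (nothing here is specific to this dataset):

// Confirm that cases and deaths are now integers and peek at a few rows
df.printSchema()
df.show(5, truncate = false)
println(s"Row count: ${df.count()}")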

1. Cumulative confirmed cases and cumulative deaths in the US as of each date

val res = spark.sql("""
|SELECT date, SUM(cases) AS total_cases, SUM(deaths) AS total_deaths
|FROM us_covid_19
|GROUP BY date
|ORDER BY date
|""".stripMargin)

2. The ten US states with the most cumulative confirmed cases as of 2020-05-19

val res = spark.sql("""
|SELECT state, SUM(cases) AS total_cases
|FROM us_covid_19
|WHERE date = "2020-05-19"
|GROUP BY state
|ORDER BY total_cases DESC
|LIMIT 10
|""".stripMargin)

3. The case fatality rate of each US state as of 2020-05-19

val res = spark.sql("""
|SELECT state, ROUND(SUM(deaths) / SUM(cases), 4) AS death_rate
|FROM us_covid_19
|WHERE date = "2020-05-19"
|GROUP BY state
|ORDER BY state
|""".stripMargin)

4. Daily new confirmed cases in the US

val res = spark.sql("""
|WITH daily_cases AS (
| SELECT date, SUM(cases) AS total_cases
| FROM us_covid_19
| GROUP BY date
|)
|SELECT t1.date, t1.total_cases - t2.total_cases AS new_cases
|FROM daily_cases AS t1 JOIN daily_cases AS t2 ON DATEDIFF(t1.date, t2.date) = 1
|ORDER BY t1.date
|""".stripMargin)
