Apache Spark Exercise 2: Analyzing COVID-19 Data with DataFrames


This post continues to analyze the same dataset used in "Apache Spark Exercise 1: Analyzing COVID-19 Data with RDDs".

0. Data preprocessing

import org.apache.spark.sql.SparkSession

// Create the SparkSession
val spark = SparkSession
  .builder()
  .appName("Coronavirus")
  .getOrCreate()
// Import the $-notation for columns
import spark.implicits._

// Read the source data and cast the count columns to Int
val df = spark.read
  .option("header", value = true)
  .csv("hdfs:///SparkLearning/us-counties.csv")
  .withColumns(
    Map(
      "cases" -> $"cases".cast("int"),
      "deaths" -> $"deaths".cast("int")
    )
  )
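To verify the load and the integer casts, a quick check along the following lines can be run (optional; the exact column set depends on the source CSV, which in the NYT county-level file also contains county and fips):

// Inspect the inferred schema and a few sample rows
df.printSchema()
df.show(5)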

1. Compute the cumulative confirmed cases and cumulative deaths in the US as of each day

// Sum cases and deaths per date, then rename the generated aggregate columns
val res = df
  .groupBy($"date")
  .agg(
    Map(
      "cases" -> "sum",
      "deaths" -> "sum"
    )
  )
  .withColumnRenamed("sum(cases)", "total_cases")
  .withColumnRenamed("sum(deaths)", "total_deaths")
  .sort($"date")
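To inspect the result, or to write it back to HDFS, something like the following can be used (the output path below is only a placeholder, not one from the original exercise):

// Preview the running totals and optionally persist them as a single CSV file
res.show(10)
res.coalesce(1)
  .write
  .option("header", value = true)
  .csv("hdfs:///SparkLearning/result1")  // hypothetical output path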

2. Find the ten US states with the most cumulative confirmed cases as of 2020-05-19

val res = df
  .filter($"date" === "2020-05-19")
  .groupBy($"state")
  .sum("cases")
  // The aggregated column is named "sum(cases)", so sort on that name;
  // tail(10) of the ascending sort yields the ten largest values
  .sort($"sum(cases)")
  .tail(10)
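Note that tail(10) collects the ten rows to the driver as an Array[Row]. An alternative sketch that keeps the result as a DataFrame is to sort in descending order and limit to ten rows:

import org.apache.spark.sql.functions.desc

// Alternative: descending sort plus limit, keeping the result distributed
val top10 = df
  .filter($"date" === "2020-05-19")
  .groupBy($"state")
  .sum("cases")
  .withColumnRenamed("sum(cases)", "total_cases")
  .sort(desc("total_cases"))
  .limit(10)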

3. Compute the case fatality rate of each US state as of 2020-05-19

import org.apache.spark.sql.functions.round

val res = df
  .filter($"date" === "2020-05-19")
  .groupBy($"state")
  .sum("cases", "deaths")
  // Fatality rate = cumulative deaths / cumulative cases, rounded to 4 decimals
  .withColumn("death_rate", round($"sum(deaths)" / $"sum(cases)", 4))
  .select("state", "death_rate")
  .sort($"state")
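An equivalent sketch that avoids the generated sum(...) column names is to aggregate with explicit aliases (the names total_cases and total_deaths below are only illustrative):

import org.apache.spark.sql.functions.{round, sum}

// Same computation with named aggregates instead of sum(cases)/sum(deaths)
val deathRates = df
  .filter($"date" === "2020-05-19")
  .groupBy($"state")
  .agg(sum($"cases").as("total_cases"), sum($"deaths").as("total_deaths"))
  .withColumn("death_rate", round($"total_deaths" / $"total_cases", 4))
  .select("state", "death_rate")
  .sort($"state")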

4. Compute the daily number of new confirmed cases in the US

import org.apache.spark.sql.functions.datediff

// Cumulative confirmed cases per date
val totalCases = df
  .groupBy($"date")
  .sum("cases")
  .withColumnRenamed("sum(cases)", "total_cases")
// Self-join each date with the previous date and take the difference
val newCases = totalCases
  .as("t1")
  .join(totalCases.as("t2"), datediff($"t1.date", $"t2.date") === 1)
  .withColumn("new_cases", $"t1.total_cases" - $"t2.total_cases")
  .select($"t1.date".as("date"), $"new_cases")
  .sort($"date")
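Since there is exactly one row per date after the aggregation, the same difference can also be sketched with a lag window instead of a self-join (an unpartitioned window pulls all rows into a single partition, which is acceptable for a per-date series of this size):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag

// Alternative: previous-day total via a lag window over the date ordering
val w = Window.orderBy($"date")
val newCasesAlt = totalCases
  .withColumn("new_cases", $"total_cases" - lag($"total_cases", 1).over(w))
  .na.drop(Seq("new_cases"))  // drop the first date, which has no previous day
  .select($"date", $"new_cases")
  .sort($"date")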
