文章目录
- 2020大数据应用赛试题
- 任务一、Spark技术栈有哪些组件?简述其功能,及应用场景。
- 任务二、本题目使用spark进行数据分析
- 数据说明
- 题目
- 题目一
- 题目二
- 题目三
- 题目四
2020大数据应用赛试题
任务一、Spark技术栈有哪些组件?简述其功能,及应用场景。
1、其它组件的基础,spark的内核,主要包含:有向无环图、RDD 、Lingage、Cache、broadcast等,并封装了底层通讯框架、是spark的基础。
2、Spark Streaming是一个对实时数据流进行高通量、容错处理的流式处理系统,可以对多种数据源(如Kafka、Flume、Twitter、Zero和TCP套接字)进行类似Map、Reduce和join等复杂操作、将流式计算分解成一系列短小的批处理作业
3、Spark sql:Shark是SparkSQL的前身,Spark SQL 的一个中重要特点是其能够统一处理关系表和RDD ,使得开发人员可以轻松的使用SQL 命令进行外部查询,同时进行更复杂的数据分析。
4、BlinkDB:是一个用于在海量数据上运行交互式 SQL查询的大规模并行查询引擎,它允许用户通过权衡数据精度来提升查询响应时间,其数据的精度被控制在允许的误差范围内。
5、MLBase是 Spark生态圈的一部分专注于机器学习,让机器学习的门槛更低,让一些可能并不了解学习的用户也能方便的使用MLBase。MLBase分为四部分:MLlib、MLI、ML Optimizer和 MLRuntime。
6、Graphx使用 Spark中图和图并进行计算。
返回顶部
任务二、本题目使用spark进行数据分析
数据说明
字段 | 字段说明 |
positionName | 职位名称 |
salary | 薪水 |
workYear | 工作年限 |
city | 城市 |
companyShortName | 公司简称 |
companySize | 公司规模 |
district | 所在区 |
financeStage | 融资阶段 |
industryField | 所在领域 |
thirdType | 职位类型 |
resumeProcessDay | 简历日处理 |
resumeProcessRate | 简历处理率 |
题目
题目一
1.拆分字段 salary -> min_salary,max_salry,并且取薪资的整数
- *如果salary字段是否包含类似’10k-20k*15薪’的值 如果存在 则把 15这样的类似数据给去掉,保留10K-20K; 然后拆分为 min_salary,max_salry 分别是 10,20
- 如果salary字段 类似10-20K 则拆分为 min_salary,max_salry 分别是 10,20
- 如果 salay字段为空值或者 面议,则拆分为 min_salary,max_salry 分别是 -1,-1
拆分后字段表结构为:
字段 |
positionName |
salary |
min_salary |
max_salry |
workYear |
city |
companyShortName |
companySize |
district |
financeStage |
industryField |
thirdType |
resumeProcessDay |
resumeProcessRate |
package 大数据应用赛_2020
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
object Exam1_1 {
def main(args: Array[String]): Unit = {
// 创建环境
val spark = SparkSession.builder().appName("exam_1").master("local[6]").getOrCreate()
import org.apache.spark.sql.functions._
import spark.implicits._
// 指定数据结构信息去读取数据
val schema = StructType(
List(
StructField("职位名称",StringType),
StructField("薪水",StringType),
StructField("工作年限",StringType),
StructField("城市",StringType),
StructField("公司简介",StringType),
StructField("公司规模",StringType),
StructField("所在区",StringType),
StructField("融资阶段",StringType),
StructField("所在领域",StringType),
StructField("职位类型",StringType),
StructField("简历日处理",IntegerType),
StructField("简历处理率",IntegerType)
)
)
// 读取数据
val df = spark.read
.schema(schema) // 使用指定的schema
.option("header",value = true) //
.csv("G:\\Projects\\IdeaProjects\\Spark_Competition\\src\\main\\scala\\大数据应用赛_2020\\zhaopin.txt")
df.show()
// 增加一列id
val window = Window.orderBy("城市")
val df_index = df.withColumn("index", row_number().over(window))
df_index.show()
// 取出薪水信息进行预处理
val salary = df_index.select('薪水, 'index)
salary.show()
salary.foreach(println(_))
// 信息过滤
val filter = salary
.map {
item =>
var line1 = item.toString().split('[')(1);
var line2 = line1.toString().split("]")(0); //..k-..k,index
var line3 = line2.toString().split(",")(1); // index
var line4 = line2.toString().split(",")(0); // ..k-..k
var sub_end2_0 = line4.toString().split("-")(0).toString.length - 1;
var sub_end2_1 = line4.toString().split("-")(1).toString.length - 1;
if (line4.toString.contains("*")) {
var base = line4.split("\\*")(0);
var sub_endb_0 = base.toString().split("-")(0).toString.length - 1;
var sub_endb_1 = base.toString().split("-")(1).toString.length - 1;
var min = base.toString().split("-")(0).substring(0,sub_endb_0).toInt;
var max = base.toString().split("-")(1).substring(0,sub_endb_1 ).toInt;
(min, max, line3)
} else if (line4.toString().contains("-")) {
var min = line4.toString().split("-")(0).substring(0, sub_end2_0).toInt;
var max = line4.toString().split("-")(1).substring(0, sub_end2_1).toInt;
(min, max, line3)
} else
(-1, -1, line3)
}
.toDF("min_salary", "max_salary", "index")
filter.show(100)
val full_data = df_index.join(filter, df_index.col("index") === filter.col("index"), "left_outer")
.select('职位名称 as "positionName", '薪水 as "salary", 'min_salary, 'max_salary, '工作年限 as "workYear",
'城市 as "city", '公司简介 as "companyShortName", '公司规模 as "companySize", '所在区 as "district",
'融资阶段 as "financeStage",'所在领域 as "industryField", '职位类型 as "thirdType",
'简历日处理 as "resumeProcessDay", '简历处理率 as "resumeProcessRate")
full_data.show(1000)
full_data.write
.option("header","true")
.csv("G:\\Projects\\IdeaProjects\\Spark_Competition\\src\\main\\scala\\大数据应用赛_2020\\zhaopin.csv")
}
}
返回顶部
题目二
2.分析不同城市 薪资概况(工资的最小平均值,最大平均值)
- 返回结果包含 城市名称,薪资最小平均值avg(min_salry),最大平均值avg(max_salry)
package 大数据应用赛_2020
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.IntegerType
object Exam2 {
def main(args: Array[String]): Unit = {
// 创建环境
val spark = SparkSession.builder().appName("exam_1").master("local[6]").getOrCreate()
import org.apache.spark.sql.functions._
import spark.implicits._
// 读取数据
val data = spark.read
.option("header", "true")
.csv("G:\\Projects\\IdeaProjects\\Spark_Competition\\src\\main\\scala\\大数据应用赛_2020\\zhaopin.csv\\part-00000-def725b7-564e-40f2-a8de-dc92875a7ea1-c000.csv")
data.show(10)
// 2、分析不同城市 薪资概况(工资的最小平均值,最大平均值)(2分)
// 返回结果包含 城市名称,薪资最小平均值avg(min_salry),最大平均值avg(max_salry)
// 转换数据类型
val data_use = data.select('city, 'min_salary cast (IntegerType), 'max_salary cast (IntegerType))
.toDF()
data_use.show(10)
data_use.printSchema()
// 选中所需字段,按照城市分组,聚合求均值
val result = data_use.select('city, 'min_salary, 'max_salary)
.groupBy('city)
.agg(avg('min_salary), avg('max_salary))
result.show()
result.coalesce(1) // 设置分区为1合并输出
.write.mode("overwrite")
.option("header","true")
.csv("G:\\Projects\\IdeaProjects\\Spark_Competition\\src\\main\\scala\\大数据应用赛_2020\\exam02_1")
}
}
返回顶部
题目三
3、分析不同城市 、不同工作年限公司的职位需求数量
- 返回结果包含 城市名称,工作年限,需求数量
package 大数据应用赛_2020
import org.apache.spark.sql.{SaveMode, SparkSession}
object Exam3 {
def main(args: Array[String]): Unit = {
// 创建环境
val spark = SparkSession.builder().appName("exam_1").master("local[6]").getOrCreate()
import org.apache.spark.sql.functions._
import spark.implicits._
// 读取数据
val data = spark.read
.option("header", "true")
.csv("G:\\Projects\\IdeaProjects\\Spark_Competition\\src\\main\\scala\\大数据应用赛_2020\\zhaopin.csv\\part-00000-def725b7-564e-40f2-a8de-dc92875a7ea1-c000.csv")
data.show(10)
// 3、分析不同城市 不同工作年限 公司职位需求数量(2分) 返回结果包含 城市名称,工作年限,需求数量,
// 按照所需字段提取数据,按照城市、工作年限分组,聚合计数
val data_use = data.select('city, 'workYear)
.groupBy('city, 'workYear)
.count()
.orderBy('city)
data_use.show(100)
data_use.coalesce(1) // 设置分区为1
.write.mode("overwrite")
.option("header","true")
.csv("G:\\Projects\\IdeaProjects\\Spark_Competition\\src\\main\\scala\\大数据应用赛_2020\\exam03_1")
// 以下面的形式写出会按照分区执行!!!
// data_use.write
// .option("header","true")
// .mode(SaveMode.Overwrite)
// .csv("G:\\Projects\\IdeaProjects\\Spark_Competition\\src\\main\\scala\\大数据应用赛_2020\\exam03")
}
}
返回顶部
题目四
4、 分析获得每个城市,在 职位名称中包含 “ 数据分析 “ 的职位需求、 公司名称 ,并按照需求数量倒序排列
- 返回结果包含 城市,公司名称,需求数量,
package 大数据应用赛_2020
import org.apache.spark.sql.{SaveMode, SparkSession}
object Exam4 {
def main(args: Array[String]): Unit = {
// 创建环境
val spark = SparkSession.builder().appName("exam_1").master("local[6]").getOrCreate()
import org.apache.spark.sql.functions._
import spark.implicits._
// 读取数据
val data = spark.read
.option("header", "true")
.csv("G:\\Projects\\IdeaProjects\\Spark_Competition\\src\\main\\scala\\大数据应用赛_2020\\zhaopin.csv\\part-00000-def725b7-564e-40f2-a8de-dc92875a7ea1-c000.csv")
data.show(10)
// 4、分析获得每个城市,在 职位名称中包含 “数据分析“” 职位需求 公司名称 按照需求数量倒序排列(4分)
// 按照所需字段提取信息,按照城市、职位需求分组聚合计数,再按照新的count列降序desc排序
val data_use = data.select('city,'companyShortName,'positionName)
.where('positionName contains("数据分析"))
.groupBy('city,'companyShortName)
.count()
.orderBy('count desc)
data_use.show()
data_use.coalesce(1) // 设置分区为1
.write.mode("overwrite")
.option("header","true")
.csv("G:\\Projects\\IdeaProjects\\Spark_Competition\\src\\main\\scala\\大数据应用赛_2020\\exam04_1")
}
}
返回顶部