0
点赞
收藏
分享

微信扫一扫

【2020大数据应用赛试题】Spark分析处理

凯约 2022-08-12 阅读 63


文章目录

  • ​​2020大数据应用赛试题​​
  • ​​任务一、Spark技术栈有哪些组件?简述其功能,及应用场景。​​
  • ​​任务二、本题目使用spark进行数据分析​​
  • ​​数据说明​​
  • ​​题目​​
  • ​​题目一​​
  • ​​题目二​​
  • ​​题目三​​
  • ​​题目四​​

2020大数据应用赛试题



任务一、Spark技术栈有哪些组件?简述其功能,及应用场景。

1、其它组件的基础,spark的内核,主要包含:有向无环图、RDD 、Lingage、Cache、broadcast等,并封装了底层通讯框架、是spark的基础。

2、Spark Streaming是一个对实时数据流进行高通量、容错处理的流式处理系统,可以对多种数据源(如Kafka、Flume、Twitter、Zero和TCP套接字)进行类似Map、Reduce和join等复杂操作、将流式计算分解成一系列短小的批处理作业

3、Spark sql:Shark是SparkSQL的前身,Spark SQL 的一个中重要特点是其能够统一处理关系表和RDD ,使得开发人员可以轻松的使用SQL 命令进行外部查询,同时进行更复杂的数据分析。

4、BlinkDB:是一个用于在海量数据上运行交互式 SQL查询的大规模并行查询引擎,它允许用户通过权衡数据精度来提升查询响应时间,其数据的精度被控制在允许的误差范围内。

5、MLBase是 Spark生态圈的一部分专注于机器学习,让机器学习的门槛更低,让一些可能并不了解学习的用户也能方便的使用MLBase。MLBase分为四部分:MLlib、MLI、ML Optimizer和 MLRuntime。

6、Graphx使用 Spark中图和图并进行计算。

​​返回顶部​​

任务二、本题目使用spark进行数据分析

【2020大数据应用赛试题】Spark分析处理_sql

数据说明

字段

字段说明

positionName

职位名称

salary

薪水

workYear

工作年限

city

城市

companyShortName

公司简称

companySize

公司规模

district

所在区

financeStage

融资阶段

industryField

所在领域

thirdType

职位类型

resumeProcessDay

简历日处理

resumeProcessRate

简历处理率

题目

题目一

1.拆分字段 salary -> min_salary,max_salry,并且取薪资的整数

  • *如果salary字段是否包含类似’10k-20k*15薪’的值 如果存在 则把 15这样的类似数据给去掉,保留10K-20K; 然后拆分为 min_salary,max_salry 分别是 10,20
  • 如果salary字段 类似10-20K 则拆分为 min_salary,max_salry 分别是 10,20
  • 如果 salay字段为空值或者 面议,则拆分为 min_salary,max_salry 分别是 -1,-1

拆分后字段表结构为:

字段

positionName

salary

min_salary

max_salry

workYear

city

companyShortName

companySize

district

financeStage

industryField

thirdType

resumeProcessDay

resumeProcessRate

package 大数据应用赛_2020

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object Exam1_1 {
def main(args: Array[String]): Unit = {
// 创建环境
val spark = SparkSession.builder().appName("exam_1").master("local[6]").getOrCreate()
import org.apache.spark.sql.functions._
import spark.implicits._
// 指定数据结构信息去读取数据
val schema = StructType(
List(
StructField("职位名称",StringType),
StructField("薪水",StringType),
StructField("工作年限",StringType),
StructField("城市",StringType),
StructField("公司简介",StringType),
StructField("公司规模",StringType),
StructField("所在区",StringType),
StructField("融资阶段",StringType),
StructField("所在领域",StringType),
StructField("职位类型",StringType),
StructField("简历日处理",IntegerType),
StructField("简历处理率",IntegerType)
)
)
// 读取数据
val df = spark.read
.schema(schema) // 使用指定的schema
.option("header",value = true) //
.csv("G:\\Projects\\IdeaProjects\\Spark_Competition\\src\\main\\scala\\大数据应用赛_2020\\zhaopin.txt")
df.show()

// 增加一列id
val window = Window.orderBy("城市")
val df_index = df.withColumn("index", row_number().over(window))
df_index.show()

// 取出薪水信息进行预处理
val salary = df_index.select('薪水, 'index)
salary.show()
salary.foreach(println(_))
// 信息过滤
val filter = salary
.map {
item =>
var line1 = item.toString().split('[')(1);
var line2 = line1.toString().split("]")(0); //..k-..k,index
var line3 = line2.toString().split(",")(1); // index
var line4 = line2.toString().split(",")(0); // ..k-..k

var sub_end2_0 = line4.toString().split("-")(0).toString.length - 1;
var sub_end2_1 = line4.toString().split("-")(1).toString.length - 1;
if (line4.toString.contains("*")) {
var base = line4.split("\\*")(0);
var sub_endb_0 = base.toString().split("-")(0).toString.length - 1;
var sub_endb_1 = base.toString().split("-")(1).toString.length - 1;
var min = base.toString().split("-")(0).substring(0,sub_endb_0).toInt;
var max = base.toString().split("-")(1).substring(0,sub_endb_1 ).toInt;
(min, max, line3)
} else if (line4.toString().contains("-")) {
var min = line4.toString().split("-")(0).substring(0, sub_end2_0).toInt;
var max = line4.toString().split("-")(1).substring(0, sub_end2_1).toInt;
(min, max, line3)
} else
(-1, -1, line3)
}
.toDF("min_salary", "max_salary", "index")
filter.show(100)

val full_data = df_index.join(filter, df_index.col("index") === filter.col("index"), "left_outer")
.select('职位名称 as "positionName", '薪水 as "salary", 'min_salary, 'max_salary, '工作年限 as "workYear",
'城市 as "city", '公司简介 as "companyShortName", '公司规模 as "companySize", '所在区 as "district",
'融资阶段 as "financeStage",'所在领域 as "industryField", '职位类型 as "thirdType",
'简历日处理 as "resumeProcessDay", '简历处理率 as "resumeProcessRate")
full_data.show(1000)

full_data.write
.option("header","true")
.csv("G:\\Projects\\IdeaProjects\\Spark_Competition\\src\\main\\scala\\大数据应用赛_2020\\zhaopin.csv")
}
}

【2020大数据应用赛试题】Spark分析处理_大数据_02

​​返回顶部​​

题目二

2.分析不同城市 薪资概况(工资的最小平均值,最大平均值)

  • 返回结果包含 城市名称,薪资最小平均值avg(min_salry),最大平均值avg(max_salry)

package 大数据应用赛_2020

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.IntegerType

object Exam2 {

def main(args: Array[String]): Unit = {
// 创建环境
val spark = SparkSession.builder().appName("exam_1").master("local[6]").getOrCreate()
import org.apache.spark.sql.functions._
import spark.implicits._
// 读取数据
val data = spark.read
.option("header", "true")
.csv("G:\\Projects\\IdeaProjects\\Spark_Competition\\src\\main\\scala\\大数据应用赛_2020\\zhaopin.csv\\part-00000-def725b7-564e-40f2-a8de-dc92875a7ea1-c000.csv")
data.show(10)

// 2、分析不同城市 薪资概况(工资的最小平均值,最大平均值)(2分)
// 返回结果包含 城市名称,薪资最小平均值avg(min_salry),最大平均值avg(max_salry)
// 转换数据类型
val data_use = data.select('city, 'min_salary cast (IntegerType), 'max_salary cast (IntegerType))
.toDF()
data_use.show(10)
data_use.printSchema()

// 选中所需字段,按照城市分组,聚合求均值
val result = data_use.select('city, 'min_salary, 'max_salary)
.groupBy('city)
.agg(avg('min_salary), avg('max_salary))
result.show()

result.coalesce(1) // 设置分区为1合并输出
.write.mode("overwrite")
.option("header","true")
.csv("G:\\Projects\\IdeaProjects\\Spark_Competition\\src\\main\\scala\\大数据应用赛_2020\\exam02_1")
}
}

【2020大数据应用赛试题】Spark分析处理_sql_03

​​返回顶部​​

题目三

3、分析不同城市 、不同工作年限公司的职位需求数量

  • 返回结果包含 城市名称,工作年限,需求数量

package 大数据应用赛_2020

import org.apache.spark.sql.{SaveMode, SparkSession}

object Exam3 {

def main(args: Array[String]): Unit = {

// 创建环境
val spark = SparkSession.builder().appName("exam_1").master("local[6]").getOrCreate()
import org.apache.spark.sql.functions._
import spark.implicits._
// 读取数据
val data = spark.read
.option("header", "true")
.csv("G:\\Projects\\IdeaProjects\\Spark_Competition\\src\\main\\scala\\大数据应用赛_2020\\zhaopin.csv\\part-00000-def725b7-564e-40f2-a8de-dc92875a7ea1-c000.csv")
data.show(10)

// 3、分析不同城市 不同工作年限 公司职位需求数量(2分) 返回结果包含 城市名称,工作年限,需求数量,
// 按照所需字段提取数据,按照城市、工作年限分组,聚合计数
val data_use = data.select('city, 'workYear)
.groupBy('city, 'workYear)
.count()
.orderBy('city)
data_use.show(100)

data_use.coalesce(1) // 设置分区为1
.write.mode("overwrite")
.option("header","true")
.csv("G:\\Projects\\IdeaProjects\\Spark_Competition\\src\\main\\scala\\大数据应用赛_2020\\exam03_1")

// 以下面的形式写出会按照分区执行!!!
// data_use.write
// .option("header","true")
// .mode(SaveMode.Overwrite)
// .csv("G:\\Projects\\IdeaProjects\\Spark_Competition\\src\\main\\scala\\大数据应用赛_2020\\exam03")
}
}

【2020大数据应用赛试题】Spark分析处理_spark_04

​​返回顶部​​

题目四

4、 分析获得每个城市,在 职位名称中包含 “ 数据分析 “ 的职位需求、 公司名称 ,并按照需求数量倒序排列

  • 返回结果包含 城市,公司名称,需求数量,

package 大数据应用赛_2020

import org.apache.spark.sql.{SaveMode, SparkSession}

object Exam4 {
def main(args: Array[String]): Unit = {
// 创建环境
val spark = SparkSession.builder().appName("exam_1").master("local[6]").getOrCreate()
import org.apache.spark.sql.functions._
import spark.implicits._
// 读取数据
val data = spark.read
.option("header", "true")
.csv("G:\\Projects\\IdeaProjects\\Spark_Competition\\src\\main\\scala\\大数据应用赛_2020\\zhaopin.csv\\part-00000-def725b7-564e-40f2-a8de-dc92875a7ea1-c000.csv")
data.show(10)

// 4、分析获得每个城市,在 职位名称中包含 “数据分析“” 职位需求 公司名称 按照需求数量倒序排列(4分)
// 按照所需字段提取信息,按照城市、职位需求分组聚合计数,再按照新的count列降序desc排序
val data_use = data.select('city,'companyShortName,'positionName)
.where('positionName contains("数据分析"))
.groupBy('city,'companyShortName)
.count()
.orderBy('count desc)
data_use.show()

data_use.coalesce(1) // 设置分区为1
.write.mode("overwrite")
.option("header","true")
.csv("G:\\Projects\\IdeaProjects\\Spark_Competition\\src\\main\\scala\\大数据应用赛_2020\\exam04_1")
}
}

【2020大数据应用赛试题】Spark分析处理_数据分析_05

​​返回顶部​​


举报

相关推荐

0 条评论