文章目录
- Spark Day06:Spark Core
- 01-[了解]-课程内容回顾
- 02-[了解]-课程内容提纲
- 03-[掌握]-Spark 内核调度之引例WordCount
- 04-[掌握]-Spark 内核调度之RDD 依赖
- 05-[掌握]-Spark 内核调度之DAG和Stage
- 06-[了解]-Spark 内核调度之Spark Shuffle
- 07-[掌握]-Spark 内核调度之Job 调度流程
- 08-[掌握]-Spark 内核调度之Spark 基本概念
- 09-[理解]-Spark 内核调度之并行度
- 10-[掌握]-SparkSQL应用入口SparkSession
- 11-[掌握]-词频统计WordCount之基于DSL编程
- 12-[掌握]-词频统计WordCount之基于SQL编程
Spark Day06:Spark Core
01-[了解]-课程内容回顾
1、Sogou日志分析
以搜狗官方提供用户搜索查询日志为基础,使用SparkCore(RDD)业务分析
数据格式:
文本文件数据,每条数据就是用户搜索时点击网页日志数据
各个字段之间使用制表符分割
业务需求:
- 搜索关键词统计,涉及知识点中文分词:HanLP
- 用户搜索点击统计
- 搜索时间段统计
编码实现
第一步、读取日志数据,封装到实体类对象SougouRecord
第二步、按照业务需求分析数据
词频统计WordCount变形
2、外部数据源
SparkCore与HBase和MySQL数据库交互
- HBase数据源,底层MapReduce从HBase表读写数据API
保存数据到HBase表
TableOutputFormat
RDD[(RowKey, Put)],其中RowKey = ImmutableBytesWritable
从HBase表加载数据
TableInputFormat
RDD[(RowKey, Result)]
从HBase 表读写数据,首先找HBase数据库依赖Zookeeper地址信息
- MySQL数据源
保存数据RDD到MySQL表中,考虑性能问题,5个方面
考虑降低RDD分区数目
针对分区数据进行操作,每个分区创建1个连接
每个分区数据写入到MySQL数据库表中,批量写入
可以将每个分区数据加入批次
批量将所有数据写入
事务性,批次中数据要么都成功,要么都失败
人为提交事务
考虑大数据分析特殊性,重复运行程序,处理相同数据,保存到MySQL表中
主键存在时,更新数据;不存在时,插入数据
REPLACE INTO ............
3、共享变量(Shared Variables)
表示某个值(变量)被所有Task共享
- 广播变量
Broadcast Variables,共享变量值不能被改变
解决问题:
共享变量存储问题,将变量广播以后,仅仅在每个Executor中存储一份;如果没有对变量进行广播的话,每个Task中存储一份。
广播变量节省内存使用
- 累加器
Accumulators,共享变量值可以被改变,只能“累加”
类似MapReduce框架种计数器Counter,起到累加统计作用
Spark框架提供三种类型累加器:
LongAccumulator、DoubleAccumulator、CollectionAccumulator
02-[了解]-课程内容提纲
1、Spark 内核调度(理解)
了解Spark框架如何执行Job程序,以词频统计WordCount程序为例,如何执行程序
RDD 依赖
DAG图、Stage阶段
Shuffle
Job 调度流程
Spark 基本概念
并行度
2、SparkSQL快速入门
SparkSQL中程序入口:SparkSession
基于SparkSQL实现词频统计
SQL语句,类似Hive
DSL语句,类似RDD中调用API,链式编程
SparkSQL模块概述
前世今生
官方定义
几大特性
03-[掌握]-Spark 内核调度之引例WordCount
以词频统计WordCount
程序为例,Job执行是DAG图:
04-[掌握]-Spark 内核调度之RDD 依赖
- 窄依赖(Narrow Dependency)
- Shuffle 依赖(宽依赖 Wide Dependency)
05-[掌握]-Spark 内核调度之DAG和Stage
1、Stage切割规则:从后往前,遇到宽依赖就切割Stage。
2、Stage计算模式:pipeline管道计算模式
pipeline只是一种计算思想、模式,来一条数据然后计算一条数据,把所有的逻辑走完,然后落地。
以词频统计WordCount为例:
从HDFS上读取数据,每个Block对应1个分区,当从Block中读取一条数据以后,经过flatMap、map和reduceByKey操作,最后将结果数据写入到本地磁盘中(Shuffle Write)。
block0: hadoop spark spark
|textFile
RDD-0 hadoop spark spark
|flatMap
RDD-1 hadoop\spark\spark
|map
RDD-2 (hadoop, 1)\(spark, 1)\(spark, 1)
|reduceByKey
写入磁盘 hadoop, 1 || spark, 1\ spark, 1
3、准确的说:一个task处理一串分区的数据,整个计算逻辑全部走完
前提条件:11.data中三条数据
结果A:
filter..................
filter..................
filter..................
map..................
map..................
map..................
flatMap..................
flatMap..................
flatMap..................
Count = 3
结果B:
filter..................
map..................
flatMap..................
filter..................
map..................
flatMap..................
filter..................
map..................
flatMap..................
Count = 3
06-[了解]-Spark 内核调度之Spark Shuffle
Spark Shuffle实现历史:
- Spark在1.1以前的版本一直是采用Hash Shuffle的实现的方式
- 到1.1版本时参考HadoopMapReduce的实现开始引入Sort Shuffle
- 在1.5版本时开始Tungsten钨丝计划,引入UnSafe Shuffle优化内存及CPU的使用
- 在1.6中将Tungsten统一到Sort Shuffle中,实现自我感知选择最佳Shuffle方式
- 到的2.0版本,Hash Shuffle已被删除,所有Shuffle方式全部统一到Sort Shuffle一个实现中。
具体各阶段Shuffle如何实现,参考思维导图XMIND,大纲如下:
07-[掌握]-Spark 内核调度之Job 调度流程
Spark的任务调度总体来说分两路进行,一路是Stage级的调度,一路是Task级的调度。
一个Spark应用程序包括Job、Stage及Task:
第一、Job是以Action方法为界,遇到一个Action方法则触发一个Job;
第二、Stage是Job的子集,以RDD宽依赖(即Shuffle)为界,遇到Shuffle做一次划分;
第三、Task是Stage的子集,以并行度(分区数)来衡量,分区数是多少,则有多少个task。
08-[掌握]-Spark 内核调度之Spark 基本概念
官方文档:http://spark.apache.org/docs/2.4.5/cluster-overview.html#glossary
09-[理解]-Spark 内核调度之并行度
参数spark.defalut.parallelism默认是没有值的,如果设置了值,是在shuffle的过程才会起作用
分析网站日志数据:20GB,存储在HDFS上,160Block,从HDFS读取数据,
RDD 分区数目:160 个分区
1、RDD分区数目160,那么Task数目为160个
2、总CPU Core核数
160/2 = 80
CPU Core = 60
160/3 = 50
3、假设每个Executor:6 Core
60 / 6 = 10 个
4、每个Executor内存
6 * 2 = 12 GB
6 * 3 = 18 GB
5、参数设置
--executor-memory= 12GB
--executor-cores= 6
--num-executors=10
10-[掌握]-SparkSQL应用入口SparkSession
1、SparkSession
程序入口,加载数据
底层SparkContext,进行封装
2、DataFrame/Dataset
Dataset[Row] = DataFrame
数据结构,从Spark 1.3开始出现,一直到2.0版本,确定下来
底层RDD,加上Schema约束(元数据):字段名称和字段类型
- 1)、SparkSession在SparkSQL模块中,添加MAVEN依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.5</version>
</dependency>
- 2)、SparkSession对象实例通过
建造者模式
构建,代码如下:
其中①表示导入SparkSession所在的包,②表示建造者模式构建对象和设置属性,③表示导入SparkSession类中implicits对象object中隐式转换函数。
- 3)、范例演示:构建SparkSession实例,加载文本数据,统计条目数。
package cn.itcast.spark.sql.start
import org.apache.spark.sql.{Dataset, SparkSession}
/**
* Spark 2.x开始,提供了SparkSession类,作为Spark Application程序入口,
* 用于读取数据和调度Job,底层依然为SparkContext
*/
object _03SparkStartPoint {
def main(args: Array[String]): Unit = {
// 使用建造者设计模式,创建SparkSession实例对象
val spark: SparkSession = SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[2]")
.getOrCreate()
import spark.implicits._
// TODO: 使用SparkSession加载数据
val inputDS: Dataset[String] = spark.read.textFile("datas/wordcount.data")
// 显示前5条数据
println(s"Count = ${inputDS.count()}")
inputDS.show(5, truncate = false)
// 应用结束,关闭资源
spark.stop()
}
}
11-[掌握]-词频统计WordCount之基于DSL编程
使用SparkSession加载文本数据,封装到Dataset/DataFrame中,调用API函数处理分析数据(类似RDD中API函数,如flatMap、map、filter等),编程步骤:
第一步、构建SparkSession实例对象,设置应用名称和运行本地模式;
第二步、读取HDFS上文本文件数据;
第三步、使用DSL(Dataset API),类似RDD API处理分析数据;
第四步、控制台打印结果数据和关闭SparkSession;
package cn.itcast.spark.sql.wordcount
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
/**
* 使用SparkSQL进行词频统计WordCount:DSL
*/
object _04SparkDSLWordCount {
def main(args: Array[String]): Unit = {
// 使用建造设设计模式,创建SparkSession实例对象
val spark: SparkSession = SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[2]")
.getOrCreate()
import spark.implicits._
// TODO: 使用SparkSession加载数据
val inputDS: Dataset[String] = spark.read.textFile("datas/wordcount.data")
// DataFrame/Dataset = RDD + schema
/*
root
|-- value: string (nullable = true)
*/
//inputDS.printSchema()
/*
+----------------------------------------+
|value |
+----------------------------------------+
|hadoop spark hadoop spark spark |
|mapreduce spark spark hive |
|hive spark hadoop mapreduce spark |
|spark hive sql sql spark hive hive spark|
|hdfs hdfs mapreduce mapreduce spark hive|
+----------------------------------------+
*/
//inputDS.show(10, truncate = false)
// TODO: 使用DSL(Dataset API),类似RDD API处理分析数据
val wordDS: Dataset[String] = inputDS.flatMap(line => line.trim.split("\\s+"))
/*
root
|-- value: string (nullable = true)
*/
//wordDS.printSchema()
/*
+---------+
|value |
+---------+
|hadoop |
|spark |
+---------+
*/
// wordDS.show(10, truncate = false)
/*
table: words , column: value
SQL: SELECT value, COUNT(1) AS count FROM words GROUP BY value
*/
val resultDS: DataFrame = wordDS.groupBy("value").count()
/*
root
|-- value: string (nullable = true)
|-- count: long (nullable = false)
*/
resultDS.printSchema()
/*
+---------+-----+
|value |count|
+---------+-----+
|sql |2 |
|spark |11 |
|mapreduce|4 |
|hdfs |2 |
|hadoop |3 |
|hive |6 |
+---------+-----+
*/
resultDS.show(10, truncate = false)
// 应用结束,关闭资源
spark.stop()
}
}
12-[掌握]-词频统计WordCount之基于SQL编程
第一步、构建SparkSession对象,加载文件数据,分割每行数据为单词;
第二步、将DataFrame/Dataset注册为临时视图(Spark 1.x中为临时表);
第三步、编写SQL语句,使用SparkSession执行获取结果;
第四步、控制台打印结果数据和关闭SparkSession;
package cn.itcast.spark.sql.wordcount
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
/**
* 使用SparkSQL进行词频统计WordCount:SQL
*/
object _05SparkSQLWordCount {
def main(args: Array[String]): Unit = {
// 使用建造设设计模式,创建SparkSession实例对象
val spark: SparkSession = SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[2]")
.getOrCreate()
import spark.implicits._
// TODO: 使用SparkSession加载数据
val inputDS: Dataset[String] = spark.read.textFile("datas/wordcount.data")
/*
root
|-- value: string (nullable = true)
*/
//inputDS.printSchema()
/*
+--------------------+
| value|
+--------------------+
|hadoop spark hado...|
|mapreduce spark ...|
|hive spark hadoop...|
+--------------------+
*/
//inputDS.show(5, truncate = false)
// 将每行数据按照分割划分为单词
val wordDS: Dataset[String] = inputDS.flatMap(line => line.trim.split("\\s+"))
/*
table: words , column: value
SQL: SELECT value, COUNT(1) AS count FROM words GROUP BY value
*/
// step 1. 将Dataset或DataFrame注册为临时视图
wordDS.createOrReplaceTempView("tmp_view_word")
// step 2. 编写SQL并执行
val resultDF: DataFrame = spark.sql(
"""
|SELECT value as word, COUNT(1) AS count FROM tmp_view_word GROUP BY value
|""".stripMargin)
/*
+---------+-----+
|word |count|
+---------+-----+
|sql |2 |
|spark |11 |
|mapreduce|4 |
|hdfs |2 |
|hadoop |3 |
|hive |6 |
+---------+-----+
*/
resultDF.show(10, truncate = false)
// 应用结束,关闭资源
spark.stop()
}
}