Spark简介
Apache Spark是用于大规模数据(large-scala data)处理的统一(unified)分析引擎,其特点就是对任意类型的数据进行自定义计算。
Spark VS Hadoop
面试题:Hadoop的基于进程的计算和Spark基于线程方式优缺点?
答案:Hadoop中的MR中每个map/reduce task都是一个java进程方式运行,好处在于进程之间是互相独立的,每个task独享进程资源,没有互相干扰,监控方便,但是问题在于task之间不方便共享数据,执行效率比较低。比如多个map task读取不同数据源文件需要将数据源加载到每个map task中,造成重复加载和浪费内存。而基于线程的方式计算是为了数据共享和提高执行效率,Spark采用了线程的最小的执行单位,但缺点是线程之间会有资源竞争
Spark四大优点
- 速度快
- 易于使用
- 通用性强
- 运行方式
Spark框架模块
Spark的结构角色
Spark运行角色
Spark中由4类角色组成整个Spark的运行时环境
. Master角色,管理整个集群的资源 类比与YARN的ResouceManager
. Worker角色,管理单个服务器的资源 类比于YARN的NodeManager
. Driver角色,管理单个Spark任务在运行的时候的工作 类比于YARN的ApplicationMaster
. Executor角色,单个任务运行的时候的一堆工作者,干活的.类比于YARN的容器内运行的TASK
Spark的运行模式
本地模式
本质:启动一个JVM Process进程(一个进程里面有多个线程),执行任务Task
- Local模式可以限制模拟Spark集群环境的线程数量, 即Local[N] 或 Local[*]
- 其中N代表可以使用N个线程,每个线程拥有一个cpu core。如果不指定N,则默认是1个线程(该线程有1个core)。通常Cpu有几个Core,就指定几个线程,最大化利用计算能力.
- 如果是local[*],则代表 Run Spark locally with as many worker threads as logical cores on your machine.按照Cpu最多的Cores设置线程数
Local 下的角色分布:
资源管理:
Master:Local进程本身
Worker:Local进程本身
任务执行:
Driver:Local进程本身
Executor:不存在,没有独立的Executor角色, 由Local进程(也就是Driver)内的线程提供计算能力
Standalone模式
Standalone模式是Spark自带的一种集群模式,不同于前面本地模式启动多个进程来模拟集群的环境,Standalone模式是真实地在多个机器之间搭建Spark集群的环境,完全可以利用该模式搭建多机器集群,用于实际的大数据处理。
StandAlone集群在进程上主要有3类进程:
主节点Master进程:
Master角色, 管理整个集群资源,并托管运行各个任务的Driver
从节点Workers:
Worker角色, 管理每个机器的资源,分配对应的资源来运行Executor(Task);
每个从节点分配资源信息给Worker管理,资源信息包含内存Memory和CPU Cores核数
历史服务器HistoryServer(可选):
Spark Application运行完成以后,保存事件日志数据至HDFS,启动HistoryServer可以查看应用运行相关信息。
搭配环境
Spark基础入门-第三章-3.5-总结_哔哩哔哩_bilibili
启动历史服务器
sbin/start-history-server.sh
启动Spark的Master和Worker进程
# 启动全部master和worker
sbin/start-all.sh
# 或者可以一个个启动:
# 启动当前机器的master
sbin/start-master.sh
# 启动当前机器的worker
sbin/start-worker.sh
# 停止全部
sbin/stop-all.sh
# 停止当前机器的master
sbin/stop-master.sh
# 停止当前机器的worker
sbin/stop-worker.sh
Standalone HA 模式
Spark Standalone集群是Master-Slaves架构的集群模式,和大部分的Master-Slaves结构集群一样,存在着Master 单点故障(SPOF)的问题。
Spark on Yarn (重点)
部署模式DeployMode
Cluster模式:
Client模式:
二者主要区别:
Cluster模式 | Client模式 | |
Driver运行位置 | YARN容器内 | 客户端进程内 |
通讯效率 | 高 | 低于Cluster模式 |
日志查看 | 日志输出在容器内,查看不方便 | 日志输出在客户端的标准输出流中,方便查看 |
生产可用 | 推荐 | 不推荐 |
稳定性 | 稳定 | 基于客户端进程,受到客户端进程影响 |
PySpark
PySpark是一个Python的类库, 提供Spark的操作API
而之前使用的 bin/pyspark 是一个交互式的程序,可以提供交互式编程并执行Spark计算,注意区分
环境配置:
Spark基础入门-第七章-7.1-本机配置Python环境_哔哩哔哩_bilibili
初体验
Spark Application程序入口为:SparkContext,任何一个应用首先需要构建SparkContext对象,如下两步构建:
第一步、创建SparkConf对象
设置Spark Application基本信息,比如应用的名称AppName和应用运行Master
第二步、基于SparkConf对象,创建SparkContext对象
下面是一个demo:
#coding:utf8
from pyspark import SparkConf,SparkContext
if __name__ == '__main__':
conf = SparkConf().setMaster('local[*]').setAppName('LMX')
sc = SparkContext(conf = conf)
file_rdd = sc.textFile("data/input/words.txt")
words_rdd = file_rdd.flatMap(lambda line: line.split(" "))
words_one_rdd = words_rdd.map(lambda x: (x,1))
result_rdd = words_one_rdd.reduceByKey(lambda a,b:a+b)
print(result_rdd.collect())
下面这两句由 driver 运行,然后将虚拟化发送到Executor
conf = SparkConf().setMaster('local[*]').setAppName('LMX')
sc = SparkContext(conf = conf)
Executor读取文件,完成计算,在这个过程中Executor之间可能还会存在shuffle交互
file_rdd = sc.textFile("data/input/words.txt")
words_rdd = file_rdd.flatMap(lambda line: line.split(" "))
words_one_rdd = words_rdd.map(lambda x: (x,1))
result_rdd = words_one_rdd.reduceByKey(lambda a,b:a+b)
最后Executor将结果汇总到driver,print输出
print(result_rdd.collect())
Python on Spark原理
PySpark宗旨是在不破坏Spark已有的运行时架构,在Spark架构外层包装一层Python API,借助Py4j实现Python和 Java的交互,进而实现通过Python编写Spark应用程序,其运行时架构如下图所示