0
点赞
收藏
分享

微信扫一扫

大数据学习之路,Spark的介绍、部署以及wordcount实例的实现(1)

沈芏 2022-04-13 阅读 80

Spark简介

什么是Spark?

Apache Spark是一种多语言引擎,用于在单节点机器或集群上执行数据工程、数据科学和机器学习。

Spark的安装

检查

检查HDFS、YARN环境

[vagary@vagary ~]$ jps
4736 NameNode
5490 NodeManager
5106 SecondaryNameNode
4870 DataNode
5881 Jps
5375 ResourceManager

检查Java环境

[vagary@vagary ~]$ java -version
java version "1.8.0_212"
Java(TM) SE Runtime Environment (build 1.8.0_212-b10)
Java HotSpot(TM) 64-Bit Server VM (build 25.212-b10, mixed mode)

Scala部署

下载Scala安装包

这些安装包在官网上都有
https://www.scala-lang.org/download/all.html,然后这里我们安装2.12.15版本的Scala

[vagary@vagary software]$ wget https://downloads.lightbend.com/scala/2.12.15/scala-2.12.15.tgz

解压Scala安装包

[vagary@vagary software]$ tar -zxvf scala-2.12.15.tgz -C ../app

创建软连接

[vagary@vagary app]$ ln -s scala-2.12.15 scala

配置环境变量

编辑全局变量文件,/etc/profile

[root@vagary ~]# vi /etc/profile

然后将环境变量加入

export SCALA_HOME=/home/vagary/app/scala
export PATH=$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$SCALA_HOME/bin:$PATH

配置完成退出后,使环境变量生效:

[root@vagary ~]# source /etc/profile

然后验证一下,出现版本号就表示配置成功:

[root@vagary ~]# scala -version
Scala code runner version 2.12.15 -- Copyright 2002-2021, LAMP/EPFL and Lightbend, Inc.

Spark部署

首先下载Spark安装包,从官网https://spark.apache.org/downloads.html上看版本很多,这里我们选择3.2.1的版本进行下载
在这里插入图片描述
注:我们Hadoop版本是3.1.3,然后这里下的spark是3.2.1,因为是自己学的,可以这么去用,如果服务器级别的开发是不能这么下载的,还是要下对应版本,或者预编译的。

预编译版的Spark

预编译好的,没有我们要的版本,所以我们要下载,从官网上下
https://spark.apache.org/downloads.html,版本很多,这里我们就用3.2.1,然后选择包类型为Source code,然后进行下载
在这里插入图片描述

解压

将安装包解压在app目录下

[vagary@vagary software]$ tar -zxvf spark-3.2.1-bin-hadoop3.2.tgz -C ../app

创建软连接

[vagary@vagary app]$ ln -s spark-3.2.1-bin-hadoop3.2  spark

配置环境变量

编辑全局变量文件,/etc/profile

[root@vagary ~]# vi /etc/profile

然后将环境变量加入

export SPARK_HOME=/home/vagary/app/spark
export PATH=$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$SCALA_HOME/bin:$SPARK_HOME/bin:$PATH

配置完成退出后,使环境变量生效:

[root@vagary ~]# source /etc/profile

然后验证一下,出现版本号就表示配置成功:

[root@vagary ~]# scala -version
Scala code runner version 2.12.15 -- Copyright 2002-2021, LAMP/EPFL and Lightbend, Inc.

开始添加Spark配置信息

进入spark/conf文件夹,会看到所有文件都是template,一般情况都是拷贝一个,然后进行修改(避免修改错误,尽量保存原本文件,随时恢复)

[vagary@vagary conf]$ cp spark-env.sh.template spark-env.sh

然后打开spark-env.sh文件添加配置

SPARK_CONF_DIR=/home/vagary/app/spark/conf
HADOOP_CONF_DIR=/home/vagary/app/hadoop/etc/hadoop
YARN_CONF_DIR=/home/vagary/app/hadoop/etc/hadoop

添加MySQL驱动

这里直接拿之前hive的MySQL驱动包就行了

[vagary@vagary lib]$ cp mysql-connector-java-5.1.27-bin.jar ../../spark/jars/

添加hive配置文件

将之前的hive的conf/hive-site.xml文件,拷贝到spark/conf下就行了

[vagary@vagary spark]$ cp ../hive/conf/hive-site.xml ./conf/

测试是否安装部署完成

运行一个测试圆周率的实例,如果出现结果,就说明部署完成

./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn /home/sqq/app/spark/examples/jars/spark-examples*.jar 10

在这里插入图片描述
其实在跑一个实例任务的时候,会一闪而过一个web端的,但是任务结束就消失了,所以我们需要取启动History Server ,保留跑完的服务;

配置History Server

复制一份spark-defaults.conf出来,进行修改

[vagary@vagary conf]$ cp spark-defaults.conf.template spark-defaults.conf

我们先去hdfs上创建一个目录,存放spark的日志文件

[vagary@vagary conf]$ hdfs dfs -mkdir hdfs://vagary:9000/spark-log
[vagary@vagary conf]$ hdfs dfs -ls hdfs://vagary:9000/
Found 5 items
drwxr-xr-x   - vagary supergroup          0 2022-03-20 17:45 hdfs://vagary:9000/delete
drwxr-xr-x   - vagary supergroup          0 2022-04-10 02:46 hdfs://vagary:9000/spark-log

将这两个配置信息的注释符去掉,然后将目录修改为我们刚刚创建的那个目录

spark.eventLog.enabled           true
spark.eventLog.dir               hdfs://vagary:9000/spark-log

然后修改spark-env.sh文件,加上History Server的配置

export SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://vagary:9000/spark-log -Dspark.history.ui.port=7777"

查看当前状态下的启动的服务有哪些


[vagary@vagary conf]$ jps
1377 SecondaryNameNode
6433 Jps
1121 DataNode
988 NameNode
1822 NodeManager
1662 ResourceManager

到sbin目录下,启动History Server


[vagary@vagary sbin]$ ./start-history-server.sh
starting org.apache.spark.deploy.history.HistoryServer, logging to /home/vagary/app/spark/logs/spark-vagary-org.apache.spark.deploy.history.HistoryServer-1-vagary.out

jps查看一下,可以看到History Server是启动好的

[vagary@vagary sbin]$ jps
1377 SecondaryNameNode
1121 DataNode
6723 HistoryServer
988 NameNode
6797 Jps
1822 NodeManager
1662 ResourceManager

然后再去web端查看一下,可以看到是启动成功了
在这里插入图片描述
这里再跑一下实例,查看web端状态,可以看到刚跑的实例在这里是记录日志得了。
在这里插入图片描述

wordcount实例

先将文件上传到hdfs上

[vagary@vagary ~]$ hdfs dfs -put  /home/vagary/wcdata.txt   hdfs://vagary:9000/data/
2022-04-10 03:21:34,402 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false

然后到bin目录下,启动spark-shell.sh脚本

[vagary@vagary bin]$ ./spark-shell

读一个文件将结果打印出来

scala> sc.textFile("/data/wcdata.txt").foreach(println)
aaa;bbb;ccc                                                         (0 + 2) / 2]
aaa;bbb;ddd
aaa;java;hadoop
hadoop;java;spark;scala
scala;spark;java

以分号进行切割,然后打印出来


scala> sc.textFile("/data/wcdata.txt").flatMap(_.split(";")).foreach(println)
aaa
bbb
ccc
aaa
bbb
ddd
aaa
java
hadoop
hadoop
java
spark
scala
scala
spark
java

等同于这样子写法

scala> sc.textFile("/data/wcdata.txt").flatMap(x => x.split(";")).foreach(println)
aaa
bbb
ccc
aaa
bbb
ddd
aaa
java
hadoop
hadoop
java
spark
scala
scala
spark
java

拆成k-v键值对形式


scala> sc.textFile("/data/wcdata.txt").flatMap(x => x.split(";")).map((_,1)).foreach(println)
(scala,1)
(spark,1)
(java,1)
(aaa,1)
(bbb,1)
(ccc,1)
(aaa,1)
(bbb,1)
(ddd,1)
(aaa,1)
(java,1)
(hadoop,1)
(hadoop,1)
(java,1)
(spark,1)
(scala,1)

最后进行值汇总

scala>  sc.textFile("/data/wcdata.txt").flatMap(x => x.split(";")).map((_,1)).reduceByKey(_+_).foreach(println)
(scala,2)
(bbb,2)
(ddd,1)
(java,3)
(spark,2)
(hadoop,2)
(ccc,1)
(aaa,3)

注释:其中flatMap类似于 map,但每个输入项可以映射到 0 个或更多输出项(因此func应该返回一个 Seq 而不是单个项);
foreach,对数据集的每个元素运行函数func。这通常是针对副作用进行的,例如更新累加器或与外部存储系统交互。

yarn-client/yarn-cluster

最大区别:Driver运行地方不同;
client运行在本地客户端,Spark-submit启动Driver线程;
Cluster模式下,启动,会将任务丢给yarn,向yarn申请一台机器运行;

Driver/Executor

Driver:
①、driver进程就是应用的main()函数并且构建sparkContext对象,当我们提交了应用之后,便会启动一个对应的driver进程, driver本身 会根据我们设置的参数占有一定的资源(主要指cpu core和memory),
②、driver可以运行在master上,也可以运行worker上(根据部署模式的不同)。
③、driver首先会向集群管理者(standalone、 yarn, mesos) 申请spark应用所需的资源,也就是executor, 然后集群管理者会根据spark应用所设置的参数在各个worker上分配一定数量 的executor,每个executor都占用-定数星的cpu和memory。在申请到应用所需的资源以后,driver就开始调度和执行我们编写的应用代码了。
④、driver进程 会将我们编写的spark应用代码拆分成多个stage,每个stage执行一部分代码片段,并为每个stage创建一批tasks, 然后将这些tasks分配到各个executor中执行。
Executor:
executor进程宿主在worker节点上,一个worker可以有多个executor。每个executor持有一个线程池,每个线程可以执行一个task, executor执行完task以后将结果返回给driver,每个executor执行的task都属于同一个应用。此外executor还有 一个功能就是为应用程序中要求缓存的RDD提供内存式存储,RDD是直接缓存在executor进程内的,因此任务可以在运行时充分利用缓存数据加速运算。

举报

相关推荐

0 条评论