一、checkpoint概述
checkpoint,是Spark提供的一个比较高级的功能。有时候,我们的Spark任务,比较复杂,从初始化RDD开始,到最后整个任务完成,有比较多的步骤,比如超过10个transformation算子。而且,整个任务运行的时间也特别长,比如通常要运行1~2个小时。
在这种情况下,就比较适合使用checkpoint功能了。
因为对于特别复杂的Spark任务,有很高的风险会出现某个要反复使用的RDD因为节点的故障导致丢失,虽然之前持久化过,但是还是导致数据丢失了。那么也就是说,出现失败的时候,没有容错机制,所以当后面的transformation算子,又要使用到该RDD时,就会发现数据丢失了,此时如果没有进行容错处理的话,那么就需要再重新计算一次数据了。
所以针对这种Spark Job,如果我们担心某些关键的,在后面会反复使用的RDD,因为节点故障导致数据丢失,那么可以针对该RDD启动checkpoint机制,实现容错和高可用
那如何使用checkPoint呢?
首先要调用SparkContext的setCheckpointDir()方法,设置一个容错的文件系统的目录,比如HDFS;然后,对RDD调用checkpoint()方法。
最后,在RDD所在的job运行结束之后,会启动一个单独的job,将checkpoint设置过的RDD的数据写入之前设置的文件系统中。
这是checkpoint使用的基本步骤,很简单,那我们下面先从理论层面分析一下当我们设置好checkpoint之后,Spark底层都做了哪些事情
二、RDD之checkpoint流程
看这个图
1:SparkContext设置checkpoint目录,用于存放checkpoint的数据;
对RDD调用checkpoint方法,然后它就会被RDDCheckpointData对象进行管理,此时这个RDD的checkpoint状态会被设置为Initialized
2:待RDD所在的job运行结束,会调用job中最后一个RDD的doCheckpoint方法,该方法沿着RDD的血缘关系向上查找被checkpoint()方法标记过的RDD,并将其checkpoint状态从Initialized设置为CheckpointingInProgress
3:启动一个单独的job,来将血缘关系中标记为CheckpointInProgress的RDD执行checkpoint操作,也就是将其数据写入checkpoint目录
4:将RDD数据写入checkpoint目录之后,会将RDD状态改变为Checkpointed;
并且还会改变RDD的血缘关系,即会清除掉RDD所有依赖的RDD;
最后还会设置其父RDD为新创建的CheckpointRDD
三、checkpoint与持久化的区别
那这里所说的checkpoint和我们之前讲的RDD持久化有什么区别吗?
1、lineage是否发生改变
linage(血缘关系)说的就是RDD之间的依赖关系
持久化,只是将数据保存在内存中或者本地磁盘文件中,RDD的lineage(血缘关系)是不变的;Checkpoint执行之后,RDD就没有依赖的RDD了,也就是它的lineage改变了
2、丢失数据的可能性
持久化的数据丢失的可能性较大,如果采用 persist 把数据存在内存中的话,虽然速度最快但是也是最不可靠的,就算放在磁盘上也不是完全可靠的,因为磁盘也会损坏。
Checkpoint的数据通常是保存在高可用文件系统中(HDFS),丢失的可能性很低
建议:对需要checkpoint的RDD,先执行persist(StorageLevel.DISK_ONLY)
为什么呢?
因为默认情况下,如果某个RDD没有持久化,但是设置了checkpoint,那么这个时候,本来Spark任务已经执行结束了,但是由于中间的RDD没有持久化,在进行checkpoint的时候想要将这个RDD的数据写入外部存储系统的话,就需要重新计算这个RDD的数据,再将其checkpoint到外部存储系统中。
如果对需要checkpoint的rdd进行了基于磁盘的持久化,那么后面进行checkpoint操作时,就会直接从磁盘上读取rdd的数据了,就不需要重新再计算一次了,这样效率就高了。
那在这能不能使用基于内存的持久化呢?当然是可以的,不过没那个必要。
四、checkPoint的使用
那我们来演示一下:将一个RDD的数据持久化到HDFS上面
1、scala代码如下:
package com.imooc.scala
import org.apache.spark.{SparkConf, SparkContext}
/**
* 需求:checkpoint的使用
*
*/
object CheckPointOpScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("CheckPointOpScala")
val sc = new SparkContext(conf)
if(args.length==0){
System.exit(100)
}
val outputPath = args(0)
//1:设置checkpint目录
sc.setCheckpointDir("hdfs://bigdata01:9000/chk001")
val dataRDD = sc.textFile("hdfs://bigdata01:9000/hello_10000000.dat")
//2:对rdd执行checkpoint操作
dataRDD.checkpoint()
dataRDD.flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_ + _)
.saveAsTextFile(outputPath)
sc.stop()
}
}
2、java代码如下:
package com.imooc.java;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
/**
* 需求:checkpoint的使用
*
*/
public class CheckPointOpJava {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setAppName("CheckPointOpJava");
JavaSparkContext sc = new JavaSparkContext(conf);
if(args.length==0){
System.exit(100);
}
String outputPath = args[0];
//1:设置checkpint目录
sc.setCheckpointDir("hdfs://bigdata01:9000/chk002");
JavaRDD<String> dataRDD = sc.textFile("hdfs://bigdata01:9000/hello_10000000.dat");
//2: 对rdd执行checkpoint操作
dataRDD.checkpoint();
dataRDD.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String line) throws Exception {
return Arrays.asList(line.split(" ")).iterator();
}
}).mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String word) throws Exception {
return new Tuple2<String, Integer>(word,1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) throws Exception {
return i1 + i2;
}
}).saveAsTextFile(outputPath);
sc.stop();
}
}
3、验证效果
下面我们把这个任务打包提交到集群上运行一下,看一下效果。
先确保hadoop集群是正常运行的,以及hadoop中的historyserver进程和spark的historyserver进程也是正常运行的。
(1)数据准备
测试数据之前已经上传到了hdfs上面,如果没有则需要上传
[root@bigdata01 soft]# hdfs dfs -ls /hello_10000000.dat
-rw-r--r-- 2 root supergroup 1860100000 2020-04-28 22:15 /hello_10000000.dat
(2)打包前配置
将pom.xml中的spark-core的依赖设置为provided,然后编译打包
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.3</version>
<scope>provided</scope>
</dependency>
(3)打包
mvn clean package -DskipTests
将打包的jar包上传到bigdata04的/data/soft/sparkjars目录,创建一个新的spark-submit脚本
[root@bigdata04 sparkjars]# cp wordCountJob.sh checkPointJob.sh
[root@bigdata04 sparkjars]# vi checkPointJob.sh
spark-submit \
--class com.imooc.scala.CheckPointOpScala \
--master yarn \
--deploy-mode cluster \
--executor-memory 1G \
--num-executors 1 \
db_spark-1.0-SNAPSHOT-jar-with-dependencies.jar \
/out-chk001
提交任务
[root@bigdata04 sparkjars]# sh -x checkPointJob.sh
执行成功之后可以到setCheckpointDir指定的目录中查看一下,可以看到目录中会生成对应的文件保存rdd中的数据,只不过生成的文件不是普通文本文件,直接查看文件中的内容显示为乱码。
[root@bigdata04 sparkjars]# hdfs dfs -ls /chk001/ac228425-3249-467d-9e77-f65384ed07c8/rdd-1
Found 14 items
-rw-r--r-- 2 root supergroup 134946607 2020-05-27 15:28 /chk001/ac228425-3249-467d-9e77-f65384ed07c8/rdd-1/part-00000
-rw-r--r-- 2 root supergroup 134946607 2020-05-27 15:28 /chk001/ac228425-3249-467d-9e77-f65384ed07c8/rdd-1/part-00001
-rw-r--r-- 2 root supergroup 134946607 2020-05-27 15:28 /chk001/ac228425-3249-467d-9e77-f65384ed07c8/rdd-1/part-00002
-rw-r--r-- 2 root supergroup 134946420 2020-05-27 15:28 /chk001/ac228425-3249-467d-9e77-f65384ed07c8/rdd-1/part-00003
-rw-r--r-- 2 root supergroup 134946607 2020-05-27 15:28 /chk001/ac228425-3249-467d-9e77-f65384ed07c8/rdd-1/part-00004
-rw-r--r-- 2 root supergroup 134946607 2020-05-27 15:28 /chk001/ac228425-3249-467d-9e77-f65384ed07c8/rdd-1/part-00005
-rw-r--r-- 2 root supergroup 134946420 2020-05-27 15:28 /chk001/ac228425-3249-467d-9e77-f65384ed07c8/rdd-1/part-00006
-rw-r--r-- 2 root supergroup 134946607 2020-05-27 15:28 /chk001/ac228425-3249-467d-9e77-f65384ed07c8/rdd-1/part-00007
-rw-r--r-- 2 root supergroup 134946607 2020-05-27 15:28 /chk001/ac228425-3249-467d-9e77-f65384ed07c8/rdd-1/part-00008
-rw-r--r-- 2 root supergroup 134946420 2020-05-27 15:28 /chk001/ac228425-3249-467d-9e77-f65384ed07c8/rdd-1/part-00009
-rw-r--r-- 2 root supergroup 134946607 2020-05-27 15:28 /chk001/ac228425-3249-467d-9e77-f65384ed07c8/rdd-1/part-00010
-rw-r--r-- 2 root supergroup 134946607 2020-05-27 15:28 /chk001/ac228425-3249-467d-9e77-f65384ed07c8/rdd-1/part-00011
-rw-r--r-- 2 root supergroup 134946420 2020-05-27 15:28 /chk001/ac228425-3249-467d-9e77-f65384ed07c8/rdd-1/part-00012
-rw-r--r-- 2 root supergroup 115894912 2020-05-27 15:28 /chk001/ac228425-3249-467d-9e77-f65384ed07c8/rdd-1/part-00013
接下来进到YARN的8088界面查看
点击Tracking UI进入spark的ui界面
看第一个界面jobs
在这可以看出来产生了2个job,
第一个job是我们正常的任务执行,执行了39s,一共产生了28个task任务
第二个job是checkpoint启动的job,执行了35s,一共产生了14个task任务
看第二个界面Stages,这里面的3个Stage是前面2个job产生的
具体想知道哪些Stage属于哪个job任务的话,可以在任务界面,点击Description中的链接就可以看到job对应的Stage
先看第一个job的Stage
第一个job其实就是我们实现的单词计数的功能,这个任务产生了两个stage,这两个stage具体是如何划分的呢?
咱们前面讲过,stage的划分是由宽依赖决定的,在这个任务中reduceByKey这个过程会产生宽依赖,所以会产生2个Stage
这里面显示的有这两个stage的一些基本信息
stage id:stage的编号,从0开始
Duration:stage执行消耗的时间
Tasks:Successed/Total:task执行成功数量/task总量
Input:输入数据量
ouput:输出数据量
shuffle read/shuffle read:shuffle过程传输数据量
点击这个界面中的DAG Visualization可以看到当前这个任务stage的划分情况,可以看到每个Stage包含哪些算子
进到Stage内部看一下
下面可以看到每个task的具体执行情况,执行状态,执行消耗的时间,GC消耗的时间,处理的数据量和数据条数、通过shuffle输出的数据量和数据条数
其实从这里也可以看出来文件的每一个block块会产生一个task
这就是这个Stage执行的基本信息了。
加下来看一下第二个Job,这个job是checkpoint启动的任务,查看它的stage的信息
这个job只会产生一个stage,因为我们只针对textFile的结果设置了checkpoint
这个stage执行消耗了35s,说明这份数据是重新通过textFile读取过来的。
针对Storage这块,显示的其实就是持久化的数据,如果对RDD做了持久化,那么在任务执行过程中能看到,任务执行结束就看不到了。
下面我们来验证一下在开启持久化的情况下执行checkpoint操作时的区别
在代码中针对RDD开启持久化
1:对比此时产生的两个job总的消耗的时间,以及job中的Stage消耗的时间
其实你会发现开启持久化之后,checkpoint的那个job消耗的时间就变少了
2:查看DAG Visualization,你会发现stage里面也会有有一些不一样的地方
五、checkpoint源码分析
前面我们通过理论层面分析了checkpoint的原理,以及演示了checkpoint的使用
下面我们通过源码层面来对我们前面分析的理论进行验证
先下载spark源码,下载流程和下载spark安装包的流程一样
把下载的安装包解压到idea项目目录中
D:\IdeaProjects\spark-2.4.3
打开spark-2.4.3源码目录,进入core目录,这个是spark的核心代码,我们要查看的checkpoint的源码就在这个项目中
在idea中打开core这个子项目
下面我们就来分析一下RDD的checkpoint功能:
checkpoint功能可以分为两块
1:checkpoint的写操作
将指定RDD的数据通过checkpoint存储到指定外部存储中
2:checkpoint的读操作
任务中RDD数据在使用过程中丢失了,正好这个RDD之前做过checkpoint,所以这时就需要通过checkpoint来恢复数据
先看checkpoint的写操作
1.1:当我们在自己开发的spark任务中先调用sc.setCheckpointDir时,底层其实就会调用SparkContext中的setCheckpointDir方法
def setCheckpointDir(directory: String) {
// If we are running on a cluster, log a warning if the directory is local.
// Otherwise, the driver may attempt to reconstruct the checkpointed RDD from
// its own local file system, which is incorrect because the checkpoint files
// are actually on the executor machines.
if (!isLocal && Utils.nonLocalPaths(directory).isEmpty) {
logWarning("Spark is not running in local mode, therefore the checkpoint directory " +
s"must not be on the local filesystem. Directory '$directory' " +
"appears to be on the local filesystem.")
}
//根据我们传过来的目录,后面再拼上一个子目录,子目录使用一个UUID随机字符串
//使用HDFS的javaAPI 在HDFS上创建目录
checkpointDir = Option(directory).map { dir =>
val path = new Path(dir, UUID.randomUUID().toString)
val fs = path.getFileSystem(hadoopConfiguration)
fs.mkdirs(path)
fs.getFileStatus(path).getPath.toString
}
}
1.2:接着我们会调用RDD.checkpoint方法,此时会执行RDD这个class中的checkpoint方法
//这里相当于是checkpoint的一个标记,并没有真正执行checkpoint
def checkpoint(): Unit = RDDCheckpointData.synchronized {
// NOTE: we use a global lock here due to complexities downstream with ensuring
// children RDD partitions point to the correct parent partitions. In the future
// we should revisit this consideration.
//如果SparkContext没有设置checkpointDir,则抛出异常
if (context.checkpointDir.isEmpty) {
throw new SparkException("Checkpoint directory has not been set in the SparkContext")
} else if (checkpointData.isEmpty) {
//如果设置了,则创建RDDCheckpointData的子类,这个子类主要负责管理RDD的checkpoint的进程和状态等
//并且会初始化checkpoint状态为Initialized
checkpointData = Some(new ReliableRDDCheckpointData(this))
}
}
这个checkpoint方法执行完成之后,这个流程就结束了。
1.3:剩下的就是在这个设置了checkpint的RDD所在的job执行结束之后,Spark会调用job中最后一个RDD的doCheckpoint方法
这个逻辑是在SparkContext这个class的runJob方法中,当执行到Spark中的action算子时,这个runJob方法会被触发,开始执行任务。
这个runJob的最后一行会调用rdd中的doCheckpoint方法
//在有action动作时,会触发sparkcontext对runJob的调用
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
if (stopped.get()) {
throw new IllegalStageException("SparkContext has been shutdown")
}
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
if (conf.getBoolean("spark.logLineage", false)) {
logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
}
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
//在这里会执行doCheckpoint()
rdd.doCheckpoint()
}
1.4:接着会进入到RDD中的doCheckpoint方法
这里面最终会调用RDDCheckpointData的checkpoint方法
checkpointData.get.checkpoint()
private[spark] def doCheckpoint(): Unit = {
RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) {
//该rdd是否已经调用doCheckpoint,如果还没有,则开始处理
if (!doCheckpointCalled) {
doCheckpointCalled = true
//若已经被checkpoint()标记过,则checkpointData.isDefined为true
if (checkpointData.isDefined) {
//查看是否需要把该rdd的所有依赖全部checkpoint
//checkpointAllMarkedAncestors取自配置"spark.checkpoint.checkpointAllMarkedAncestors",
//默认不配时值为false
if (checkpointAllMarkedAncestors) {
// TODO We can collect all the RDDs that needs to be checkpointed, and then checkpoint
// them in parallel.
// Checkpoint parents first because our lineage will be truncated after we
// checkpoint ourselves
// 血缘上的每一个父rdd递归调用该方法
dependencies.foreach(_.rdd.doCheckpoint())
}
//调用RDDCheckpointData的checkpoint方法
checkpointData.get.checkpoint()
} else {
//沿着rdd的血缘关系向上查找被checkpoint()标记过的RDD
dependencies.foreach(_.rdd.doCheckpoint())
}
}
}
}
1.5:接下来进入到RDDCheckpointData的checkpoint方法中
这里面会调用子类ReliableCheckpointRDD中的doCheckpoint()方法
final def checkpoint(): Unit = {
// Guard against multiple threads checkpointing the same RDD by
// atomically flipping the Stage of this RDDCheckpointData
将checkpoint的状态从Initialized置为CheckpointingInProgress
RDDCheckpointData.synchronized {
if (cpStage == Initialized) {
cpStage = CheckpointingInProgress
} else {
return
}
}
//调用子类的doCheckpoint,默认会使用ReliableCheckpointRDD子类,创建一个新的CheckpointRDD
val newRDD = doCheckpoint()
// Update our Stage and truncate the RDD lineage
//将checkpoint状态置为Checkpointed状态,并且改变rdd之前的依赖,设置父rdd为新创建的CheckpointRDD
RDDCheckpointData.synchronized {
cpRDD = Some(newRDD)
cpStage = Checkpointed
rdd.markCheckpointed()
}
}
1.6:接着来进入ReliableCheckpointRDD中的doCheckpoint()方法
这里面会调用ReliableCheckpointRDD中的writeRDDToCheckpointDirectory方法将rdd的数据写入HDFS中的checkpoint目录,并且返回创建的CheckpointRDD
protected override def doCheckpoint(): CheckpointRDD[T] = {
//将rdd的数据写入HDFS中的checkpoint目录,并且创建的CheckpointRDD
val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)
// Optionally clean our checkpoint files if the reference is out of scope
if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)) {
rdd.context.cleaner.foreach { cleaner =>
cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id)
}
}
logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}")
newRDD
}
1.7:接下来进入ReliableCheckpointRDD的writeRDDToCheckpointDirectory方法
这里面最终会启动一个job,将checkpoint的数据写入到指定的HDFS目录中
//将rdd的数据写入HDFS中checkpoint目录,并且创建CheckpointRDD
def writeRDDToCheckpointDirectory[T: ClassTag](
originalRDD: RDD[T],
checkpointDir: String,
blockSize: Int = -1): ReliableCheckpointRDD[T] = {
val checkpointStartTimeNs = System.nanoTime()
val sc = originalRDD.sparkContext
//Create the output path for the checkpoint
//创建checkpoint输出目录
val checkpointDirPath = new Path(checkpointDir)
//获取HDFS文件系统API接口
val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)
//创建目录
if (!fs.mkdirs(checkpointDirPath)) {
throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath")
}
// Save to file, and reload it as an RDD
//将Hadoop配置文件信息广播到所有节点
val broadcastedConf = sc.broadcast(
new SerializableConfiguration(sc.hadoopConfiguration))
// TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582)
//这里强调了checkpoint是一个昂贵的操作,主要是说它昂贵在需要沿着血缘关系重新计算该RDD
//重新启动一个job,将rdd的分区数据写入HDFS
sc.runJob(originalRDD,
writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)
//如果rdd的partitioner不为空,则将partitioner写入checkpoint目录
if (originalRDD.partitioner.nonEmpty) {
writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath)
}
val checkpointDurationMs =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - checkpointStartTimeNs)
logInfo(s"Checkpointing took $checkpointDurationMs ms.")
//创建一个CheckpointRDD,该RDD的分区数目和原始的rdd的分区数是一样的
val newRDD = new ReliableCheckpointRDD[T](
sc, checkpointDirPath.toString, originalRDD.partitioner)
if (newRDD.partitions.length != originalRDD.partitions.length) {
throw new SparkException(
"Checkpoint RDD has a different number of partitions from original RDD. Original " +
s"RDD [ID: ${originalRDD.id}, num of partitions: ${originalRDD.partitions.length}]; " +
s"Checkpoint RDD [ID: ${newRDD.id}, num of partitions: " +
s"${newRDD.partitions.length}].")
}
newRDD
}
执行到这,其实调用过checkpoint方法的RDD就被保存到HDFS上了。
注意:在这里通过checkpoint操作将RDD中的数据写入到HDFS中的时候,会调用RDD中的iterator方法,遍历RDD中所有分区的数据。
那我们来分析一下这块的代码
此时我们没有对RDD进行持久化,所以走else中的代码
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
//如果StorageLevel不为空,表示该RDD已经持久化过了,可能是在内存,也有可能是在磁盘
if (storageLevel != StorageLevel.NONE) {
//直接从持久化存储中读取或者计算(如果数据丢失)
getOrCompute(split, context)
} else {
//进行rdd partition的计算或者从checkpoint中读取数据
computeOrReadCheckpoint(split, context)
}
}
进入computeOrReadCheckpoint(split, context)中
此时这个RDD是将要进行checkpoint,还没有完成checkpoint,所以走else,会执行compute方法
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
//当前rdd是否已经checkpoint并且物化,如果已经checkpoint并且物化
//则调用firstParent的iterator方法获取
if (isCheckpointedAndMaterialized) {
//注意:针对checpoint操作,它的血缘关系已经被切断了,那么它的firstParent就是前面创建的CheckpointRDD
firstParent[T].iterator(split, context)
} else {
//如果没有,则表示持久化数据丢失,或者根本就没有持久化,
//需要调用rdd的compute方法开始重新计算,返回一个Iterator对象
compute(split, context)
}
}
在这会执行RDD的子类HadoopRDD中的compute方法
在这里会通过RecordReader获取RDD中指定分区的数据
override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
val iter = new NextIterator[(K, V)] {
private val split = theSplit.asInstanceOf[HadoopPartition]
logInfo("Input split: " + split.inputSplit)
private val jobConf = getJobConf()
private val inputMetrics = context.taskMetrics().inputMetrics
private val existingBytesRead = inputMetrics.bytesRead
// Sets InputFileBlockHolder for the file block's information
split.inputSplit.value match {
case fs: FileSplit =>
InputFileBlockHolder.set(fs.getPath.toString, fs.getStart, fs.getLength)
case _ =>
InputFileBlockHolder.unset()
}
// Find a function that will return the FileSystem bytes read by this thread. Do this before
// creating RecordReader, because RecordReader's constructor might read some bytes
private val getBytesReadCallback: Option[() => Long] = split.inputSplit.value match {
case _: FileSplit | _: CombineFileSplit =>
Some(SparkHadoopUtil.get.getFSBytesReadOnThreadCallback())
case _ => None
}
// We get our input bytes from thread-local Hadoop FileSystem statistics.
// If we do a coalesce, however, we are likely to compute multiple partitions in the same
// task and in the same thread, in which case we need to avoid override values written by
// previous partitions (SPARK-13071).
private def updateBytesRead(): Unit = {
getBytesReadCallback.foreach { getBytesRead =>
inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
}
}
private var reader: RecordReader[K, V] = null
private val inputFormat = getInputFormat(jobConf)
HadoopRDD.addLocalConfiguration(
new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(createTime),
context.stageId, theSplit.index, context.attemptNumber, jobConf)
reader =
try {
inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
} catch {
case e: FileNotFoundException if ignoreMissingFiles =>
logWarning(s"Skipped missing file: ${split.inputSplit}", e)
finished = true
null
// Throw FileNotFoundException even if `ignoreCorruptFiles` is true
case e: FileNotFoundException if !ignoreMissingFiles => throw e
case e: IOException if ignoreCorruptFiles =>
logWarning(s"Skipped the rest content in the corrupted file: ${split.inputSplit}", e)
finished = true
null
}
// Register an on-task-completion callback to close the input stream.
context.addTaskCompletionListener[Unit] { context =>
// Update the bytes read before closing is to make sure lingering bytesRead statistics in
// this thread get correctly added.
updateBytesRead()
closeIfNeeded()
}
private val key: K = if (reader == null) null.asInstanceOf[K] else reader.createKey()
private val value: V = if (reader == null) null.asInstanceOf[V] else reader.createValue()
override def getNext(): (K, V) = {
try {
finished = !reader.next(key, value)
} catch {
case e: FileNotFoundException if ignoreMissingFiles =>
logWarning(s"Skipped missing file: ${split.inputSplit}", e)
finished = true
// Throw FileNotFoundException even if `ignoreCorruptFiles` is true
case e: FileNotFoundException if !ignoreMissingFiles => throw e
case e: IOException if ignoreCorruptFiles =>
logWarning(s"Skipped the rest content in the corrupted file: ${split.inputSplit}", e)
finished = true
}
if (!finished) {
inputMetrics.incRecordsRead(1)
}
if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
updateBytesRead()
}
(key, value)
}
override def close(): Unit = {
if (reader != null) {
InputFileBlockHolder.unset()
try {
reader.close()
} catch {
case e: Exception =>
if (!ShutdownHookManager.inShutdown()) {
logWarning("Exception in RecordReader.close()", e)
}
} finally {
reader = null
}
if (getBytesReadCallback.isDefined) {
updateBytesRead()
} else if (split.inputSplit.value.isInstanceOf[FileSplit] ||
split.inputSplit.value.isInstanceOf[CombineFileSplit]) {
// If we can't get the bytes read from the FS stats, fall back to the split size,
// which may be inaccurate.
try {
inputMetrics.incBytesRead(split.inputSplit.value.getLength)
} catch {
case e: java.io.IOException =>
logWarning("Unable to get input size to set InputMetrics for task", e)
}
}
}
}
}
new InterruptibleIterator[(K, V)](context, iter)
}
这样经过几次迭代之后就可以获取到RDD中所有分区的数据了,因为这个compute是一次获取一个分区的数据。获取到之后checkpoint就可以把这个RDD的数据存储到HDFS上了。
这就是checkpoint的写操作
那接下来我们来分析一下checkpoint读数据这个操作
当RDD中的数据丢失了以后,需要通过checkpoint读取存储在hdfs上的数据,
2.1:这个时候还是会执行RDD中的iterator方法
由于我们没有做持久化,只做了checkpoint,所以还是会走else
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
//如果StorageLevel不为空,表示该RDD已经持久化过了,可能是在内存,也有可能是在磁盘
if (storageLevel != StorageLevel.NONE) {
//直接从持久化存储中读取或者计算(如果数据丢失)
getOrCompute(split, context)
} else {
//进行rdd partition的计算或者从checkpoint中读取数据
computeOrReadCheckpoint(split, context)
}
}
进入computeOrReadCheckpoint方法
此时rdd已经checkpoint并且物化,所以if分支满足
执行firstParent[T].iterator(split, context)这行代码
这行代码的意思是会找到当前这个RDD的父RDD,其实这个RDD执行过checkpoint之后,血缘关系已经被切断了,它的父RDD就是我们前面创建的那个ReliableCheckpointRDD
这个ReliableCheckpointRDD中没有覆盖iterator方法,所以在调用iterator的时候还是执行RDD这个父类中的iterator,重新进来之后再判断,这个ReliableCheckpointRDD再执行if判断的时候就不满足了,因为它的checkpoint属性不满足,所以会走else,执行compute
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
//当前rdd是否已经checkpoint并且物化,如果已经checkpoint并且物化
//则调用firstParent的iterator方法获取
if (isCheckpointedAndMaterialized) {
//注意:针对checpoint操作,它的血缘关系已经被切断了,那么它的firstParent就是前面创建的CheckpointRDD
firstParent[T].iterator(split, context)
} else {
//如果没有,则表示持久化数据丢失,或者根本就没有持久化,
//需要调用rdd的compute方法开始重新计算,返回一个Iterator对象
compute(split, context)
}
}
此时会执行ReliableCheckpointRDD这个子类中的compute方法
这里面就会找到之前checkpoint的文件,从HDFS上恢复RDD中的数据。
override def compute(split: Partition, context: TaskContext): Iterator[T] = {
//获取checkpoint文件
val file = new Path(checkpointPath, ReliableCheckpointRDD.checkpointFileName(split.index))
//从HDFS上的checkpoint文件中读取checkpoint的数据
ReliableCheckpointRDD.readCheckpointFile(file, broadcastedConf, context)
}
这是从checkpoint中读取数据的流程
咱们前面说过,建议对需要做checkpoint的数据先进行持久化,如果我们设置了持久化,针对checkpoint的写操作,在执行iterator方法的时候会是什么现象呢?
此时在最后将RDD中的数据通过checkpoint存储到HDFS上的时候,会调用RDD的iterator方法,不过此时storageLevel就不为null了,因为我们对这个RDD做了基于磁盘的持久化,所以会走if分支,执行getOrCompute
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
//如果StorageLevel不为空,表示该RDD已经持久化过了,可能是在内存,也有可能是在磁盘
if (storageLevel != StorageLevel.NONE) {
//直接从持久化存储中读取或者计算(如果数据丢失)
getOrCompute(split, context)
} else {
//进行rdd partition的计算或者从checkpoint中读取数据
computeOrReadCheckpoint(split, context)
}
}
进入getOrCompute方法
由于这个RDD的数据已经做了持久化,所以在这就可以从blockmanager中读取数据了,就不需要重新从源头计算或者拉取数据了,所以会提高checkpoint的效率
private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {
val blockId = RDDBlockId(id, partition.index)
var readCachedBlock = true
// This method is called on executors, so we need call SparkEnv.get instead of sc.env.
SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {
readCachedBlock = false
computeOrReadCheckpoint(partition, context)
}) match {
case Left(blockResult) =>
if (readCachedBlock) {
val existingMetrics = context.taskMetrics().inputMetrics
existingMetrics.incBytesRead(blockResult.bytes)
new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {
override def next(): T = {
existingMetrics.incRecordsRead(1)
delegate.next()
}
}
} else {
new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
}
case Right(iter) =>
new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])
}
}