0
点赞
收藏
分享

微信扫一扫

Spark12:checkpoint

中间件小哥 2022-03-12 阅读 73

一、checkpoint概述

checkpoint,是Spark提供的一个比较高级的功能。有时候,我们的Spark任务,比较复杂,从初始化RDD开始,到最后整个任务完成,有比较多的步骤,比如超过10个transformation算子。而且,整个任务运行的时间也特别长,比如通常要运行1~2个小时。

在这种情况下,就比较适合使用checkpoint功能了。
因为对于特别复杂的Spark任务,有很高的风险会出现某个要反复使用的RDD因为节点的故障导致丢失,虽然之前持久化过,但是还是导致数据丢失了。那么也就是说,出现失败的时候,没有容错机制,所以当后面的transformation算子,又要使用到该RDD时,就会发现数据丢失了,此时如果没有进行容错处理的话,那么就需要再重新计算一次数据了。

所以针对这种Spark Job,如果我们担心某些关键的,在后面会反复使用的RDD,因为节点故障导致数据丢失,那么可以针对该RDD启动checkpoint机制,实现容错和高可用

那如何使用checkPoint呢?

首先要调用SparkContext的setCheckpointDir()方法,设置一个容错的文件系统的目录,比如HDFS;然后,对RDD调用checkpoint()方法。
最后,在RDD所在的job运行结束之后,会启动一个单独的job,将checkpoint设置过的RDD的数据写入之前设置的文件系统中。

这是checkpoint使用的基本步骤,很简单,那我们下面先从理论层面分析一下当我们设置好checkpoint之后,Spark底层都做了哪些事情

二、RDD之checkpoint流程

看这个图

在这里插入图片描述

1:SparkContext设置checkpoint目录,用于存放checkpoint的数据;
对RDD调用checkpoint方法,然后它就会被RDDCheckpointData对象进行管理,此时这个RDD的checkpoint状态会被设置为Initialized

2:待RDD所在的job运行结束,会调用job中最后一个RDD的doCheckpoint方法,该方法沿着RDD的血缘关系向上查找被checkpoint()方法标记过的RDD,并将其checkpoint状态从Initialized设置为CheckpointingInProgress

3:启动一个单独的job,来将血缘关系中标记为CheckpointInProgress的RDD执行checkpoint操作,也就是将其数据写入checkpoint目录

4:将RDD数据写入checkpoint目录之后,会将RDD状态改变为Checkpointed;
并且还会改变RDD的血缘关系,即会清除掉RDD所有依赖的RDD;
最后还会设置其父RDD为新创建的CheckpointRDD

三、checkpoint与持久化的区别

那这里所说的checkpoint和我们之前讲的RDD持久化有什么区别吗?

1、lineage是否发生改变

linage(血缘关系)说的就是RDD之间的依赖关系
持久化,只是将数据保存在内存中或者本地磁盘文件中,RDD的lineage(血缘关系)是不变的;Checkpoint执行之后,RDD就没有依赖的RDD了,也就是它的lineage改变了

2、丢失数据的可能性

持久化的数据丢失的可能性较大,如果采用 persist 把数据存在内存中的话,虽然速度最快但是也是最不可靠的,就算放在磁盘上也不是完全可靠的,因为磁盘也会损坏。
Checkpoint的数据通常是保存在高可用文件系统中(HDFS),丢失的可能性很低

建议:对需要checkpoint的RDD,先执行persist(StorageLevel.DISK_ONLY)
为什么呢?

因为默认情况下,如果某个RDD没有持久化,但是设置了checkpoint,那么这个时候,本来Spark任务已经执行结束了,但是由于中间的RDD没有持久化,在进行checkpoint的时候想要将这个RDD的数据写入外部存储系统的话,就需要重新计算这个RDD的数据,再将其checkpoint到外部存储系统中。

如果对需要checkpoint的rdd进行了基于磁盘的持久化,那么后面进行checkpoint操作时,就会直接从磁盘上读取rdd的数据了,就不需要重新再计算一次了,这样效率就高了。

那在这能不能使用基于内存的持久化呢?当然是可以的,不过没那个必要。

四、checkPoint的使用

那我们来演示一下:将一个RDD的数据持久化到HDFS上面

1、scala代码如下:

package com.imooc.scala

import org.apache.spark.{SparkConf, SparkContext}

/**
 * 需求:checkpoint的使用
 * 
 */
object CheckPointOpScala {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("CheckPointOpScala")
    val sc = new SparkContext(conf)
    if(args.length==0){
      System.exit(100)
    }

    val outputPath = args(0)

    //1:设置checkpint目录
    sc.setCheckpointDir("hdfs://bigdata01:9000/chk001")

    val dataRDD = sc.textFile("hdfs://bigdata01:9000/hello_10000000.dat")
    //2:对rdd执行checkpoint操作
    dataRDD.checkpoint()
    dataRDD.flatMap(_.split(" "))
      .map((_,1))
      .reduceByKey(_ + _)
      .saveAsTextFile(outputPath)

    sc.stop()

  }

}

2、java代码如下:

package com.imooc.java;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

/**
 * 需求:checkpoint的使用
 * 
 */
public class CheckPointOpJava {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("CheckPointOpJava");
        JavaSparkContext sc = new JavaSparkContext(conf);

        if(args.length==0){
            System.exit(100);
        }

        String outputPath = args[0];

        //1:设置checkpint目录
        sc.setCheckpointDir("hdfs://bigdata01:9000/chk002");

        JavaRDD<String> dataRDD = sc.textFile("hdfs://bigdata01:9000/hello_10000000.dat");
        //2: 对rdd执行checkpoint操作
        dataRDD.checkpoint();

        dataRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        }).mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word,1);
            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) throws Exception {
                return i1 + i2;
            }
        }).saveAsTextFile(outputPath);

        sc.stop();
    }
}

3、验证效果

下面我们把这个任务打包提交到集群上运行一下,看一下效果。
先确保hadoop集群是正常运行的,以及hadoop中的historyserver进程和spark的historyserver进程也是正常运行的。

(1)数据准备

测试数据之前已经上传到了hdfs上面,如果没有则需要上传

[root@bigdata01 soft]# hdfs dfs -ls /hello_10000000.dat
-rw-r--r--   2 root supergroup 1860100000 2020-04-28 22:15 /hello_10000000.dat

(2)打包前配置

将pom.xml中的spark-core的依赖设置为provided,然后编译打包

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.4.3</version>
    <scope>provided</scope>
</dependency>

(3)打包

mvn clean package -DskipTests

将打包的jar包上传到bigdata04的/data/soft/sparkjars目录,创建一个新的spark-submit脚本

[root@bigdata04 sparkjars]# cp wordCountJob.sh checkPointJob.sh
[root@bigdata04 sparkjars]# vi checkPointJob.sh 
spark-submit \
--class com.imooc.scala.CheckPointOpScala \
--master yarn \
--deploy-mode cluster \
--executor-memory 1G \
--num-executors 1 \
db_spark-1.0-SNAPSHOT-jar-with-dependencies.jar \
/out-chk001

提交任务

[root@bigdata04 sparkjars]# sh -x checkPointJob.sh 

执行成功之后可以到setCheckpointDir指定的目录中查看一下,可以看到目录中会生成对应的文件保存rdd中的数据,只不过生成的文件不是普通文本文件,直接查看文件中的内容显示为乱码。

[root@bigdata04 sparkjars]# hdfs dfs -ls /chk001/ac228425-3249-467d-9e77-f65384ed07c8/rdd-1
Found 14 items
-rw-r--r--   2 root supergroup  134946607 2020-05-27 15:28 /chk001/ac228425-3249-467d-9e77-f65384ed07c8/rdd-1/part-00000
-rw-r--r--   2 root supergroup  134946607 2020-05-27 15:28 /chk001/ac228425-3249-467d-9e77-f65384ed07c8/rdd-1/part-00001
-rw-r--r--   2 root supergroup  134946607 2020-05-27 15:28 /chk001/ac228425-3249-467d-9e77-f65384ed07c8/rdd-1/part-00002
-rw-r--r--   2 root supergroup  134946420 2020-05-27 15:28 /chk001/ac228425-3249-467d-9e77-f65384ed07c8/rdd-1/part-00003
-rw-r--r--   2 root supergroup  134946607 2020-05-27 15:28 /chk001/ac228425-3249-467d-9e77-f65384ed07c8/rdd-1/part-00004
-rw-r--r--   2 root supergroup  134946607 2020-05-27 15:28 /chk001/ac228425-3249-467d-9e77-f65384ed07c8/rdd-1/part-00005
-rw-r--r--   2 root supergroup  134946420 2020-05-27 15:28 /chk001/ac228425-3249-467d-9e77-f65384ed07c8/rdd-1/part-00006
-rw-r--r--   2 root supergroup  134946607 2020-05-27 15:28 /chk001/ac228425-3249-467d-9e77-f65384ed07c8/rdd-1/part-00007
-rw-r--r--   2 root supergroup  134946607 2020-05-27 15:28 /chk001/ac228425-3249-467d-9e77-f65384ed07c8/rdd-1/part-00008
-rw-r--r--   2 root supergroup  134946420 2020-05-27 15:28 /chk001/ac228425-3249-467d-9e77-f65384ed07c8/rdd-1/part-00009
-rw-r--r--   2 root supergroup  134946607 2020-05-27 15:28 /chk001/ac228425-3249-467d-9e77-f65384ed07c8/rdd-1/part-00010
-rw-r--r--   2 root supergroup  134946607 2020-05-27 15:28 /chk001/ac228425-3249-467d-9e77-f65384ed07c8/rdd-1/part-00011
-rw-r--r--   2 root supergroup  134946420 2020-05-27 15:28 /chk001/ac228425-3249-467d-9e77-f65384ed07c8/rdd-1/part-00012
-rw-r--r--   2 root supergroup  115894912 2020-05-27 15:28 /chk001/ac228425-3249-467d-9e77-f65384ed07c8/rdd-1/part-00013

接下来进到YARN的8088界面查看
点击Tracking UI进入spark的ui界面
看第一个界面jobs

在这里插入图片描述
在这可以看出来产生了2个job,
第一个job是我们正常的任务执行,执行了39s,一共产生了28个task任务
第二个job是checkpoint启动的job,执行了35s,一共产生了14个task任务

看第二个界面Stages,这里面的3个Stage是前面2个job产生的

在这里插入图片描述
具体想知道哪些Stage属于哪个job任务的话,可以在任务界面,点击Description中的链接就可以看到job对应的Stage

先看第一个job的Stage

在这里插入图片描述
在这里插入图片描述
第一个job其实就是我们实现的单词计数的功能,这个任务产生了两个stage,这两个stage具体是如何划分的呢?
咱们前面讲过,stage的划分是由宽依赖决定的,在这个任务中reduceByKey这个过程会产生宽依赖,所以会产生2个Stage

这里面显示的有这两个stage的一些基本信息

在这里插入图片描述
stage id:stage的编号,从0开始
Duration:stage执行消耗的时间
Tasks:Successed/Total:task执行成功数量/task总量
Input:输入数据量
ouput:输出数据量
shuffle read/shuffle read:shuffle过程传输数据量

点击这个界面中的DAG Visualization可以看到当前这个任务stage的划分情况,可以看到每个Stage包含哪些算子

在这里插入图片描述
进到Stage内部看一下
在这里插入图片描述
在这里插入图片描述
下面可以看到每个task的具体执行情况,执行状态,执行消耗的时间,GC消耗的时间,处理的数据量和数据条数、通过shuffle输出的数据量和数据条数
其实从这里也可以看出来文件的每一个block块会产生一个task

在这里插入图片描述
这就是这个Stage执行的基本信息了。

加下来看一下第二个Job,这个job是checkpoint启动的任务,查看它的stage的信息
在这里插入图片描述
这个job只会产生一个stage,因为我们只针对textFile的结果设置了checkpoint

在这里插入图片描述
这个stage执行消耗了35s,说明这份数据是重新通过textFile读取过来的。

针对Storage这块,显示的其实就是持久化的数据,如果对RDD做了持久化,那么在任务执行过程中能看到,任务执行结束就看不到了。

在这里插入图片描述
下面我们来验证一下在开启持久化的情况下执行checkpoint操作时的区别
在代码中针对RDD开启持久化
1:对比此时产生的两个job总的消耗的时间,以及job中的Stage消耗的时间
其实你会发现开启持久化之后,checkpoint的那个job消耗的时间就变少了

在这里插入图片描述
2:查看DAG Visualization,你会发现stage里面也会有有一些不一样的地方
在这里插入图片描述

五、checkpoint源码分析

前面我们通过理论层面分析了checkpoint的原理,以及演示了checkpoint的使用
下面我们通过源码层面来对我们前面分析的理论进行验证
先下载spark源码,下载流程和下载spark安装包的流程一样
在这里插入图片描述
把下载的安装包解压到idea项目目录中

D:\IdeaProjects\spark-2.4.3

打开spark-2.4.3源码目录,进入core目录,这个是spark的核心代码,我们要查看的checkpoint的源码就在这个项目中
在idea中打开core这个子项目
在这里插入图片描述
下面我们就来分析一下RDD的checkpoint功能:
checkpoint功能可以分为两块
1:checkpoint的写操作
将指定RDD的数据通过checkpoint存储到指定外部存储中
2:checkpoint的读操作
任务中RDD数据在使用过程中丢失了,正好这个RDD之前做过checkpoint,所以这时就需要通过checkpoint来恢复数据

先看checkpoint的写操作
1.1:当我们在自己开发的spark任务中先调用sc.setCheckpointDir时,底层其实就会调用SparkContext中的setCheckpointDir方法

def setCheckpointDir(directory: String) {

  // If we are running on a cluster, log a warning if the directory is local.
  // Otherwise, the driver may attempt to reconstruct the checkpointed RDD from
  // its own local file system, which is incorrect because the checkpoint files
  // are actually on the executor machines.
  if (!isLocal && Utils.nonLocalPaths(directory).isEmpty) {
    logWarning("Spark is not running in local mode, therefore the checkpoint directory " +
      s"must not be on the local filesystem. Directory '$directory' " +
      "appears to be on the local filesystem.")
  }
  //根据我们传过来的目录,后面再拼上一个子目录,子目录使用一个UUID随机字符串
  //使用HDFS的javaAPI 在HDFS上创建目录
  checkpointDir = Option(directory).map { dir =>
    val path = new Path(dir, UUID.randomUUID().toString)
    val fs = path.getFileSystem(hadoopConfiguration)
    fs.mkdirs(path)
    fs.getFileStatus(path).getPath.toString
  }
}

1.2:接着我们会调用RDD.checkpoint方法,此时会执行RDD这个class中的checkpoint方法

//这里相当于是checkpoint的一个标记,并没有真正执行checkpoint
def checkpoint(): Unit = RDDCheckpointData.synchronized {
  // NOTE: we use a global lock here due to complexities downstream with ensuring
  // children RDD partitions point to the correct parent partitions. In the future
  // we should revisit this consideration.
  //如果SparkContext没有设置checkpointDir,则抛出异常
  if (context.checkpointDir.isEmpty) {
    throw new SparkException("Checkpoint directory has not been set in the SparkContext")
  } else if (checkpointData.isEmpty) {
    //如果设置了,则创建RDDCheckpointData的子类,这个子类主要负责管理RDD的checkpoint的进程和状态等
    //并且会初始化checkpoint状态为Initialized
    checkpointData = Some(new ReliableRDDCheckpointData(this))
  }
}

这个checkpoint方法执行完成之后,这个流程就结束了。

1.3:剩下的就是在这个设置了checkpint的RDD所在的job执行结束之后,Spark会调用job中最后一个RDD的doCheckpoint方法
这个逻辑是在SparkContext这个class的runJob方法中,当执行到Spark中的action算子时,这个runJob方法会被触发,开始执行任务。
这个runJob的最后一行会调用rdd中的doCheckpoint方法

//在有action动作时,会触发sparkcontext对runJob的调用
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStageException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  //在这里会执行doCheckpoint()
  rdd.doCheckpoint()
}

1.4:接着会进入到RDD中的doCheckpoint方法
这里面最终会调用RDDCheckpointData的checkpoint方法
checkpointData.get.checkpoint()

private[spark] def doCheckpoint(): Unit = {
  RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) {
    //该rdd是否已经调用doCheckpoint,如果还没有,则开始处理
    if (!doCheckpointCalled) {
      doCheckpointCalled = true
      //若已经被checkpoint()标记过,则checkpointData.isDefined为true
      if (checkpointData.isDefined) {
        //查看是否需要把该rdd的所有依赖全部checkpoint
        //checkpointAllMarkedAncestors取自配置"spark.checkpoint.checkpointAllMarkedAncestors",
        //默认不配时值为false
        if (checkpointAllMarkedAncestors) {
          // TODO We can collect all the RDDs that needs to be checkpointed, and then checkpoint
          // them in parallel.
          // Checkpoint parents first because our lineage will be truncated after we
          // checkpoint ourselves
          // 血缘上的每一个父rdd递归调用该方法
          dependencies.foreach(_.rdd.doCheckpoint())
        }
        //调用RDDCheckpointData的checkpoint方法
        checkpointData.get.checkpoint()
      } else {
        //沿着rdd的血缘关系向上查找被checkpoint()标记过的RDD
        dependencies.foreach(_.rdd.doCheckpoint())
      }
    }
  }
}

1.5:接下来进入到RDDCheckpointData的checkpoint方法中
这里面会调用子类ReliableCheckpointRDD中的doCheckpoint()方法

final def checkpoint(): Unit = {
  // Guard against multiple threads checkpointing the same RDD by
  // atomically flipping the Stage of this RDDCheckpointData
  将checkpoint的状态从Initialized置为CheckpointingInProgress
  RDDCheckpointData.synchronized {
    if (cpStage == Initialized) {
      cpStage = CheckpointingInProgress
    } else {
      return
    }
  }
  //调用子类的doCheckpoint,默认会使用ReliableCheckpointRDD子类,创建一个新的CheckpointRDD
  val newRDD = doCheckpoint()

  // Update our Stage and truncate the RDD lineage
  //将checkpoint状态置为Checkpointed状态,并且改变rdd之前的依赖,设置父rdd为新创建的CheckpointRDD
  RDDCheckpointData.synchronized {
    cpRDD = Some(newRDD)
    cpStage = Checkpointed
    rdd.markCheckpointed()
  }
}

1.6:接着来进入ReliableCheckpointRDD中的doCheckpoint()方法
这里面会调用ReliableCheckpointRDD中的writeRDDToCheckpointDirectory方法将rdd的数据写入HDFS中的checkpoint目录,并且返回创建的CheckpointRDD

protected override def doCheckpoint(): CheckpointRDD[T] = {
  //将rdd的数据写入HDFS中的checkpoint目录,并且创建的CheckpointRDD
  val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)

  // Optionally clean our checkpoint files if the reference is out of scope
  if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)) {
    rdd.context.cleaner.foreach { cleaner =>
      cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id)
    }
  }


  logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}")
  newRDD
}

1.7:接下来进入ReliableCheckpointRDD的writeRDDToCheckpointDirectory方法
这里面最终会启动一个job,将checkpoint的数据写入到指定的HDFS目录中

 //将rdd的数据写入HDFS中checkpoint目录,并且创建CheckpointRDD
def writeRDDToCheckpointDirectory[T: ClassTag](
    originalRDD: RDD[T],
    checkpointDir: String,
    blockSize: Int = -1): ReliableCheckpointRDD[T] = {
  val checkpointStartTimeNs = System.nanoTime()

  val sc = originalRDD.sparkContext

  //Create the output path for the checkpoint
  //创建checkpoint输出目录
  val checkpointDirPath = new Path(checkpointDir)
  //获取HDFS文件系统API接口
  val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)
  //创建目录
  if (!fs.mkdirs(checkpointDirPath)) {
    throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath")
  }

  // Save to file, and reload it as an RDD
  //将Hadoop配置文件信息广播到所有节点
  val broadcastedConf = sc.broadcast(
    new SerializableConfiguration(sc.hadoopConfiguration))
  // TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582)
  //这里强调了checkpoint是一个昂贵的操作,主要是说它昂贵在需要沿着血缘关系重新计算该RDD
  //重新启动一个job,将rdd的分区数据写入HDFS
  sc.runJob(originalRDD,
    writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)
  //如果rdd的partitioner不为空,则将partitioner写入checkpoint目录
  if (originalRDD.partitioner.nonEmpty) {
    writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath)
  }
  val checkpointDurationMs =
    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - checkpointStartTimeNs)
  logInfo(s"Checkpointing took $checkpointDurationMs ms.")
  //创建一个CheckpointRDD,该RDD的分区数目和原始的rdd的分区数是一样的
  val newRDD = new ReliableCheckpointRDD[T](
    sc, checkpointDirPath.toString, originalRDD.partitioner)
  if (newRDD.partitions.length != originalRDD.partitions.length) {
    throw new SparkException(
      "Checkpoint RDD has a different number of partitions from original RDD. Original " +
        s"RDD [ID: ${originalRDD.id}, num of partitions: ${originalRDD.partitions.length}]; " +
        s"Checkpoint RDD [ID: ${newRDD.id}, num of partitions: " +
        s"${newRDD.partitions.length}].")
  }
  newRDD
}

执行到这,其实调用过checkpoint方法的RDD就被保存到HDFS上了。

注意:在这里通过checkpoint操作将RDD中的数据写入到HDFS中的时候,会调用RDD中的iterator方法,遍历RDD中所有分区的数据。

那我们来分析一下这块的代码
此时我们没有对RDD进行持久化,所以走else中的代码

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  //如果StorageLevel不为空,表示该RDD已经持久化过了,可能是在内存,也有可能是在磁盘
  if (storageLevel != StorageLevel.NONE) {
    //直接从持久化存储中读取或者计算(如果数据丢失)
    getOrCompute(split, context)
  } else {
    //进行rdd partition的计算或者从checkpoint中读取数据
    computeOrReadCheckpoint(split, context)
  }
}

进入computeOrReadCheckpoint(split, context)中
此时这个RDD是将要进行checkpoint,还没有完成checkpoint,所以走else,会执行compute方法

private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
  //当前rdd是否已经checkpoint并且物化,如果已经checkpoint并且物化
  //则调用firstParent的iterator方法获取
  if (isCheckpointedAndMaterialized) {
    //注意:针对checpoint操作,它的血缘关系已经被切断了,那么它的firstParent就是前面创建的CheckpointRDD
    firstParent[T].iterator(split, context)
  } else {
    //如果没有,则表示持久化数据丢失,或者根本就没有持久化,
    //需要调用rdd的compute方法开始重新计算,返回一个Iterator对象
    compute(split, context)
  }
}

在这会执行RDD的子类HadoopRDD中的compute方法
在这里会通过RecordReader获取RDD中指定分区的数据

override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
  val iter = new NextIterator[(K, V)] {

    private val split = theSplit.asInstanceOf[HadoopPartition]
    logInfo("Input split: " + split.inputSplit)
    private val jobConf = getJobConf()

    private val inputMetrics = context.taskMetrics().inputMetrics
    private val existingBytesRead = inputMetrics.bytesRead

    // Sets InputFileBlockHolder for the file block's information
    split.inputSplit.value match {
      case fs: FileSplit =>
        InputFileBlockHolder.set(fs.getPath.toString, fs.getStart, fs.getLength)
      case _ =>
        InputFileBlockHolder.unset()
    }

    // Find a function that will return the FileSystem bytes read by this thread. Do this before
    // creating RecordReader, because RecordReader's constructor might read some bytes
    private val getBytesReadCallback: Option[() => Long] = split.inputSplit.value match {
      case _: FileSplit | _: CombineFileSplit =>
        Some(SparkHadoopUtil.get.getFSBytesReadOnThreadCallback())
      case _ => None
    }

    // We get our input bytes from thread-local Hadoop FileSystem statistics.
    // If we do a coalesce, however, we are likely to compute multiple partitions in the same
    // task and in the same thread, in which case we need to avoid override values written by
    // previous partitions (SPARK-13071).
    private def updateBytesRead(): Unit = {
      getBytesReadCallback.foreach { getBytesRead =>
        inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
      }
    }

    private var reader: RecordReader[K, V] = null
    private val inputFormat = getInputFormat(jobConf)
    HadoopRDD.addLocalConfiguration(
      new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(createTime),
      context.stageId, theSplit.index, context.attemptNumber, jobConf)

    reader =
      try {
        inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
      } catch {
        case e: FileNotFoundException if ignoreMissingFiles =>
          logWarning(s"Skipped missing file: ${split.inputSplit}", e)
          finished = true
          null
        // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
        case e: FileNotFoundException if !ignoreMissingFiles => throw e
        case e: IOException if ignoreCorruptFiles =>
          logWarning(s"Skipped the rest content in the corrupted file: ${split.inputSplit}", e)
          finished = true
          null
      }
    // Register an on-task-completion callback to close the input stream.
    context.addTaskCompletionListener[Unit] { context =>
      // Update the bytes read before closing is to make sure lingering bytesRead statistics in
      // this thread get correctly added.
      updateBytesRead()
      closeIfNeeded()
    }

    private val key: K = if (reader == null) null.asInstanceOf[K] else reader.createKey()
    private val value: V = if (reader == null) null.asInstanceOf[V] else reader.createValue()

    override def getNext(): (K, V) = {
      try {
        finished = !reader.next(key, value)
      } catch {
        case e: FileNotFoundException if ignoreMissingFiles =>
          logWarning(s"Skipped missing file: ${split.inputSplit}", e)
          finished = true
        // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
        case e: FileNotFoundException if !ignoreMissingFiles => throw e
        case e: IOException if ignoreCorruptFiles =>
          logWarning(s"Skipped the rest content in the corrupted file: ${split.inputSplit}", e)
          finished = true
      }
      if (!finished) {
        inputMetrics.incRecordsRead(1)
      }
      if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
        updateBytesRead()
      }
      (key, value)
    }

    override def close(): Unit = {
      if (reader != null) {
        InputFileBlockHolder.unset()
        try {
          reader.close()
        } catch {
          case e: Exception =>
            if (!ShutdownHookManager.inShutdown()) {
              logWarning("Exception in RecordReader.close()", e)
            }
        } finally {
          reader = null
        }
        if (getBytesReadCallback.isDefined) {
          updateBytesRead()
        } else if (split.inputSplit.value.isInstanceOf[FileSplit] ||
                   split.inputSplit.value.isInstanceOf[CombineFileSplit]) {
          // If we can't get the bytes read from the FS stats, fall back to the split size,
          // which may be inaccurate.
          try {
            inputMetrics.incBytesRead(split.inputSplit.value.getLength)
          } catch {
            case e: java.io.IOException =>
              logWarning("Unable to get input size to set InputMetrics for task", e)
          }
        }
      }
    }
  }
  new InterruptibleIterator[(K, V)](context, iter)
}

这样经过几次迭代之后就可以获取到RDD中所有分区的数据了,因为这个compute是一次获取一个分区的数据。获取到之后checkpoint就可以把这个RDD的数据存储到HDFS上了。

这就是checkpoint的写操作

那接下来我们来分析一下checkpoint读数据这个操作
当RDD中的数据丢失了以后,需要通过checkpoint读取存储在hdfs上的数据,
2.1:这个时候还是会执行RDD中的iterator方法
由于我们没有做持久化,只做了checkpoint,所以还是会走else

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  //如果StorageLevel不为空,表示该RDD已经持久化过了,可能是在内存,也有可能是在磁盘
  if (storageLevel != StorageLevel.NONE) {
    //直接从持久化存储中读取或者计算(如果数据丢失)
    getOrCompute(split, context)
  } else {
    //进行rdd partition的计算或者从checkpoint中读取数据
    computeOrReadCheckpoint(split, context)
  }
}

进入computeOrReadCheckpoint方法
此时rdd已经checkpoint并且物化,所以if分支满足
执行firstParent[T].iterator(split, context)这行代码
这行代码的意思是会找到当前这个RDD的父RDD,其实这个RDD执行过checkpoint之后,血缘关系已经被切断了,它的父RDD就是我们前面创建的那个ReliableCheckpointRDD
这个ReliableCheckpointRDD中没有覆盖iterator方法,所以在调用iterator的时候还是执行RDD这个父类中的iterator,重新进来之后再判断,这个ReliableCheckpointRDD再执行if判断的时候就不满足了,因为它的checkpoint属性不满足,所以会走else,执行compute

private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
  //当前rdd是否已经checkpoint并且物化,如果已经checkpoint并且物化
  //则调用firstParent的iterator方法获取
  if (isCheckpointedAndMaterialized) {
    //注意:针对checpoint操作,它的血缘关系已经被切断了,那么它的firstParent就是前面创建的CheckpointRDD
    firstParent[T].iterator(split, context)
  } else {
    //如果没有,则表示持久化数据丢失,或者根本就没有持久化,
    //需要调用rdd的compute方法开始重新计算,返回一个Iterator对象
    compute(split, context)
  }
}

此时会执行ReliableCheckpointRDD这个子类中的compute方法
这里面就会找到之前checkpoint的文件,从HDFS上恢复RDD中的数据。

override def compute(split: Partition, context: TaskContext): Iterator[T] = {
  //获取checkpoint文件
  val file = new Path(checkpointPath, ReliableCheckpointRDD.checkpointFileName(split.index))
  //从HDFS上的checkpoint文件中读取checkpoint的数据
  ReliableCheckpointRDD.readCheckpointFile(file, broadcastedConf, context)
}

这是从checkpoint中读取数据的流程

咱们前面说过,建议对需要做checkpoint的数据先进行持久化,如果我们设置了持久化,针对checkpoint的写操作,在执行iterator方法的时候会是什么现象呢?

此时在最后将RDD中的数据通过checkpoint存储到HDFS上的时候,会调用RDD的iterator方法,不过此时storageLevel就不为null了,因为我们对这个RDD做了基于磁盘的持久化,所以会走if分支,执行getOrCompute

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  //如果StorageLevel不为空,表示该RDD已经持久化过了,可能是在内存,也有可能是在磁盘
  if (storageLevel != StorageLevel.NONE) {
    //直接从持久化存储中读取或者计算(如果数据丢失)
    getOrCompute(split, context)
  } else {
    //进行rdd partition的计算或者从checkpoint中读取数据
    computeOrReadCheckpoint(split, context)
  }
}

进入getOrCompute方法
由于这个RDD的数据已经做了持久化,所以在这就可以从blockmanager中读取数据了,就不需要重新从源头计算或者拉取数据了,所以会提高checkpoint的效率

private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {
  val blockId = RDDBlockId(id, partition.index)
  var readCachedBlock = true
  // This method is called on executors, so we need call SparkEnv.get instead of sc.env.
  SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {
    readCachedBlock = false
    computeOrReadCheckpoint(partition, context)
  }) match {
    case Left(blockResult) =>
      if (readCachedBlock) {
        val existingMetrics = context.taskMetrics().inputMetrics
        existingMetrics.incBytesRead(blockResult.bytes)
        new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {
          override def next(): T = {
            existingMetrics.incRecordsRead(1)
            delegate.next()
          }
        }
      } else {
        new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
      }
    case Right(iter) =>
      new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])
  }
}
举报

相关推荐

0 条评论