Home > Archives > Spark基础之Checkpoint与Cache

Spark基础之Checkpoint与Cache

Published on

在RDD的操作中,我们会用到Checkpoint和Cache两种操作,它们主要存在如下差异:

维度\操作 Cache Checkpoint
存储地点 一般是内存中,当然也可以使用磁盘 磁盘
作用时间 每计算一个RDD分区就会缓存一次 当RDD计算结束之后会再次运行任务将每个分区数据写入外部存储中
使用场景

通过缓存分区来减少重复计算; 典型的应用就是对于会被重复计算的RDD进行缓存,当一个RDD需要同时被写入HDFS和数据库中时,就可以先缓存该RDD然后再进行后续操作。

减少依赖维护难度,不同Job之间共享数据; 典型的应用就是在Streaming中,会定期将流相关的状态检出,一方面在出现故障时可以读取之间的状态信息用于恢复,另一方面也可以简化RDD的依赖链

后续影响 原RDD的所有依赖和分区仍然被保留,如果计算失败可以再次进行计算 原RDD的所有依赖和分区都被清除

下面我们来了解一下上述两个操作的大致流程:

缓存(Cache)

RDD缓存就是将分区的计算结果存放在内存或是磁盘中,下次需要计算同样分区的时候直接通过BlockManager去获取,大致分为如下几个流程:

// org.apache.spark.rdd.RDD
def persist(rdd: RDD[T]) {
    // persistRDDs.update(rdd.id, rdd) = 
    persistRDDs(rdd.id) = rdd
}
// org.apache.spark.rdd.RDD
getOrCompute
    SparkEnv.get.blockManager.getOrElseUpdate
        // 如果存在直接获取结果
        get(blockId) match {
          case Some(block) =>
            // Scala中很罕见的return使用啊
            return Left(block)
          case _ =>
            // Need to compute the block.
        }
        // 不存在,则进行计算再存储
        doPutIterator(blockId, makeIterator, ....)

检出(Checkpoint)

在RDD计算结束之后会将所有分区依次写入到可靠的外部存储中(reliable system, such as HDFS),一般是由于RDD之间的依赖链太长或是需要在不同的Job之间共享。

import org.apache.spark.rdd.RDD

private[spark] var checkPointData: Option[RDDCheckpointData[T]] = None
// RDD调用该方法的时候,给checkPointData赋值, 此时并没有真正的检出,而是设置了一种状态。
// 因为在该方法被调用的时候,RDD刚刚被构建好,所以计算逻辑还没有执行
def checkpoint() {
    ...
    checkPointData = Some(new ReliableRDDCheckpointData(this))
    ...
}

// Job执行完成之后,开始对checkPointData进行处理
def runJob[T, U: ClassTag](...): Unit = {
    dagScheduler.runJob()
    ...
    rdd.doCheckpoint()
}

在1.6中,Checkpoint操作会导致两次RDD计算,一次是通过runJob计算RDD,一次是计算结束后运行另外一个Job将该RDD的内容写到HDFS中。具体可以参考SPARK-8582。所以使用时建议先将该RDD缓存到内存中,然后再checkpoint

关于重复计算的问题org.apache.spark.rdd.ReliableCheckpointRDD的writeRDDToCheckpointDirectory方法中也有提到:

// org.apache.spark.rdd.ReliableCheckpointRDD
def writeRDDToCheckpointDirectory
    // TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582)
    sc.runJob(originalRDD,
        writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)

Checkpoint大致可以分为如下三个流程:

而关于checkpointed RDD的获取,在每次计算的时候都会先尝试去获取,具体的逻辑可以参考org.apache.spark.rdd.RDD.iterator方法。

声明: 本文采用 BY-NC-SA 授权。转载请注明转自: Allen写字的地方