源码阅读 (7)分布式缓存
在Spark中要缓存中间结果时我们会调用RDD的cache方法。那我们来看一下RDD中的cache方法。
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): this.type = persist()
那我们来看一下persist方法。
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
/**
* Set this RDD's storage level to persist its values across operations after the first time
* it is computed. This can only be used to assign a new storage level if the RDD does not
* have a storage level set yet. Local checkpointing is an exception.
*/
def persist(newLevel: StorageLevel): this.type = {
if (isLocallyCheckpointed) {
// This means the user previously called localCheckpoint(), which should have already
// marked this RDD for persisting. Here we should override the old storage level with
// one that is explicitly requested by the user (after adapting it to use disk).
persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)
} else {
persist(newLevel, allowOverride = false)
}
}
/**
* Mark this RDD for persisting using the specified level.
*
* @param newLevel the target storage level
* @param allowOverride whether to override any existing level with the new one
*/
private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
// TODO: Handle changes of StorageLevel
if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
throw new UnsupportedOperationException(
"Cannot change storage level of an RDD after it was already assigned a level")
}
// If this is the first time this RDD is marked for persisting, register it
// with the SparkContext for cleanups and accounting. Do this only once.
// 如果这是第一次标记这个RDD是持久化的,那么为了清理与核算,
// 在SparkContext中为它注册。只执行一次。
if (storageLevel == StorageLevel.NONE) {
sc.cleaner.foreach(_.registerRDDForCleanup(this))
//调用SparkContext去缓存这个RDD
sc.persistRDD(this)
}
storageLevel = newLevel
this
}
SparkContext中的persistRDD代码: ~~~ scala /** * Register an RDD to be persisted in memory and/or disk storage */ private[spark] def persistRDD(rdd: RDD[_]) { _executorAllocationManager.foreach { _ => logWarning( s”Dynamic allocation currently does not support cached RDDs. Cached data for RDD “ + s”${rdd.id} will be lost when executors are removed.”) } persistentRdds(rdd.id) = rdd } ~~~
其中persistentRdds是一个map
private[spark] val persistentRdds = new TimeStampedWeakValueHashMap[Int, RDD[_]]
现在只是声明这个RDD需要缓存,具体需要到RDD执行的时候才能被缓存。入口是Task的runTask方法。
以ResultTask为例 ~~~ scala override def runTask(context: TaskContext): U = { // Deserialize the RDD and the func using the broadcast variables. val deserializeStartTime = System.currentTimeMillis() val ser = SparkEnv.get.closureSerializer.newInstance() val (rdd, func) = ser.deserialize(RDD[T], (TaskContext, Iterator[T]) => U) _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
metrics = Some(context.taskMetrics) func(context, rdd.iterator(partition, context)) } ~~~
它调用的RDD的iterator方法
/**
* Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
* This should ''not'' be called by users directly, but is available for implementors of custom
* subclasses of RDD.
*/
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
if (storageLevel != StorageLevel.NONE) {
SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
} else {
computeOrReadCheckpoint(split, context)
}
}
如果设置的storageLevel那么就从CacheManager中取数据。那么我们就来看看CacheManager中的getOrCompute方法:
/** Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached. */
def getOrCompute[T](
rdd: RDD[T],
partition: Partition,
context: TaskContext,
storageLevel: StorageLevel): Iterator[T] = {
val key = RDDBlockId(rdd.id, partition.index)
logDebug(s"Looking for partition $key")
blockManager.get(key) match {
// 已经缓存好了,更新度量信息后直接返回
case Some(blockResult) =>
// Partition is already materialized, so just return its values
val existingMetrics = context.taskMetrics
.getInputMetricsForReadMethod(blockResult.readMethod)
existingMetrics.incBytesRead(blockResult.bytes)
val iter = blockResult.data.asInstanceOf[Iterator[T]]
new InterruptibleIterator[T](context, iter) {
override def next(): T = {
existingMetrics.incRecordsRead(1)
delegate.next()
}
}
case None =>
// Acquire a lock for loading this partition
// If another thread already holds the lock, wait for it to finish return its results
// 获取加载当前分区的锁,如果已经有其他线程获取到了锁,那么等它结束后获取它的结果
val storedValues = acquireLockForPartition[T](key)
if (storedValues.isDefined) {
return new InterruptibleIterator[T](context, storedValues.get)
}
// Otherwise, we have to load the partition ourselves
// 不然,我们就得自己加载这个分区。
try {
logInfo(s"Partition $key not found, computing it")
val computedValues = rdd.computeOrReadCheckpoint(partition, context)
// If the task is running locally, do not persist the result
// 如果任务是在本地运行的,那么就不需要持久化结果
if (context.isRunningLocally) {
return computedValues
}
// Otherwise, cache the values and keep track of any updates in block statuses
// 不然,缓存结果,并跟踪块的任何信息变更。
val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
val metrics = context.taskMetrics
val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq)
new InterruptibleIterator(context, cachedValues)
} finally {
loading.synchronized {
loading.remove(key)
loading.notifyAll()
}
}
}
}
这里使用putInBlockManager把计算结果缓存如blockManager。
blog comments powered by Disqus