09 September 2015

###Stage Generation

Stage scheduling is handled by the DAGScheduler, which carves the DAG of Stages out of the RDD DAG. The Stage DAG is traversed breadth-first with the last Stage to execute as the root, walking back to the earliest Stages, which are then executed first. If a submitted Stage still has unfinished parent Stages, it must wait for them to complete before it can run.
The DAGScheduler also maintains several important collections that record Stage state, so that a Stage is neither run too early nor submitted more than once.

waitingStages: Stages that still have unfinished parent Stages; keeps them from being run too early
runningStages: Stages that are currently executing; prevents duplicate submission
failedStages: Stages that failed and need to be resubmitted

// Stages we need to run whose parents aren't done
private[scheduler] val waitingStages = new HashSet[Stage]

// Stages we are running right now
private[scheduler] val runningStages = new HashSet[Stage]

// Stages that must be resubmitted due to fetch failures
private[scheduler] val failedStages = new HashSet[Stage]

###Dependencies

During scheduling the dependencies between RDDs are analysed: RDDs linked by an unbroken chain of narrow dependencies are merged into the same Stage, while wide (shuffle) dependencies are the criterion for splitting Stages apart.
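
As a minimal illustration (assuming a running SparkContext bound to `sc`; the RDD names are invented for this sketch), `map` keeps a narrow OneToOneDependency and so stays inside its parent's Stage, while `reduceByKey` introduces a ShuffleDependency, which is exactly where a Stage boundary is drawn:

~~~ scala
val nums    = sc.parallelize(1 to 100, 4)   // start of a stage, 4 partitions
val pairs   = nums.map(n => (n % 10, n))    // narrow dependency: same stage
val reduced = pairs.reduceByKey(_ + _)      // wide dependency: new stage after the shuffle

println(pairs.dependencies.head.getClass.getSimpleName)   // OneToOneDependency
println(reduced.dependencies.head.getClass.getSimpleName) // ShuffleDependency

// toDebugString prints the lineage; the indentation step marks the shuffle boundary.
println(reduced.toDebugString)
~~~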

###Stage Types

####ShuffleMapStage

Not the final Stage; other Stages come after it. Its output has to be shuffled before it can serve as input to later Stages, so a shuffle is its output boundary. Its input data may come from an external source or from the output of another ShuffleMapStage. Its tasks are ShuffleMapTasks. A Job may or may not contain this kind of Stage.

####ResultStage

The final Stage; nothing comes after it, and it directly produces or stores the result. Its input data may come from an external source or from the output of a ShuffleMapStage. Its tasks are ResultTasks. Every Job must contain this kind of Stage.

###Stage Division

Some RDD transformations inherently carry a ShuffleDependency; for example ShuffledRDD, CoGroupedRDD and SubtractedRDD return a ShuffleDependency among their dependencies. Whenever a ShuffleDependency is found on an RDD, a new Stage is created.

Once Stage division is complete, the following is known:

How many Partitions the resulting Stage reads its data from
How many Partitions the resulting Stage will produce
Whether the resulting Stage is of the ShuffleMap type

The Partition count determines how many distinct Tasks have to be created, and the ShuffleMap flag determines which type of Task is generated (a short sketch follows the list). Spark has two kinds of Task:
1. ShuffleMapTask
2. ResultTask
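
As a small example (again assuming a SparkContext `sc`; the variable names are made up), the partition counts on each side of the shuffle translate directly into the number of ShuffleMapTasks and ResultTasks:

~~~ scala
// 3 input partitions  -> 3 ShuffleMapTasks in the shuffle map stage
// 2 output partitions -> 2 ResultTasks in the result stage
val words  = sc.parallelize(Seq("a", "b", "a", "c", "b", "a"), 3)
val counts = words.map(w => (w, 1)).reduceByKey(_ + _, 2)

println(words.partitions.length)   // 3
println(counts.partitions.length)  // 2
counts.collect()                   // runs 3 ShuffleMapTasks, then 2 ResultTasks
~~~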

###The Stage Class

Because every RDD within a single Stage is just a map over its parent, the partitioning never changes, so the Stage class takes only one RDD parameter rather than a whole chain of them (the different map functions are simply applied to the data one after another).

private[spark] abstract class Stage(
    val id: Int,               // Stage id; the larger the value, the higher the priority
    val rdd: RDD[_],           // the last RDD belonging to this Stage
    val numTasks: Int,         // number of Tasks to create, equal to the number of output Partitions of the parent RDD
    val parents: List[Stage],  // list of parent Stages
    val firstJobId: Int,       // Job id
    val callSite: CallSite)
  extends Logging {

###Job Handling

1. Split the Job into Stages
2. Wrap each Stage into a TaskSet
3. Submit the TaskSet to the TaskScheduler

####DAGScheduler's handleJobSubmitted and submitStage Functions

These two functions carry the bulk of the dependency analysis, so their logic is examined in more detail here. handleJobSubmitted's main job is to generate the Stages and to create the ActiveJob from the finalStage.

private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    finalStage = newResultStage(finalRDD, partitions.length, jobId, callSite)
  } catch {
    //Error handling: tell the listener the job failed, then return
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }
  if (finalStage != null) {
    //Create the ActiveJob
    val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
    clearCacheLocs()
    logInfo("Got job %s (%s) with %d output partitions".format(
      job.jobId, callSite.shortForm, partitions.length))
    logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))
    val jobSubmissionTime = clock.getTimeMillis()
    jobIdToActiveJob(jobId) = job
    activeJobs += job
    finalStage.resultOfJob = Some(job)
    val stageIds = jobIdToStageIds(jobId).toArray
    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
    listenerBus.post(
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
    //Submit the final stage
    submitStage(finalStage)
  }
  //Submit any waiting stages that are now ready to run
  submitWaitingStages()
}

####The newResultStage Function

~~~ scala
/**
 * Create a ResultStage associated with the provided jobId.
 */
private def newResultStage(
    rdd: RDD[_],
    numTasks: Int,
    jobId: Int,
    callSite: CallSite): ResultStage = {
  val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
  val stage: ResultStage = new ResultStage(id, rdd, numTasks, parentStages, jobId, callSite)

  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}
~~~

Creating a Stage requires knowing how many Partitions it will read from, since that number directly determines how many Tasks are created. In other words, by the time a Stage is created it is already clear how many distinct Partitions it reads from and how many Partitions it writes to; both the input and output counts are fixed.

~~~ scala
/**
 * Helper function to eliminate some code re-use when creating new stages.
 */
private def getParentStagesAndId(rdd: RDD[_], firstJobId: Int): (List[Stage], Int) = {
  val parentStages = getParentStages(rdd, firstJobId)
  val id = nextStageId.getAndIncrement()
  (parentStages, id)
}
~~~

####The getParentStages Function

~~~ scala
/**
 * Get or create the list of parent stages for a given RDD. The new Stages will be created with
 * the provided firstJobId.
 */
private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  val parents = new HashSet[Stage]
  val visited = new HashSet[RDD[_]]
  // We are manually maintaining a stack here to prevent StackOverflowError
  // caused by recursively visiting
  val waitingForVisit = new Stack[RDD[_]]
  def visit(r: RDD[_]) {
    if (!visited(r)) {
      visited += r
      // Kind of ugly: need to register RDDs with the cache here since
      // we can't do it in its constructor because # of partitions is unknown
      for (dep <- r.dependencies) {
        dep match {
          case shufDep: ShuffleDependency[_, _, _] =>
            parents += getShuffleMapStage(shufDep, firstJobId)
          case _ =>
            waitingForVisit.push(dep.rdd)
        }
      }
    }
  }
  waitingForVisit.push(rdd)
  while (waitingForVisit.nonEmpty) {
    visit(waitingForVisit.pop())
  }
  parents.toList
}
~~~

getParentStages keeps walking back through the upstream RDDs; whenever it hits a dependency of type ShuffleDependency, it computes the corresponding Stage via getShuffleMapStage.
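
The manual Stack (rather than plain recursion) is there to avoid a StackOverflowError on very long lineages. As a standalone sketch of the same traversal pattern, on a made-up Node type rather than RDDs:

~~~ scala
import scala.collection.mutable

// Hypothetical graph node, only used to illustrate the traversal pattern above.
case class Node(id: Int, parents: Seq[Node])

def collectRoots(start: Node): List[Node] = {
  val roots   = mutable.HashSet[Node]()
  val visited = mutable.HashSet[Node]()
  val waitingForVisit = mutable.Stack[Node](start)   // explicit stack instead of call-stack recursion
  while (waitingForVisit.nonEmpty) {
    val n = waitingForVisit.pop()
    if (!visited(n)) {
      visited += n
      if (n.parents.isEmpty) roots += n              // record and stop, like the ShuffleDependency case
      else n.parents.foreach(waitingForVisit.push)   // keep walking upstream, like the narrow case
    }
  }
  roots.toList
}
~~~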

###The ActiveJob Class

Once a user-submitted job has been scheduled by the DAGScheduler, it is wrapped into an ActiveJob:

~~~ scala
val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
~~~

At the same time a JobWaiter is started, which blocks while listening for the job's completion.

~~~ scala
private[spark] class ActiveJob(
    val jobId: Int,
    val finalStage: ResultStage,
    val func: (TaskContext, Iterator[_]) => _,
    val partitions: Array[Int],
    val callSite: CallSite,
    val listener: JobListener,
    val properties: Properties) {

  val numPartitions = partitions.length
  val finished = Array.fill[Boolean](numPartitions)(false)
  var numFinished = 0
}
~~~
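
The JobWaiter class itself is not reproduced here; purely to illustrate the blocking behaviour, a minimal hypothetical sketch (SimpleJobWaiter is not a Spark class) might look like this:

~~~ scala
import java.util.concurrent.CountDownLatch

// Hypothetical stand-in for JobWaiter: one count per output partition of the job.
class SimpleJobWaiter(numPartitions: Int) {
  private val latch = new CountDownLatch(numPartitions)

  // Called by the scheduler each time the ResultTask for one partition succeeds.
  def taskSucceeded(partition: Int): Unit = latch.countDown()

  // Called by the thread that submitted the job; blocks until every partition has finished.
  def awaitResult(): Unit = latch.await()
}
~~~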

Based on the dependencies between the RDDs in the job and their types (NarrowDependency or ShuffleDependency), the DAGScheduler walks the dependency chain and produces the stage DAG (a result stage plus shuffle map stages). Within each stage, the corresponding tasks are generated, either ResultTasks or ShuffleMapTasks, one per partition of the stage's RDD; the resulting group of tasks is wrapped into a TaskSet and submitted to the TaskScheduler.

###The submitStage Function

submitStage divides the stages according to the dependency relationships: starting from finalStage it recursively walks backwards to the parent stages, and once a stage has no missing parents it calls submitMissingTasks to submit that stage. In this way the job is split into one or more stages.

The submitStage flow:

1. Check whether all Stages it depends on have completed; if not, run those parent Stages first.
2. Once all dependencies have completed, submit the Stage itself.
3. Finally, submitMissingTasks wraps the stage into a TaskSet and hands it to the TaskScheduler via taskScheduler.submitTasks.

  /** Submits stage, but first recursively submits any missing parents. */
  private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
      logDebug("submitStage(" + stage + ")")
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        val missing = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missing)
        if (missing.isEmpty) {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          submitMissingTasks(stage, jobId.get)  // no parent stages left to run, so submit this stage's tasks directly
        } else {
          for (parent <- missing) {
            submitStage(parent) // submit the parent stages first; this recurses until a stage with no missing parents is submitted above
          }
          waitingStages += stage // this stage cannot be submitted yet, so park it in the waiting set
        }
      }
    } else {
      abortStage(stage, "No active job for stage " + stage.id, None)
    }
  }

Submitting a stage is therefore a recursive process: the parent stages are submitted first while the stage itself is added to the waiting set, and only a stage with no missing parents actually has its tasks submitted. The stages parked in the waiting set are submitted later by submitWaitingStages.

###The getMissingParentStages Function

getMissingParentStages traverses the graph to find all parent Stages that the given Stage depends on and that are not yet available.

  private def getMissingParentStages(stage: Stage): List[Stage] = {
    val missing = new HashSet[Stage]
    val visited = new HashSet[RDD[_]]
    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
    val waitingForVisit = new Stack[RDD[_]]
    def visit(rdd: RDD[_]) {
      if (!visited(rdd)) {
        visited += rdd
        val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
        if (rddHasUncachedPartitions) {
          for (dep <- rdd.dependencies) {
            dep match {
              case shufDep: ShuffleDependency[_, _, _] =>
                val mapStage = getShuffleMapStage(shufDep, stage.firstJobId)
                if (!mapStage.isAvailable) {
                  missing += mapStage
                }
              case narrowDep: NarrowDependency[_] =>
                waitingForVisit.push(narrowDep.rdd)
            }
          }
        }
      }
    }
    waitingForVisit.push(stage.rdd)
    while (waitingForVisit.nonEmpty) {
      visit(waitingForVisit.pop())
    }
    missing.toList
  }

###The submitMissingTasks Function

Regardless of the kind of stage, a task is created for every partition of the stage; the tasks are finally wrapped into a TaskSet and the stage is submitted to the TaskScheduler.

~~~ scala
/** Called when stage's parents are available and we can now do its task. */
private def submitMissingTasks(stage: Stage, jobId: Int) {
logDebug("submitMissingTasks(" + stage + ")")
// Get our pending tasks and remember them in our pendingTasks entry
stage.pendingTasks.clear()

// First figure out the indexes of partition ids to compute.
val (allPartitions: Seq[Int], partitionsToCompute: Seq[Int]) = {
  stage match {
    case stage: ShuffleMapStage =>
      val allPartitions = 0 until stage.numPartitions
      val filteredPartitions = allPartitions.filter { id => stage.outputLocs(id).isEmpty }
      (allPartitions, filteredPartitions)
    case stage: ResultStage =>
      val job = stage.resultOfJob.get
      val allPartitions = 0 until job.numPartitions
      val filteredPartitions = allPartitions.filter { id => !job.finished(id) }
      (allPartitions, filteredPartitions)
  }
}

// Create internal accumulators if the stage has no accumulators initialized.
// Reset internal accumulators only if this stage is not partially submitted
// Otherwise, we may override existing accumulator values from some tasks
if (stage.internalAccumulators.isEmpty || allPartitions == partitionsToCompute) {
  stage.resetInternalAccumulators()
}

val properties = jobIdToActiveJob.get(stage.firstJobId).map(_.properties).orNull

runningStages += stage
// SparkListenerStageSubmitted should be posted before testing whether tasks are
// serializable. If tasks are not serializable, a SparkListenerStageCompleted event
// will be posted, which should always come after a corresponding SparkListenerStageSubmitted
// event.
outputCommitCoordinator.stageStart(stage.id)
val taskIdToLocations = try {
  stage match {
    case s: ShuffleMapStage =>
      partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
    case s: ResultStage =>
      val job = s.resultOfJob.get
      partitionsToCompute.map { id =>
        val p = job.partitions(id)
        (id, getPreferredLocs(stage.rdd, p))
      }.toMap
  }
} catch {
  case NonFatal(e) =>
    stage.makeNewStageAttempt(partitionsToCompute.size)
    listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
    abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}", Some(e))
    runningStages -= stage
    return
}

stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

// TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
// Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
// the serialized copy of the RDD and for each task we will deserialize it, which means each
// task gets a different copy of the RDD. This provides stronger isolation between tasks that
// might modify state of objects referenced in their closures. This is necessary in Hadoop
// where the JobConf/Configuration object is not thread-safe.
var taskBinary: Broadcast[Array[Byte]] = null
try {
  // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
  // For ResultTask, serialize and broadcast (rdd, func).
  val taskBinaryBytes: Array[Byte] = stage match {
    case stage: ShuffleMapStage =>
      closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array()
    case stage: ResultStage =>
      closureSerializer.serialize((stage.rdd, stage.resultOfJob.get.func): AnyRef).array()
  }

  taskBinary = sc.broadcast(taskBinaryBytes)
} catch {
  // In the case of a failure during serialization, abort the stage.
  case e: NotSerializableException =>
    abortStage(stage, "Task not serializable: " + e.toString, Some(e))
    runningStages -= stage

    // Abort execution
    return
  case NonFatal(e) =>
    abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}", Some(e))
    runningStages -= stage
    return
}

val tasks: Seq[Task[_]] = try {
  stage match {
    case stage: ShuffleMapStage =>
      partitionsToCompute.map { id =>
        val locs = taskIdToLocations(id)
        val part = stage.rdd.partitions(id)
        new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
          taskBinary, part, locs, stage.internalAccumulators)
      }

    case stage: ResultStage =>
      val job = stage.resultOfJob.get
      partitionsToCompute.map { id =>
        val p: Int = job.partitions(id)
        val part = stage.rdd.partitions(p)
        val locs = taskIdToLocations(id)
        new ResultTask(stage.id, stage.latestInfo.attemptId,
          taskBinary, part, locs, id, stage.internalAccumulators)
      }
  }
} catch {
  case NonFatal(e) =>
    abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}", Some(e))
    runningStages -= stage
    return
}

if (tasks.size > 0) {
  logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
  stage.pendingTasks ++= tasks
  logDebug("New pending tasks: " + stage.pendingTasks)
  taskScheduler.submitTasks(new TaskSet(
    tasks.toArray, stage.id, stage.latestInfo.attemptId, stage.firstJobId, properties))
  stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
} else {
  // Because we posted SparkListenerStageSubmitted earlier, we should mark
  // the stage as completed here in case there are no tasks to run
  markStageAsFinished(stage, None)

  val debugString = stage match {
    case stage: ShuffleMapStage =>
      s"Stage ${stage} is actually done; " +
        s"(available: ${stage.isAvailable}," +
        s"available outputs: ${stage.numAvailableOutputs}," +
        s"partitions: ${stage.numPartitions})"
    case stage : ResultStage =>
      s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
  }
  logDebug(debugString)
}
}
~~~

