05 November 2015

Task

Spark has two kinds of Task:
1. ShuffleMapTask
2. ResultTask

A Task is the unit of execution on a Spark Executor, sitting at the interface between the DAGScheduler and the TaskScheduler. The DAGScheduler wraps each partition of each stage of the DAG into a task, then submits each stage's tasks to the TaskScheduler as a TaskSet.

private[spark] abstract class Task[T](
    val stageId: Int,  // the stage this task belongs to
    val stageAttemptId: Int,
    val partitionId: Int,  // index of the partition within the RDD
    internalAccumulators: Seq[Accumulator[Long]]) extends Serializable
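The scheduler drives every task through a single entry point, run, while each subclass supplies the actual work in runTask. A minimal sketch of this contract, simplified from the Spark 1.5 source (createTaskContext is a hypothetical stand-in for the TaskContextImpl setup; metrics and kill handling are omitted):

  // Called by the executor; sets up a TaskContext and delegates to runTask.
  final def run(taskAttemptId: Long, attemptNumber: Int): T = {
    val context = createTaskContext(taskAttemptId, attemptNumber)  // hypothetical helper
    TaskContext.setTaskContext(context)
    try {
      runTask(context)  // ShuffleMapTask or ResultTask does the real work here
    } finally {
      TaskContext.unset()
    }
  }

  // Implemented by each concrete task: ShuffleMapTask returns a MapStatus,
  // ResultTask returns the result of the user function.
  def runTask(context: TaskContext): T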

Because a task can only run on a worker node that has the dependencies registered with the SparkContext (the application's JARs and files), the Task companion object provides methods to serialize and deserialize those dependencies along with the task, writing them into and reading them back out of the task's byte stream.

private[spark] object Task {
  /**
   * Serialize a task and the current app dependencies (files and JARs added to the SparkContext)
   */
  def serializeWithDependencies(
      task: Task[_],
      currentFiles: HashMap[String, Long],
      currentJars: HashMap[String, Long],
      serializer: SerializerInstance): ByteBuffer = {
    // body elided; see the sketch below
  }

  /**
   * Deserialize the dependency lists written by serializeWithDependencies and return the rest
   * of the buffer as the still-serialized task, so the caller can update its class loaders
   * before deserializing the task itself.
   */
  def deserializeWithDependencies(serializedTask: ByteBuffer)
    : (HashMap[String, Long], HashMap[String, Long], ByteBuffer)
}
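The elided body of serializeWithDependencies is essentially a header of (name, timestamp) pairs for the files and JARs, followed by the serialized task bytes. A rough sketch of the layout, following the Spark 1.5 source loosely:

  // Sketch: write the dependency header, then the task itself.
  val out = new ByteArrayOutputStream(4096)
  val dataOut = new DataOutputStream(out)

  // Files added through SparkContext.addFile: count, then (name, timestamp) pairs.
  dataOut.writeInt(currentFiles.size)
  for ((name, timestamp) <- currentFiles) {
    dataOut.writeUTF(name)
    dataOut.writeLong(timestamp)
  }

  // JARs added through SparkContext.addJar: same layout.
  dataOut.writeInt(currentJars.size)
  for ((name, timestamp) <- currentJars) {
    dataOut.writeUTF(name)
    dataOut.writeLong(timestamp)
  }

  // Finally the task bytes; deserializeWithDependencies reads the header back
  // and returns the remainder of the buffer as the still-serialized task.
  dataOut.flush()
  out.write(serializer.serialize(task).array())
  ByteBuffer.wrap(out.toByteArray)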

ShuffleMapTask

Corresponds to a ShuffleMap stage; its output serves as the input of other stages.

/**
 * A ShuffleMapTask divides the elements of an RDD into multiple buckets (based on a partitioner
 * specified in the ShuffleDependency).
 *
 * See [[org.apache.spark.scheduler.Task]] for more information.
 *
 * @param stageId id of the stage this task belongs to
 * @param taskBinary broadcast version of the RDD and the ShuffleDependency. Once deserialized,
 *                   the type should be (RDD[_], ShuffleDependency[_, _, _]).
 * @param partition partition of the RDD this task is associated with
 * @param locs preferred task execution locations for locality scheduling
 */
private[spark] class ShuffleMapTask(
    stageId: Int,
    stageAttemptId: Int,
    taskBinary: Broadcast[Array[Byte]],
    partition: Partition,
    @transient private var locs: Seq[TaskLocation],
    internalAccumulators: Seq[Accumulator[Long]])
  extends Task[MapStatus](stageId, stageAttemptId, partition.index, internalAccumulators)
  with Logging {
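Its runTask deserializes the broadcast (RDD, ShuffleDependency) pair, computes the partition, and feeds the records through a shuffle writer. Simplified from the Spark 1.5 source (timing, metrics and error handling omitted):

  override def runTask(context: TaskContext): MapStatus = {
    // Recover the RDD and the shuffle dependency from the broadcast task binary.
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

    // Compute this partition and let the shuffle writer bucket the records
    // for the downstream stage.
    val writer = SparkEnv.get.shuffleManager
      .getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
    writer.write(rdd.iterator(partition, context)
      .asInstanceOf[Iterator[_ <: Product2[Any, Any]]])

    // The returned MapStatus tells the driver where each output bucket lives.
    writer.stop(success = true).get
  }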

ResultTask

Corresponds to a Result stage and sends the computed result directly back to the driver.

/**
 * A task that sends back the output to the driver application.
 *
 * See [[Task]] for more information.
 *
 * @param stageId id of the stage this task belongs to
 * @param taskBinary broadcasted version of the serialized RDD and the function to apply on each
 *                   partition of the given RDD. Once deserialized, the type should be
 *                   (RDD[T], (TaskContext, Iterator[T]) => U).
 * @param partition partition of the RDD this task is associated with
 * @param locs preferred task execution locations for locality scheduling
 * @param outputId index of the task in this job (a job can launch tasks on only a subset of the
 *                 input RDD's partitions).
 */
private[spark] class ResultTask[T, U](
    stageId: Int,
    stageAttemptId: Int,
    taskBinary: Broadcast[Array[Byte]],
    partition: Partition,
    @transient locs: Seq[TaskLocation],
    val outputId: Int,
    internalAccumulators: Seq[Accumulator[Long]])
  extends Task[U](stageId, stageAttemptId, partition.index, internalAccumulators)
  with Serializable {
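Its runTask is simpler: deserialize the broadcast (RDD, function) pair and apply the function to the partition's iterator, returning the value to the driver. Simplified from the Spark 1.5 source:

  override def runTask(context: TaskContext): U = {
    // Recover the RDD and the user function from the broadcast task binary.
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

    // The return value is shipped back to the driver as the task result.
    func(context, rdd.iterator(partition, context))
  }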

TaskSet

A TaskSet is the data structure that wraps all the tasks of one stage, so that they can be conveniently submitted to the TaskScheduler as a unit.

/**
 * A set of tasks submitted together to the low-level TaskScheduler, usually representing
 * missing partitions of a particular stage.
 */
private[spark] class TaskSet(
    val tasks: Array[Task[_]],
    val stageId: Int,
    val stageAttemptId: Int,
    val priority: Int,
    val properties: Properties) {
  val id: String = stageId + "." + stageAttemptId

  override def toString: String = "TaskSet " + id
}
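To see how these pieces fit together, here is a hypothetical illustration of what DAGScheduler.submitMissingTasks does for a shuffle map stage: build one task per missing partition and submit them as a single TaskSet. The names missingPartitions, taskBinary, locations, accumulators and props are stand-ins for state the real DAGScheduler maintains:

  // One ShuffleMapTask per missing partition of the stage.
  val tasks: Array[Task[_]] = missingPartitions.map { id =>
    new ShuffleMapTask(stage.id, stage.attemptId, taskBinary,
      rdd.partitions(id), locations(id), accumulators)
  }.toArray

  // One TaskSet per stage attempt; the priority is the stage's first job id,
  // which the FIFO scheduler uses for ordering.
  taskScheduler.submitTasks(
    new TaskSet(tasks, stage.id, stage.attemptId, jobId, props))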

Executor registration with the Driver

The Driver sends a LaunchTask message, which the Executor receives and handles with launchTask. Before this can happen, however, the Executor must have registered with the Driver: an unregistered Executor will not process a LaunchTask command even if it receives one, as the sketch below shows. So we first need to understand how an Executor registers itself on the Driver side.
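The guard lives in CoarseGrainedExecutorBackend's message handler: if the executor field is still null (registration never completed), a LaunchTask message is fatal. Roughly, following the Spark 1.5 source:

  case LaunchTask(data) =>
    if (executor == null) {
      // Registration with the driver never completed, so there is no
      // Executor instance to run the task on.
      logError("Received LaunchTask command but executor was null")
      System.exit(1)
    } else {
      val taskDesc = ser.deserialize[TaskDescription](data.value)
      executor.launchTask(this, taskId = taskDesc.taskId,
        attemptNumber = taskDesc.attemptNumber, taskDesc.name,
        taskDesc.serializedTask)
    }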

Application registration

Executor registration happens as part of Application registration. Taking Standalone mode as an example:
1. SparkContext creates a schedulerBackend and a taskScheduler; the schedulerBackend lives as a member of the TaskScheduler object
2. When the TaskScheduler object's start method is called, it in turn calls backend.start()
3. backend.start() starts an AppClient, one of whose parameters, an ApplicationDescription, wraps the command that launches CoarseGrainedExecutorBackend
4. AppClient internally starts a ClientActor; once started, the ClientActor tries to register an Application by sending the Master the message actor ! RegisterApplication(appDescription)

AppClient submits the Application to the Master
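The ClientActor's registration attempt and the Master's response look roughly like the following, simplified from the Akka-based AppClient and Master sources (retry logic and multi-master failover are omitted):

  // AppClient side: send RegisterApplication to every known Master URL.
  def tryRegisterAllMasters(): Unit = {
    for (masterAkkaUrl <- masterAkkaUrls) {
      logInfo("Connecting to master " + masterAkkaUrl + "...")
      val actor = context.actorSelection(masterAkkaUrl)
      actor ! RegisterApplication(appDescription)
    }
  }

  // Master side: record the application, acknowledge it, and schedule
  // executors for it on the workers.
  case RegisterApplication(description) =>
    val app = createApplication(description, sender)
    registerApplication(app)
    persistenceEngine.addApplication(app)
    sender ! RegisteredApplication(app.id, masterUrl)
    schedule()  // allocate workers, which launch CoarseGrainedExecutorBackend

Once a Worker launches CoarseGrainedExecutorBackend, the backend registers itself back with the driver's scheduler, and only from that point on will LaunchTask messages be processed.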

