本文属于自己看源码后的记录

与不同的后端调度器一起,进行 task 的调度(task 是 DAGScheduler 中划分的 Stage 中的具体任务),后端调度器包括 LocalBackendSparkDeploySchedulerBackendMesosSchedulerBackendYarnClientSchedulerBackendYarnClusterSchedulerBackendSimrSchedulerBackend

整个 TaskSchedulerImpl 比较简单,复杂的地方在于和各种 后端调度器联合,以及具体 TasksetManager 进行联合

1
2
3
4
TaskSchedulerImpl(
val sc: SparkContext, // SparkContext
val maxTaskFailures: Int, //每个 task 允许失败的最大次数
isLocal: Boolean = false) //是否本地运行

对于没有传入 maxTaskFailures 参数的,使用 sc.conf.getInt("spark.task.maxFailures", 4),可以使用配置参数,默认为 4 次。

TaskSchedulerImple 的入口是 submitTasks() ,由 DAGScheduler 划分好 Stage 之后进行调用

submitTasks 中会进行资源的检测(是否申请到资源,标志位 hasLaunchedTask 由后面的 resourceOffers 进行设置),每 STARVATION_TIMEOUT_MS 检测一次,直到 hasLaunchedTask 为 True 为止,STARVATION_TIMEOUT_MS 默认 15s,可以通过 spark.starvation.timeout 进行配置,该函数会将当前的 taskSetManger 添加到整个 ScheduleablePool 中,然后通过 backend.reviveOffer 将 task 分配到 executor 上去

另外在 TaskSchedulerImpl 中会启动 推测执行的后台线程,默认 100ms 检测一次,可以通过 spark.speculation.interval 进行配置

每个 Task 默认使用的 CPU 数由 CPUS_PER_TASK 进行控制,默认为 1,可以通过 spark.task.cpus 进行设置

private val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, TaskSetManager]] 根据 StageId 和 Attempt 来确定 taskSets

private val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO") task 调度的模式,默认为 FIFO

private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this) --- Runs a thread pool that deserializes and remotely fetches (if necessary) task results

executorsByHost = new HashMap[String, HashSet[String]] 保存每个 Host 中运行的所有 executor

hostsByRack = new HashMap[String, HashSet[String]] 不同 Rack 对于的 host

executorIdToHost = new HashMap[String, String] 保存 executorId 到 host 的对于关系

另外还有 backend: SchedulerBackenddagScheduler: DAGSchedulermapOutputTracker = SparkEnv.get.mapOutputTracker

initialize() 主要设置 backend 以及 schedulerPool,
start() 启动 backend, 然后按需启动 推测执行的线程

Yarn-cluster 模式下的 backend 为 org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend,其中 backend 在 SparkContext 中初始化(根据运行的模式不同,初始化不同的 backend)

resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] 用于分配资源,将 task 分配到指定的 executor 上,传入的参数是所有可以分配 task 的 executor,流程如下:

  1. 首先更新 executorIdToHost,executorIdToRunningTaskIds 以及 hostsByRack,这些信息在分配 task 的时候会用到
  2. 将所有的 executor 进行打乱,防止全部 task 都分配到一个 executor 上(这里和下面的策略关联,下面使用 round-robin 进行分配),那么分配策略还有其他的吗?不同策略之间的对比是怎样的
  3. 根据具体的策略(FIFO 或者 FAIR 等)将所有 task 进行排序(如果本次有新加的 executor,那么所有的 taskset 都需要重新计算 locality)根据什么规则进行 locality 的重新计算?具体在 TaskSetManager 实现
  4. 通过 resourceOfferSingleTaskSet 进行分配

resourceOfferSingleTaskSet(taskSet: TaskSetManager, maxLocality: TaskLocality, shuffledOffers: Seq[WorkerOffer], availableCpus: Array[Int],tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {} 给一个 TaskSet 分配资源,具体逻辑如下:

枚举每个 executor,如果当前 executor 的可用 cpu 大于单个 task 需要的 cpu
则调用 tasksetManager.resourceOffer() 进行具体的 task 分配,并且更新相应的信息
包括 taskIdToTaskSetManager,taskIdToExecutorId, executorIdToRunningTaskIds,executorsByHost 等。然后直接返回

cancelTasks(stageId: Int, interruptThread: Boolean): Unit 取消某个 stage 的所有 task。分两种情况:

  • task set manager 已经创建,并且有部分 task 已经被调度,这种情况下,给对于的 executor 发送 kill 指令,然后终止整个 stage
  • task set manager 已经创建,但是还没有 task 被调度,直接终止整个 stage 即可

taskSetFinished(manager: TaskSetManager): Unit
整个 task set manager 的 task 都执行完成了,主要做一些清理公族藕,然后将 task set manager 删除

statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) 进行 task 的状态更新

  • 如果 task 的状态为 LOST
  • 这个状态仅仅在 Mesos 中使用,每个 executor 对于一个 task,所以将这个 executor 标记为 Failure
  • 如果 task 的状态为 {FINISHED, FAILED, KILLED, LOST} 中的任何一种
  • 首先清理 taskset scheduler 的一些状态信息(比如需要跟踪的 task 等),并根据状态信息(FINISHED 是一类,其他三种为一类)更新相应的状态信息
    如果检测到有失败的 executor(只在 mesos 模式下会产生),会给 dagscheduler 发送 executorLost 信号,并且调用 backend 的 makeOffer 重新调度 task

executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)], blockManagerId: BlockManagerId): Boolean) = {}
更新正在运行的 task 的状态, 并且充当 master 和 BlockManager 的心跳,如果 master 认识当前 blockManager,则返回 true,否则返回 false – 表示 blockManager 需要重新注册

  • taskMetrics: Array[(Long, TaskMetrics)] //taskId -> TaskMetrics
  • 根据每个 taskId 获取对于的信息(taskId, taskSetMgr.stageId, taskSetMgr.taskSet.stageAttemptId, metrics) 发送给 dagScheduler, 其中 metric 为 taskMetrics 的第二维信息

handleTaskGettingResult() & handleSuccessfulTask() 直接调用对应的 taskSetManager 对应方法

handleFailedTask(taskSetManager: TaskSetManager, tid: Long, taskState: TaskState, reason: TaskEndReason): Unit = synchronized {}
如果对应的 taskSetManger 不是僵尸状态,而且 taskState 不为 KILLED,就重新申请资源

executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}

  • 如果 executorIdToRunningTasksIds 包含当前上报的 executor(失败的 executor 有 task 正在运行), 则将当前 executor 从所有的数据结构中进行删除,然后通知 DAGScheduler 并且重新申请资源
  • 否则只进行一些数据结构的处理(包括从所有的数据结构中进行删除,然后打印 logger 等)

Comments