TaskScheduler
本文属于自己看源码后的记录
与不同的后端调度器一起,进行 task 的调度(task 是 DAGScheduler 中划分的 Stage 中的具体任务),后端调度器包括 LocalBackend
, SparkDeploySchedulerBackend
,MesosSchedulerBackend
,YarnClientSchedulerBackend
, YarnClusterSchedulerBackend
,SimrSchedulerBackend
等
整个 TaskSchedulerImpl 比较简单,复杂的地方在于和各种 后端调度器联合,以及具体 TasksetManager
进行联合
|
|
对于没有传入 maxTaskFailures
参数的,使用 sc.conf.getInt("spark.task.maxFailures", 4)
,可以使用配置参数,默认为 4 次。
TaskSchedulerImple
的入口是 submitTasks()
,由 DAGScheduler
划分好 Stage 之后进行调用
在 submitTasks
中会进行资源的检测(是否申请到资源,标志位 hasLaunchedTask
由后面的 resourceOffers
进行设置),每 STARVATION_TIMEOUT_MS
检测一次,直到 hasLaunchedTask
为 True 为止,STARVATION_TIMEOUT_MS
默认 15s,可以通过 spark.starvation.timeout
进行配置,该函数会将当前的 taskSetManger 添加到整个 ScheduleablePool
中,然后通过 backend.reviveOffer
将 task 分配到 executor 上去
另外在 TaskSchedulerImpl
中会启动 推测执行的后台线程,默认 100ms 检测一次,可以通过 spark.speculation.interval
进行配置
每个 Task 默认使用的 CPU 数由 CPUS_PER_TASK
进行控制,默认为 1,可以通过 spark.task.cpus
进行设置
private val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, TaskSetManager]]
根据 StageId 和 Attempt 来确定 taskSets
private val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO")
task 调度的模式,默认为 FIFO
private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this) --- Runs a thread pool that deserializes and remotely fetches (if necessary) task results
executorsByHost = new HashMap[String, HashSet[String]]
保存每个 Host 中运行的所有 executor
hostsByRack = new HashMap[String, HashSet[String]]
不同 Rack 对于的 host
executorIdToHost = new HashMap[String, String]
保存 executorId 到 host 的对于关系
另外还有 backend: SchedulerBackend
, dagScheduler: DAGScheduler
,mapOutputTracker = SparkEnv.get.mapOutputTracker
,
initialize()
主要设置 backend 以及 schedulerPool,start()
启动 backend, 然后按需启动 推测执行的线程
Yarn-cluster 模式下的 backend 为 org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend
,其中 backend 在 SparkContext
中初始化(根据运行的模式不同,初始化不同的 backend)
resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]]
用于分配资源,将 task 分配到指定的 executor 上,传入的参数是所有可以分配 task 的 executor,流程如下:
- 首先更新 executorIdToHost,executorIdToRunningTaskIds 以及 hostsByRack,这些信息在分配 task 的时候会用到
- 将所有的 executor 进行打乱,防止全部 task 都分配到一个 executor 上(这里和下面的策略关联,下面使用 round-robin 进行分配),那么分配策略还有其他的吗?不同策略之间的对比是怎样的
- 根据具体的策略(FIFO 或者 FAIR 等)将所有 task 进行排序(如果本次有新加的 executor,那么所有的 taskset 都需要重新计算 locality)根据什么规则进行 locality 的重新计算?具体在 TaskSetManager 实现
- 通过 resourceOfferSingleTaskSet 进行分配
resourceOfferSingleTaskSet(taskSet: TaskSetManager, maxLocality: TaskLocality, shuffledOffers: Seq[WorkerOffer], availableCpus: Array[Int],tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {}
给一个 TaskSet 分配资源,具体逻辑如下:
枚举每个 executor,如果当前 executor 的可用 cpu 大于单个 task 需要的 cpu
则调用 tasksetManager.resourceOffer() 进行具体的 task 分配,并且更新相应的信息
包括 taskIdToTaskSetManager,taskIdToExecutorId, executorIdToRunningTaskIds,executorsByHost 等。然后直接返回
cancelTasks(stageId: Int, interruptThread: Boolean): Unit
取消某个 stage 的所有 task。分两种情况:
- task set manager 已经创建,并且有部分 task 已经被调度,这种情况下,给对于的 executor 发送 kill 指令,然后终止整个 stage
- task set manager 已经创建,但是还没有 task 被调度,直接终止整个 stage 即可
taskSetFinished(manager: TaskSetManager): Unit
整个 task set manager 的 task 都执行完成了,主要做一些清理公族藕,然后将 task set manager 删除
statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer)
进行 task 的状态更新
- 如果 task 的状态为 LOST
- 这个状态仅仅在 Mesos 中使用,每个 executor 对于一个 task,所以将这个 executor 标记为 Failure
- 如果 task 的状态为 {FINISHED, FAILED, KILLED, LOST} 中的任何一种
- 首先清理 taskset scheduler 的一些状态信息(比如需要跟踪的 task 等),并根据状态信息(FINISHED 是一类,其他三种为一类)更新相应的状态信息
如果检测到有失败的 executor(只在 mesos 模式下会产生),会给 dagscheduler 发送 executorLost 信号,并且调用 backend 的 makeOffer 重新调度 task
executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)], blockManagerId: BlockManagerId): Boolean) = {}
更新正在运行的 task 的状态, 并且充当 master 和 BlockManager 的心跳,如果 master 认识当前 blockManager,则返回 true,否则返回 false – 表示 blockManager 需要重新注册
- taskMetrics: Array[(Long, TaskMetrics)] //taskId -> TaskMetrics
- 根据每个 taskId 获取对于的信息(taskId, taskSetMgr.stageId, taskSetMgr.taskSet.stageAttemptId, metrics) 发送给 dagScheduler, 其中 metric 为 taskMetrics 的第二维信息
handleTaskGettingResult() & handleSuccessfulTask()
直接调用对应的 taskSetManager 对应方法
handleFailedTask(taskSetManager: TaskSetManager, tid: Long, taskState: TaskState, reason: TaskEndReason): Unit = synchronized {}
如果对应的 taskSetManger 不是僵尸状态,而且 taskState 不为 KILLED,就重新申请资源
executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
- 如果
executorIdToRunningTasksIds
包含当前上报的 executor(失败的 executor 有 task 正在运行), 则将当前 executor 从所有的数据结构中进行删除,然后通知DAGScheduler
并且重新申请资源 - 否则只进行一些数据结构的处理(包括从所有的数据结构中进行删除,然后打印 logger 等)