tasksetmanager
|
|
TaskSetManager
负责某个 TaskSet 的调度,对该 TaskSet 的所有 task 进行跟踪,如果有失败的 task,会负责重试(重试有上限),并且通过 delay scheduling(可以想想这个怎么实现的?) 实现 locality locality-aware scheduling. 主要的接口有 resourceOffer
– 用于判断一个 TaskSet 中的 task 是否需要运行到某个 node 上,statusUpdate
– 用于跟踪 task 的状态变化。不是线程安全的。
dequeeTaskFromList(execId: String, list: ArrayBuffer[Int]): Option[Int]
负责从对应的 list 中删除返回一个 pending Task,如果没有合适的 Task 就返回 None,该 function 中会将那些已经运行的 task 进行删除,会跳过所有的不能在对应 execId 上运行的 task(通过 executorIsBlacklisted(execId, index) 进行判断)
|
|
dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value): Option[(Int, TaskLocality.Value, Boolean)])
删除并返回一个可执行的 task,只返回符合 locality 约束的 task。首先逐个 locality 进行查找,如果有符合的 task 直接返回,否则返回一个合适的 推测执行的 task
executorIsBlacklisted(execId: String, taskId: Int): Boolean
进行判断某个 execId 上能否运行对应的 task(如果之前这个 taskId 在这个 execId 上运行失败了,而且当前时间和之间失败的时间差小于阈值 EXECUTOR_TASK_BLACKLIST_TIMEOUT
)
|
|
dequeueSpeculativeTask(execId: String, host: String, locality: TaskLocality.Value): Option[(Int, TaskLocality.Value)]
负责删除并返回一个 推测执行的 task,如果没有合适的就返回 None。逻辑就是遍历所有 task,然后看 task 是否能运行在特定的 TaskLocality 上,如果可以就返回,并且将该 task 从推测执行的 task list 中删除。TaskLocality 的顺序为 {PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY}
如何实施推测执行,逻辑在 checkSpeculatableTasks
函数中,
- 如果该 TaskSetManager 变为 zoombie 了,或者只包含一个 task,就不推测执行(为什么一个 task 就不推测执行)
- 如果完成的 task 数大于等于
minFinishedForSpeculation = (SPECULATION_QUANTILE * numTasks).floor.toInt
(其中SPECULATION_QUANTILE
默认 0.75,可以通过spark.speculation.quantile
设置)且有 task 成功执行过,则执行下面的步骤 - 将所有执行成功的 task 的执行时间进行排序,取第
val medianDuration = durations(min((0.5 * tasksSuccessful).round.toInt, durations.size - 1)) val threshold = max(SPECULATION_MULTIPLIER * medianDuration, 100)
threshold 作为临界值,对每个 task 进行检测。 - 如果该 task 还没有运行成功,运行时间超过
threshold
,只有一个实例在跑,而且没有推测执行过,就进行推测执行 - 推测执行保证同一个 task 的不同实例不会调度到同一台主机上,且不会调度到以及被加进黑名单的主机中
resourceOffer(execId: String,host: String,maxLocality:TaskLocality.TaskLocality): Option[TaskDescription]
负责资源的实际分配,如果当前 taskSetManager 不是 zoombie 状态才进行处理。首先找出当前时间可以被调度到的最高 Locality,然后使用 dequeuTask
删除并找到一个符合条件的 task,如果找到就更新相关的状态数据(包括,更新现在正在运行的 task 有哪些,更新当前的 locality,然后将 task 所需要的文件等序列化,附加到一个 TaskDescription 结构中并且返回),并且通知 DAGScheduler 该 task 已经开始运行。如果序列化有问题,则直接抛异常。
getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality
获取当前时间对应的一个 TaskLocality, 这里面会有时间等待(等待的时间就是每个 TaskLocality 的等待时间,默认 3s,可以配置)
handleTaskGettingResult
主要进行状态标记,然后通知 DAGSchedulercanFetchMoreResults(size: Long): Boolean
检测是否还能 fetch size 字节大小已经序列化后的数据,如果不能,就将该 taskSetManger 标记为 zoombie,并且通知 DAGScheduler 该 TaskSet 为失败
handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit
负责将一个 task 标记为成功,并且如果当前 TaskSet 所有 task 都运行完成,就标记为 zoombie 状态,并且通知 DAGScheduler。
handleFailedTask(tid: Long, state: TaskState, reason: TaskEndReason)
将task 标记为失败,并且重新添加到 pendingTask 队列中,然后通知 DAGScheduler。根据失败的信息不同,做不同的处理:
- FetchFailure:直接想当前的
tasksetManager
标记为 zoombie,然后做一定的清理工作,就把当前的tasksetManager
标记为成功 - ExceptionFailure:如果是
NotSerializableException
就直接退出,否则会打印相应异常,然后进行重试 - 其他的异常,打印日志
executorLost(execId: String, host: String, reason: ExecutorLossReason)
负责处理 executorLost 的情况,由 taskSchedulerImp 调用。逻辑如下
- 如果是
ShufflleMapTask
且没有开启externalShuffleServiceEnabled
就进行如下操作:如果 task 以及成功了,就将这些 task 标记为失败,且进行重试(因为后续的 task 需要从这些 task 中获取数据) - 如果是其他的,就直接调用
handleFailedTask
进行处理,然后重新计算locality
getLocalityWait(level: TaskLocality.TaskLocality): Long =
用于获取 locality, 代码如下
|
|
疑问:
- 推测执行的时候,为什么 TaskSet 只有 1 个 task 的话就不推测执行
- 推测执行对每个 task 只会进行一次?
a. 可以被 推测执行多次,只执行一次的逻辑是使用speculatableTasks
做检测的,当运行一个 推测执行的 task 后,该 task 就会从speculatableTasks
进行删除,然后就可以进行推测执行了。严格的说法是,只运行同一个 task 的一个实例在“排队等待被推测执行” - 每个 task 的
preferredLocations
怎么得到的?根据什么规则?每一个的含义又是什么,总共有{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY}
这些可选项 - 每个 TaskSet 的所有 task 都是一样的 locality?
- 推测执行的时候,如果最后执行成功多个 task,会对结果有影响吗?怎么规避这种影响的