调度器结构
type Scheduler struct {
// It is expected that changes made via SchedulerCache will be observed
// by NodeLister and Algorithm.
SchedulerCache internalcache.Cache
Algorithm core.ScheduleAlgorithm
// PodConditionUpdater is used only in case of scheduling errors. If we succeed
// with scheduling, PodScheduled condition will be updated in apiserver in /bind
// handler so that binding and setting PodCondition it is atomic.
podConditionUpdater podConditionUpdater
// PodPreemptor is used to evict pods and update 'NominatedNode' field of
// the preemptor pod.
podPreemptor podPreemptor
// NextPod should be a function that blocks until the next pod
// is available. We don't use a channel for this, because scheduling
// a pod may take some amount of time and we don't want pods to get
// stale while they sit in a channel.
NextPod func() *framework.PodInfo
// Error is called if there is an error. It is passed the pod in
// question, and the error
Error func(*framework.PodInfo, error)
// Close this to shut down the scheduler.
StopEverything <-chan struct{}
// VolumeBinder handles PVC/PV binding for the pod.
VolumeBinder scheduling.SchedulerVolumeBinder
// Disable pod preemption or not.
DisablePreemption bool
// SchedulingQueue holds pods to be scheduled
SchedulingQueue internalqueue.SchedulingQueue
// Profiles are the scheduling profiles.
Profiles profile.Map
scheduledPodsHasSynced func() bool
}
关键数据结构
SchedulingQueue
等待调度的 Pod 队列
调度队列是一个优先队列,其关键数据结构如下
type PriorityQueue struct {
// activeQ is heap structure that scheduler actively looks at to find pods to
// schedule. Head of heap is the highest priority pod.
activeQ *heap.Heap
// podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff
// are popped from this heap before the scheduler looks at activeQ
podBackoffQ *heap.Heap
// unschedulableQ holds pods that have been tried and determined unschedulable.
unschedulableQ *UnschedulablePodsMap
// nominatedPods is a structures that stores pods which are nominated to run
// on nodes.
nominatedPods *nominatedPodMap
}
共有三个结构用于保存不同状态的 Pod,Pending Pods 为以下三个队列数量之和
activeQ
核心调度队列,要调度的 Pod 只会从这个队列 Pop 数据,数据结构为大根堆,堆顶部为优先级最高的 Pod,新创建的 Pod 会先加入到此队列podBackoffQ
unschedulableQ
尝试过调度但是未能找到合适的节点的 Pod
backoff 队列和 unschedulable 队列有什么区别,不都是调度失败 Pod 保存的队列吗?如果有 move request 则进入 backoffQ 否则进入 unschedulableQ
启动 Run(ctx)
kube-scheduler 一旦获取 leader,便开始执行 Run
方法
func (sched *Scheduler) Run(ctx context.Context) {
if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) {
return
}
sched.SchedulingQueue.Run()
wait.UntilWithContext(ctx, sched.scheduleOne, 0)
sched.SchedulingQueue.Close()
}
SchedulingQueue 调度队列
调度队列后台会启动两个 goroutine,分别将 podBackoffQ
unschedulableQ
两个队列中的 Pod 根据是否满足条件重新移动到 activeQ
重新参与一次调度
func (p *PriorityQueue) Run() {
// 每秒从 backoff 队列中循环一遍所有 Pod
// 到达 backoff 时间的 Pod pop 出来加入到 activeQ 中
go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
// 每 30s 从 unscheduableQ 中
go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop)
}
集群有一些事件发生时会将 unscheduableQ 中的 Pod 全部放到 activeQ 或 backoffQ 中重新参与一遍调度,使用队列的 MoveAllToActiveOrBackoffQueue
方法,有如下事件会触发:
- Node 新增和更新
- PV、PVC 新增和更新
- Service 任何变化
- StorageClass 新增
- Pod 删除
如何决定一个未能成功调度的 Pod 放入到哪个队列中
// If a move request has been received, move it to the BackoffQ, otherwise move
// it to unschedulableQ.
if p.moveRequestCycle >= podSchedulingCycle {
if err := p.podBackoffQ.Add(pInfo); err != nil {
return fmt.Errorf("error adding pod %v to the backoff queue: %v", pod.Name, err)
}
metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", ScheduleAttemptFailure).Inc()
} else {
p.unschedulableQ.addOrUpdate(pInfo)
metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc()
}
scheduleOne
调度一个 Pod
调度主要有两个步骤
- Filtering 筛选,筛选出一批满足要求的节点列表
- Scoring 打分,为每个节点打分选择最适合的节点,若多个相同最高分,则随机选择一个最高的
Pod 调度完整流程
-
从调度队列中 Pop 一个 Pod,若队列为空则阻塞
-
根据 Pod 获取该 Pod 指定调度器的 profile
-
调度 Schedule
sched.Algorithm.Schedule(ctx, prof, state, pod)
上图绿色部分
-
snapshot node
-
RunPreFilterPlugins
-
开始 Predicate:
findNodesThatFitPod
- 判断通过 framework 的 Filter: RunFilterPlugins,此处调度器启动了 16 个 worker 去对所有 node 并发 check 是否满足*
- 判断通过 extender 的 Filter
-
RunPreScorePlugins 打分前再次过滤一遍
-
打分
prioritizeNodes
- 若没有配置打分插件则直接为所有节点设置为默认 1 分
- *RunScorePlugins* 并汇总所有打分结果
- 若有 extender 则并发调用 webhook 的 Prioritize
-
选择节点
selectHost
-
选择节点源码,选择最高分者,若最高分相同则随机选择一个
func (g *genericScheduler) selectHost(nodeScoreList framework.NodeScoreList) (string, error) { if len(nodeScoreList) == 0 { return "", fmt.Errorf("empty priorityList") } maxScore := nodeScoreList[0].Score selected := nodeScoreList[0].Name cntOfMaxScore := 1 for _, ns := range nodeScoreList[1:] { if ns.Score > maxScore { maxScore = ns.Score selected = ns.Name cntOfMaxScore = 1 } else if ns.Score == maxScore { cntOfMaxScore++ if rand.Intn(cntOfMaxScore) == 0 { // Replace the candidate with probability of 1/cntOfMaxScore selected = ns.Name } } } return selected, nil }
-
-
至此,已经选择出一个唯一建议的调度节点
-
-
VolumeBind
sched.VolumeBinder.AssumePodVolumes
-
RunReservePlugins
-
assume
-
RunPermitPlugins
-
异步 Bind,上图中的黄色部分
- WaitOnPermit 若失败则 RunUnreservePlugins
- RunPreBindPlugins
- RunBindPlugins
- RunPostBindPlugins
调度失败的处理
在整个调度周期中若调度错误,会调用 recordSchedulingFailure 方法记录失败的调度,其中主要做两个事
- 调用注册到 sched 中的 Error 方法
- 更新 Pod 的 Status 字段的 PodScheduled Condition 为 False
其中调度失败的处理逻辑 MakeDefaultErrorFunc
如下
- 判断错误类型并做出相应的后续处理:节点未找到时从调度 cache 中清除该节点,其他错误只是打印日志
- 再次重试:
- 重新获取 Pod 信息查看 nodeName 有没有被指定,若已经直接直接 return
- 重新获取 Pod 信息查看 Pod 是否还存在,若已经被删除则直接 return
- 若确实未被指定 node,则重新加入到调度队列中(podBackoffQ/unschedulableQ)
func MakeDefaultErrorFunc(client clientset.Interface, podQueue internalqueue.SchedulingQueue, schedulerCache internalcache.Cache) func(*framework.PodInfo, error) {
return func(podInfo *framework.PodInfo, err error) {
pod := podInfo.Pod
// 不同错误信息处理,不是特别关键,忽略
podSchedulingCycle := podQueue.SchedulingCycle()
// Retry asynchronously.
// Note that this is extremely rudimentary and we need a more real error handling path.
go func() {
defer utilruntime.HandleCrash()
podID := types.NamespacedName{
Namespace: pod.Namespace,
Name: pod.Name,
}
// An unschedulable pod will be placed in the unschedulable queue.
// This ensures that if the pod is nominated to run on a node,
// scheduler takes the pod into account when running predicates for the node.
// Get the pod again; it may have changed/been scheduled already.
// 启动一个独立 goroutine 去 backoff 重试加入到调度队列(不是重试调度)
// 初始 backoff 时间为 100ms,每次重试时间 *2,最大为 1min
getBackoff := initialGetBackoff
for {
pod, err := client.CoreV1().Pods(podID.Namespace).Get(context.TODO(), podID.Name, metav1.GetOptions{})
if err == nil {
if len(pod.Spec.NodeName) == 0 {
podInfo.Pod = pod
if err := podQueue.AddUnschedulableIfNotPresent(podInfo, podSchedulingCycle); err != nil {
klog.Error(err)
}
}
break
}
if apierrors.IsNotFound(err) {
klog.Warningf("A pod %v no longer exists", podID)
return
}
klog.Errorf("Error getting pod %v for retry: %v; retrying...", podID, err)
if getBackoff = getBackoff * 2; getBackoff > maximalGetBackoff {
getBackoff = maximalGetBackoff
}
time.Sleep(getBackoff)
}
}()
}
}
为什么调度失败需要启动独立的 goroutine 循环 backoff 判断并加入到调度队列中?不是加入一次后调度队列去重试调度就可以了吗?
调度器扩展方式
调度器配置文件:
https://kubernetes.io/docs/reference/config-api/kube-scheduler-config.v1beta2/
Policy & Profile
- Scheduler Policy,用于配置过滤的断言 (Predicates) 和打分的优先级 (Priorities)
- Scheduler Profile,用于配置不同调度阶段的插件
Extender (Webhook 形式)
https://kubernetes.io/docs/reference/config-api/kube-scheduler-policy-config.v1/
community/scheduler_extender.md at master · kubernetes/community
配置文件中的 pilicy 字段下面可以指定 extenders,extender 就是在各个扩展点调用外部的 HTTP 服务
extender 的不足
- 通信成本:需要多一次 HTTP 调用
- 扩展点有限
- 可能有新增节点的需求
- 缓存无法共享
Framework
定义了多个扩展点,每个扩展点可以注册 plugin,使用定义的 interface 用代码实现编译为新的调度器程序,Pod 指定新的调度器名。
扩展点的 plugin 调用顺序如下:
- 扩展点没有配置 plugin:使用默认插件中的扩展
- 扩展点有配置 plugin:先调用默认插件的扩展,再调用配置中的扩展
- 默认插件的扩展始终被最先调用,然后按照
KubeSchedulerConfiguration
中扩展的激活enabled
顺序逐个调用扩展点的扩展 - 可以先禁用默认插件的扩展,然后在
enabled
列表中的某个位置激活默认插件的扩展,这种做法可以改变默认插件的扩展被调用时的顺序
调度器性能优化
percentageOfNodesToScore
参数,指定多少百分比的节点打分后就足够调度
若不指定,使用一个线性公式:在 100-节点集群取 50%,5000-节点集群取 10%,最低 5%