k8s scheduler调度器原理以及核心源码分析
k8s 调度器要解决哪些问题?
通俗点说,k8s整个调度的过程就是在回答两个问题:
1. 候选节点有哪些?
2. 候选节点中最适合的是哪个?
# 简介
在 Kubernetes 项目中,默认调度器的主要职责,就是为一个新创建出来的 Pod,寻找一个最合适的节点(Node)。
而这里“最合适”的含义,包括两层:
- 从集群所有的节点中,根据调度算法挑选出所有可以运行该 Pod 的节点;
- 从第一步的结果中,再根据调度算法挑选一个最符合条件的节点作为最终结果。
所以在具体的调度流程中,默认调度器会首先调用一组叫作 Predicate(预选) 的调度算法,来检查每个 Node。然后,再调用一组叫作 Priority(优选) 的调度算法,来给上一步得到的结果里的每个 Node 打分。最终的调度结果,就是得分最高的那个 Node。
# 调度流程
# 调度组件
下图是 kube-scheduler 的主要几大组件:
# Policy
Scheduler 的调度策略启动配置目前支持三种方式,配置文件 / 命令行参数 / ConfigMap。调度策略可以配置指定调度主流程中要用哪些过滤器 (Predicates)、打分器 (Priorities) 、外部扩展的调度器 (Extenders),以及最新支持的 SchedulerFramwork 的自定义扩展点 (Plugins)。
# Informer
Scheduler 在启动的时候通过 K8s 的 informer 机制以 List+Watch 从 kube-apiserver 获取调度需要的数据例如:Pods、Nodes、Persistant Volume(PV), Persistant Volume Claim(PVC) 等等,并将这些数据做一定的预处理作为调度器的的 Cache。
# Scheduler Pipeline
通过 Informer 将需要调度的 Pod 插入 Queue 中,Pipeline 会循环从 Queue Pop 等待调度的 Pod 放入 Pipeline 执行。
调度流水线 (Schedule Pipeline) 主要有三个阶段:Scheduler Thread,Wait Thread,Bind Thread。
Scheduler Thread 阶段: 从如上的架构图可以看到 Schduler Thread 会经历 Pre Filter -> Filter -> Post Filter-> Score -> Reserve,可以简单理解为 Filter -> Score -> Reserve(预定)。
- Filter 阶段用于选择符合 Pod Spec 描述的 Nodes;
- Score 阶段用于从 Filter 过后的 Nodes 进行打分和排序;
- Reserve 阶段将 Pod 跟排序后的最优 Node 的 NodeCache 中,表示这个 Pod 已经分配到这个 Node 上, 让下一个等待调度的 Pod 对这个 Node 进行 Filter 和 Score 的时候能看到刚才分配的 Pod。
Wait Thread 阶段: 这个阶段可以用来等待 Pod 关联的资源的 Ready 等待,例如等待 PVC 的 PV 创建成功等等;
Bind Thread 阶段: 用于将 Pod 和 Node 的关联持久化 Kube APIServer。
整个调度流水线只有在 Scheduler Thread 阶段是串行的一个 Pod 一个 Pod 的进行调度,在 Wait 和 Bind 阶段 Pod 都是异步并行执行。
# 源码分析
# 代码目录结构
kubernetes 版本: v1.14.4 (opens new window),scheduler的pkg部分核心代码目录结构如下:
scheduler
├── algorithm # 主要包含调度的算法
│ ├── predicates # 预选的策略
│ ├── priorities # 优选的策略
│ ├── scheduler_interface.go # ScheduleAlgorithm、SchedulerExtender接口定义
│ └── types.go # 使用到的type的定义
├── algorithmprovider
│ ├── defaults # 默认算法的初始化操作,包括预选和优选策略
│ ├── plugins.go
│ └── plugins_test.go
├── core
│ ├── extender.go #拓展调度相关配置
│ ├── generic_scheduler.go # genericScheduler,主要包含默认调度器的调度逻辑
├── equivalence
├── eventhandlers.go
├── factory
│ ├── factory.go # 主要包括NewConfigFactory、NewPodInformer,监听pod事件来更新调度队列
│ ├── plugins.go
├── internal
│ ├── cache # scheduler调度使用到的cache
│ └── queue # 调度使用到的队列,主要用来存储需要被调度的pod
├── metrics
│ └── metrics.go # 主要给prometheus使用
├── nodeinfo
│ ├── host_ports.go
│ ├── node_info.go
├── scheduler.go # pkg部分的Run入口(核心代码),主要包含Run、scheduleOne、schedule、preempt等函数
└── volumebinder
└── volume_binder.go
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 启动流程
# 1. Scheduler.Run (opens new window)
此部分代码位于pkg/scheduler/scheduler.go
此处为具体调度逻辑的入口。
// Run begins watching and scheduling. It waits for cache to be synced, then starts a goroutine and returns immediately.
func (sched *Scheduler) Run() {
if !sched.config.WaitForCacheSync() {
return
}
go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}
2
3
4
5
6
7
# 2. Scheduler.scheduleOne (opens new window)
此部分代码位于pkg/scheduler/scheduler.go
scheduleOne
主要为单个pod选择一个适合的节点,为调度逻辑的核心函数。
对单个pod进行调度的基本流程如下:
- 通过podQueue的待调度队列中弹出需要调度的pod。
- 通过具体的调度算法为该pod选出合适的节点,其中调度算法就包括预选和优选两步策略。
- 如果上述调度失败,则会尝试抢占机制,将优先级低的pod剔除,让优先级高的pod调度成功。
- 将该pod和选定的节点进行假性绑定,存入scheduler cache中,方便具体绑定操作可以异步进行。
- 实际执行绑定操作,将node的名字添加到pod的节点相关属性中。
完整代码如下:
// scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) scheduleOne() {
plugins := sched.config.PluginSet
// Remove all plugin context data at the beginning of a scheduling cycle.
if plugins.Data().Ctx != nil {
plugins.Data().Ctx.Reset()
}
//1.获取下一个POD
pod := sched.config.NextPod()
// pod could be nil when schedulerQueue is closed
if pod == nil {
return
}
if pod.DeletionTimestamp != nil {
sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
klog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
return
}
klog.V(3).Infof("Attempting to schedule pod: %v/%v", pod.Namespace, pod.Name)
// 同步尝试为 pod 找到合适的位置。(调度算法的核心)
start := time.Now()
scheduleResult, err := sched.schedule(pod)
if err != nil {
if fitError, ok := err.(*core.FitError); ok {
if !util.PodPriorityEnabled() || sched.config.DisablePreemption {
klog.V(3).Infof("Pod priority feature is not enabled or preemption is disabled by scheduler configuration." +
" No preemption is performed.")
} else {
preemptionStartTime := time.Now()
sched.preempt(pod, fitError)
metrics.PreemptionAttempts.Inc()
metrics.SchedulingAlgorithmPremptionEvaluationDuration.Observe(metrics.SinceInSeconds(preemptionStartTime))
metrics.DeprecatedSchedulingAlgorithmPremptionEvaluationDuration.Observe(metrics.SinceInMicroseconds(preemptionStartTime))
metrics.SchedulingLatency.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime))
metrics.DeprecatedSchedulingLatency.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime))
}
metrics.PodScheduleFailures.Inc()
} else {
klog.Errorf("error selecting node for pod: %v", err)
metrics.PodScheduleErrors.Inc()
}
return
}
metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
metrics.DeprecatedSchedulingAlgorithmLatency.Observe(metrics.SinceInMicroseconds(start))
assumedPod := pod.DeepCopy()
// 在假设 pod 之前先假设卷。
// 如果所有卷都完全绑定,则 allBound 为真,将跳过绑定。
// 否则,卷的绑定是在假定 pod 之后但在 pod 绑定之前开始的。
// 如果需要卷绑定,此函数会修改 'assumedPod'。
allBound, err := sched.assumeVolumes(assumedPod, scheduleResult.SuggestedHost)
if err != nil {
klog.Errorf("error assuming volumes: %v", err)
metrics.PodScheduleErrors.Inc()
return
}
// Run "reserve" plugins.
for _, pl := range plugins.ReservePlugins() {
if err := pl.Reserve(plugins, assumedPod, scheduleResult.SuggestedHost); err != nil {
klog.Errorf("error while running %v reserve plugin for pod %v: %v", pl.Name(), assumedPod.Name, err)
sched.recordSchedulingFailure(assumedPod, err, SchedulerError,
fmt.Sprintf("reserve plugin %v failed", pl.Name()))
metrics.PodScheduleErrors.Inc()
return
}
}
// assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
//假设通过设置NodeName=scheduleResult.SuggestedHost
err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
if err != nil {
klog.Errorf("error assuming pod: %v", err)
metrics.PodScheduleErrors.Inc()
return
}
// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
//将 pod 异步绑定到其主机(我们可以按照上述假设步骤执行此操作)
go func() {
// Bind volumes first before Pod
if !allBound {
err := sched.bindVolumes(assumedPod)
if err != nil {
klog.Errorf("error binding volumes: %v", err)
metrics.PodScheduleErrors.Inc()
return
}
}
// Run "prebind" plugins.
for _, pl := range plugins.PrebindPlugins() {
approved, err := pl.Prebind(plugins, assumedPod, scheduleResult.SuggestedHost)
if err != nil {
approved = false
klog.Errorf("error while running %v prebind plugin for pod %v: %v", pl.Name(), assumedPod.Name, err)
metrics.PodScheduleErrors.Inc()
}
if !approved {
sched.Cache().ForgetPod(assumedPod)
var reason string
if err == nil {
msg := fmt.Sprintf("prebind plugin %v rejected pod %v.", pl.Name(), assumedPod.Name)
klog.V(4).Infof(msg)
err = errors.New(msg)
reason = v1.PodReasonUnschedulable
} else {
reason = SchedulerError
}
sched.recordSchedulingFailure(assumedPod, err, reason, err.Error())
return
}
}
err := sched.bind(assumedPod, &v1.Binding{
ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
Target: v1.ObjectReference{
Kind: "Node",
Name: scheduleResult.SuggestedHost,
},
})
metrics.E2eSchedulingLatency.Observe(metrics.SinceInSeconds(start))
metrics.DeprecatedE2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))
if err != nil {
klog.Errorf("error binding pod: %v", err)
metrics.PodScheduleErrors.Inc()
} else {
klog.V(2).Infof("pod %v/%v is bound successfully on node %v, %d nodes evaluated, %d nodes were found feasible", assumedPod.Namespace, assumedPod.Name, scheduleResult.SuggestedHost, scheduleResult.EvaluatedNodes, scheduleResult.FeasibleNodes)
metrics.PodScheduleSuccesses.Inc()
}
}()
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# 调度核心逻辑
sched.scheduleOne会被wait.UntilWithContext定时调用,直到ctx.Done()返回true为止。sched.scheduleOne是核心实现,主要做了以下几件事:
- 通过sched.NextPod()函数从优先队列中获取一个优先级最高的待调度Pod资源对象,如果没有获取到,那么该方法会阻塞住;
- 通过sched.Algorithm.Schedule调度函数执行Predicates的调度算法与Priorities算法,挑选出一个合适的节点;
- 当没有找到合适的节点时,调度器会尝试调用prof.RunPostFilterPlugins抢占低优先级的Pod资源对象的节点;
- 将该pod和选定的节点进行假性绑定,存入scheduler cache中,方便具体绑定操作可以异步进行。
- 实际执行绑定操作,将node的名字添加到pod的节点相关属性中。
其中核心的部分为通过具体的调度算法选出调度节点的过程,即genericScheduler.Schedule
的实现部分。该部分包括预选和优选两个部分。
genericScheduler.Schedule
调度的基本流程如下:
- 对pod做基本性检查,目前主要是对pvc的检查。
- 通过
findNodesThatFit
预选策略选出满足调度条件的node列表。 - 通过
PrioritizeNodes
优选策略给预选的node列表中的node进行打分。 - 在打分的node列表中选择一个分数最高的node作为调度的节点。