LiFengMing LiFengMing
首页
云原生
中间件
工具导航
资源导航
  • 分类
  • 标签
  • 归档
关于作者
GitHub (opens new window)

LiFengMing

IT届李哥
首页
云原生
中间件
工具导航
资源导航
  • 分类
  • 标签
  • 归档
关于作者
GitHub (opens new window)
  • 编程语言

  • 问题排查手册

  • 容器编排技术

    • Linux 之 iptables 原理分析
    • K8s常见问题排查技巧
    • K8s应用存储与存储卷
    • k8s scheduler调度器原理以及核心源码分析
    • k8s scheduler调度算法及源码分析
    • k8s scheduler之拓展调度器
    • 云原生
    • 容器编排技术
    LiFengMing
    2022-06-20
    目录

    k8s scheduler之拓展调度器

    # 拓展调度器

    Kubernetes 自带了一个默认调度器kube-scheduler,其内置了很多节点预选和优选的调度算法,一般调度场景下可以满足要求。但是在一些特殊场景下,默认调度器不能满足我们复杂的调度需求。我们就需要对调度器进行扩展,以达到调度适合业务场景的目的。例如CSI存储插件,基于容量调度。该场景下,默认调度器内置的预选、优选算法不能满足需求,我们有以下三种选择:

    1. 将新的调度算法添加到默认调度程序中,并重新编译镜像,最终该镜像运行的实例作为kubernetes集群调度器;

    2. 参考kube-scheduler实现满足自己业务场景的调度程序,并编译镜像,将该程序作为独立的调度器运行到kubernetes集群内,需要用该调度器调度的pod实例,在spec.schedulerName里指定该调度器;

    3. 实现“调度扩展程序“:默认调度器kube-scheduler在进行预选时会调用该扩展程序进行过滤节点;在优选时会调用该扩展程序进行给节点打分,或者在bind操作时,调用该扩展器进行bind操作。

      对上述三种方式进行评估:

    • 第一种:将自己的调度算法添加到默认调度器kube-scheduler中,对原生代码侵入性较高,而且随着kubernetes版本升级,维护成本也较高;

    • 第二种:默认调度器里内置了很多优秀调度算法,如:检查节点资源是否充足;端口是否占用;volume是否被其他pod挂载;亲和性;均衡节点资源利用等,如果完全使用自己开发的调度器程序,可能在达到了实际场景调度需求同时,失去更佳的调度方案,除非集成默认调度器中的算法到自己独立调度程序中,但这无疑是不现实的;

    • 第三种:通过启动参数的policy配置,选用某些默认调度器中的预选、优选调度算法的同时,也可以调用外部扩展调度程序的算法,计算得到最优的调度节点,无需修改kube-scheduler代码,只需要在启动参数中增加配置文件即可将默认调度程序和扩展调度程序相互关联。

      第三种方法对原生代码侵入性小,也不会破坏原有的策略,是我们的最优解。

    # 源码分析

    # Extender调度器接口 (opens new window)

    对于kube-scheduler,需要定义调度扩展程序的接口,就是Extender接口,源码链接:

    type SchedulerExtender interface {
    
      // 调度扩展程序的唯一名字,因为可能会有多个调度扩展程序。
    	Name() string
    
    	// Filter()和FilterPlugin.Filter()类似,不同的是传入了全部的Node,而插件传入的是一个Node,这个是出于调用效率考虑的,毕竟是远程调用。
    	// 因为参数与过滤插件不同,所以返回值也略有不同,返回了已过滤的Node(通过过滤)和过滤失败(未通过过滤)的Node。
    	Filter(pod *v1.Pod,
    		nodes []*v1.Node, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
    	) (filteredNodes []*v1.Node, failedNodesMap schedulerapi.FailedNodesMap, err error)
    
      // Prioritize()这接口名字是有历史原因的,因为以前的调度器分为‘predicate’和‘prioritize’两个阶段,对应调度插件的Filter和Score。
    	// Prioritize()接口要求输入Pod以及Filter()返回的Node集合,输出所有Node的评分(hostPriorities)以及调度扩展程序的权重。
    	// 这样kube-scheduler就可以将所有扩展程序返回的分数乘以权重再累加起来,这一点和调度插件原理是一样的。
    	Prioritize(pod *v1.Pod, nodes []*v1.Node) (hostPriorities *schedulerapi.HostPriorityList, weight int, err error)
    
    	// Bind()与BindPlugin.Bind()功能一样,只是参数的差异,了解DefaultBinder.Bind()读者应该知道,该函数最终将接口参数转换成了v1.Binding类型在执行绑定的。
    	Bind(binding *v1.Binding) error
    
    	// 告诉kube-scheduler调度扩展程序是否有绑定能力,如果有绑定能力kube-scheduler会优先用调度扩展程序绑定。
    	// 需要注意: kube-scheduler会优先用调度扩展程序绑定还有一个条件,那就是Pod有些资源是由Extender管理。
    	IsBinder() bool
    
    	// 判断Pod是否有任何资源是被Extender管理的,因为有资源被Extender管理交给它绑定才有意义,否则不如直接用默认的绑定插件。
    	IsInterested(pod *v1.Pod) bool
    
    	// Extender的抢占调度接口,传入待调度Pod,�Node和被强占的Pod候选‘nodeNameToVictims’,key是node名字,value是node上被强占的Pod。
    	// 有同学肯定会问,不是让Extender执行抢占调度么?哪来的Node和被强占的Pod候选?这些不应该是ProcessPreemption()返回的么?
    	// 这是因为DefaultPreemption(唯一的抢占调度插件)在调用Extender.ProcessPreemption()之前已经执行了一部分抢占调度的来降低
    	// Extender.ProcessPreemption()候选的数量,毕竟想要实现抢占调度既要满足调度插件的需求也要满足Extender的要求。
    	// 所以先用调度插件选出一部分候选,可以减少不必要的数据传输,因为这是http调用。关于抢占调度的实现笔者会单独写一个文件解析。
    	ProcessPreemption(
    		pod *v1.Pod,
    		nodeToVictims map[*v1.Node]*schedulerapi.Victims,
    		nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
    	) (map[*v1.Node]*schedulerapi.Victims, error)
    
    	// 告知kube-scheduler是否具有抢占调度的能力
    	SupportsPreemption() bool
    
    	// 告知kube-scheduler如果Extender不可用是否忽略,如果忽略,kube-scheduler不会返回错误。
    	// 因为Extender的实现是HTTP服务,所以不可用是一种正常现象。
    	IsIgnorable() bool
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44

    # HTTPExtender (opens new window)

    // HTTPExtender实现了Extender接口
    type HTTPExtender struct {
      // 调度扩展程序的URL,比如https://127.0.0.1:8080。
    	extenderURL      string  
      // xxxVerb是HTTPExtender.Xxx()接口的HTTP请求的URL,比如https://127.0.0.1:8080/'preemptVerb' 用ProcessPreemption()接口。
    	preemptVerb      string
      
      //预选拓展
    	filterVerb       string
      
      //优选拓展
    	prioritizeVerb   string
      
      //绑定拓展
    	bindVerb         string
      
      // 调度扩展程序的权重,用来与ScorePlugin计算出最终的分数
      weight           int
      
      // HTTP客户端
      client           *http.Client
      
      // 调度扩展程序是否缓存了Node信息,如果调度扩展程序已经缓存了集群中所有节点的全部详细信息,那么只需要发送非常少量的Node信息即可,比如Node名字。
    	// 毕竟是HTTP调用,想法设法提升效率。但是为什么有podCacheCapable?这就要分析一下HTTPExtender发送的数据包括哪些了?
    	// 1. 待调度的Pod
    	// 2. Node(候选)
    	// 3. 候选Node上的候选Pod(仅抢占调度)
    	// 试想一下每次HTTP请求中Pod(包括候选Pod)可能不是不同的,而Node呢?有的请求可能会有不同,但于Filter请求因为需要的是Node全量,所以基本是相同。
    	// 会造成较大的无效数据传输,所以当调度扩展程序能够缓存Node信息时,客户端只需要传输很少的信息就可以了。
    	nodeCacheCapable bool
      
    	// 调度扩展程序管理的资源名称
    	managedResources sets.String
      
      // 如果调度扩展程序不可用是否忽略
    	ignorable        bool  
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37

    # Extender构造函数 (opens new window)

    func NewHTTPExtender(config *schedulerapi.ExtenderConfig) (algorithm.SchedulerExtender, error) {
    	
      // 没有配置超时,就用默认超时,5秒钟
      if config.HTTPTimeout.Nanoseconds() == 0 {
    		config.HTTPTimeout = time.Duration(DefaultExtenderTimeout)
    	}
    
      // 创建http.Client,
    	transport, err := makeTransport(config)
    	if err != nil {
    		return nil, err
    	}
    	client := &http.Client{
    		Transport: transport,
    		Timeout:   config.HTTPTimeout,
    	}
      
      // 管理的资源从slice转为map[string]struct{}
    	managedResources := sets.NewString()
    	for _, r := range config.ManagedResources {
    		managedResources.Insert(string(r.Name))
    	}
      
      
    	// 各种通过配置赋值
    	return &HTTPExtender{
    		extenderURL:      config.URLPrefix,
    		preemptVerb:      config.PreemptVerb,
    		filterVerb:       config.FilterVerb,
    		prioritizeVerb:   config.PrioritizeVerb,
    		bindVerb:         config.BindVerb,
    		weight:           config.Weight,
    		client:           client,
    		nodeCacheCapable: config.NodeCacheCapable,
    		managedResources: managedResources,
    		ignorable:        config.Ignorable,
    	}, nil
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38

    # ExtendConfig (opens new window)

    type ExtenderConfig struct {
      //拓展调度器的URL前缀
    	URLPrefix string
      //拓展调度器中,预选的函数名
    	FilterVerb string
      //拓展调度器中,抢占的函数名
    	PreemptVerb string
      //拓展调度器中,优选函数名
    	PrioritizeVerb string
      //权重
    	Weight int
      //拓展调度器中,绑定函数名
    	BindVerb string
      //是否开启https
    	EnableHTTPS bool
    	TLSConfig *ExtenderTLSConfig
      //超时时间
    	HTTPTimeout time.Duration
      //拓展的资源列表
    	ManagedResources []ExtenderManagedResource
      //调度失败是否可以忽略
    	Ignorable bool
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23

    # Filter-预选 (opens new window)

    func (h *HTTPExtender) Filter(
    	pod *v1.Pod,
    	nodes []*v1.Node, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
    ) ([]*v1.Node, schedulerapi.FailedNodesMap, error) {
    	var (
    		result     schedulerapi.ExtenderFilterResult
    		nodeList   *v1.NodeList
    		nodeNames  *[]string
    		nodeResult []*v1.Node
    		args       *schedulerapi.ExtenderArgs
    	)
    
    	if h.filterVerb == "" {
    		return nodes, schedulerapi.FailedNodesMap{}, nil
    	}
    
    	if h.nodeCacheCapable {
    	// 如果调度扩展程序缓存了Node信息,则参数中只需要设置Node的名字
    		nodeNameSlice := make([]string, 0, len(nodes))
    		for _, node := range nodes {
    			nodeNameSlice = append(nodeNameSlice, node.Name)
    		}
    		nodeNames = &nodeNameSlice
    	} else {
    	// 如果调度扩展程序没有缓存Node信息,就只能把全量的Node放在参数中
    		nodeList = &v1.NodeList{}
    		for _, node := range nodes {
    			nodeList.Items = append(nodeList.Items, *node)
    		}
    	}
    
      // 构造HTTP请求参数
    	args = &schedulerapi.ExtenderArgs{
    		Pod:       pod,
    		Nodes:     nodeList,
    		NodeNames: nodeNames,
    	}
    	//发送请求
    	if err := h.send(h.filterVerb, args, &result); err != nil {
    		return nil, nil, err
    	}
    	if result.Error != "" {
    		return nil, nil, fmt.Errorf(result.Error)
    	}
    
    	// 如果调度扩展程序缓存Node信息并且结果中设置了Node名字
    	if h.nodeCacheCapable && result.NodeNames != nil {
    		nodeResult = make([]*v1.Node, 0, len(*result.NodeNames))
        // 根据返回结果的Node名字找到Node并输出
    		for i := range *result.NodeNames {
    			nodeResult = append(nodeResult, nodeNameToInfo[(*result.NodeNames)[i]].Node())
    		}
    	} else if result.Nodes != nil {
        // 直接从结果中获取Node
    		nodeResult = make([]*v1.Node, 0, len(result.Nodes.Items))
    		for i := range result.Nodes.Items {
    			nodeResult = append(nodeResult, &result.Nodes.Items[i])
    		}
    	}
    
    	return nodeResult, result.FailedNodes, nil
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62

    # Send 实现 (opens new window)

    
    func (h *HTTPExtender) send(action string, args interface{}, result interface{}) error {
    // 将请求参数(比如filter和prioritize请求是ExtenderArgs,preempt请求是ExtenderPreemptionArgs)序列化为JSON格式。
    	out, err := json.Marshal(args)
    	if err != nil {
    		return err
    	}
    
    // 格式化请求的最终URL
    	url := strings.TrimRight(h.extenderURL, "/") + "/" + action
    	req, err := http.NewRequest("POST", url, bytes.NewReader(out))
    	if err != nil {
    		return err
    	}
    
    	req.Header.Set("Content-Type", "application/json")
      // 发送HTTP请求
    	resp, err := h.client.Do(req)
    	if err != nil {
    		return err
    	}
    	defer resp.Body.Close()
      // 检查HTTP的状态码,如果不是200就返回错误
    	if resp.StatusCode != http.StatusOK {
    		return fmt.Errorf("Failed %v with extender at URL %v, code %v", action, url, resp.StatusCode)
    	}
    
    	return json.NewDecoder(resp.Body).Decode(result)
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29

    # 参考文档

    • https://github.com/jindezgm/k8s-src-analysis/blob/master/kube-scheduler/KubeSchedulerConfiguration.md#Extender
    • https://blog.csdn.net/weixin_42663840/article/details/117430830 (opens new window)
    编辑 (opens new window)
    #scheduler
    上次更新: 2025/01/19, 23:15:59
    k8s scheduler调度算法及源码分析

    ← k8s scheduler调度算法及源码分析

    最近更新
    01
    云原生资源
    05-25
    02
    快速搭建Spring项目
    03-27
    03
    kafka版本迭代说明
    03-11
    更多文章>
    Theme by Vdoing | Copyright © 2018-2025 LiFengMing | MIT License
    • 跟随系统
    • 浅色模式
    • 深色模式
    • 阅读模式