Introduction
Scheduler extender
There are three ways to extend the scheduler:
- adding new rules to the scheduler and recompiling it, i.e. modifying the source code;
- implementing your own scheduler process that runs instead of, or alongside, the standard Kubernetes scheduler. Multiple schedulers can coexist, and a pod's spec.schedulerName selects which scheduler handles it (see the example below);
- implementing a "scheduler extender" process that the standard Kubernetes scheduler calls out to as a final pass when making scheduling decisions — in effect, an advisor to the default scheduler.
The extender approach is needed when scheduling decisions depend on resources not directly managed by the standard Kubernetes scheduler; the extender contributes decisions based on such resources. (Note that the three approaches are not mutually exclusive.)
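For the second approach, a pod picks its scheduler via spec.schedulerName; the name my-scheduler below is just a placeholder:

apiVersion: v1
kind: Pod
metadata:
  name: nginx
spec:
  schedulerName: my-scheduler   # handled by the scheduler with this name, not the default
  containers:
    - name: nginx
      image: nginx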
Usage
How the scheduler calls a SchedulerExtender
Taking a binary deployment as an example (see "Create a custom Kubernetes scheduler"), kube-scheduler is typically started as kube-scheduler --address=0.0.0.0 --kubeconfig=/etc/kubernetes/kube-scheduler.kubeconfig --leader-elect=true. It also accepts a --config flag pointing to a YAML configuration file; an example that wires in a scheduler extender:
apiVersion: kubescheduler.config.k8s.io/v1alpha1
kind: KubeSchedulerConfiguration
clientConnection:
  kubeconfig: "/etc/kubernetes/kube-scheduler.kubeconfig"    # kubeconfig file
algorithmSource:
  policy:
    file:
      path: "/etc/kubernetes/scheduler-extender-policy.json" # custom scheduling policy file
An example scheduler-extender policy file:
{
    "kind": "Policy",
    "apiVersion": "v1",
    "extenders": [{
        "urlPrefix": "http://localhost:8888/",
        "filterVerb": "filter",
        "prioritizeVerb": "prioritize",
        "weight": 1,
        "enableHttps": false
    }]
}
The policy file registers an HTTP extender service, listening on 127.0.0.1:8888, with the default scheduler. After the built-in filtering and scoring phases finish, the scheduler passes the intermediate results to the extender endpoints <urlPrefix>/<filterVerb> = http://localhost:8888/filter and <urlPrefix>/<prioritizeVerb> = http://localhost:8888/prioritize for further filtering and scoring.
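Concretely, the scheduler POSTs a JSON-encoded ExtenderArgs body and, for filtering, expects an ExtenderFilterResult back. A filter exchange looks roughly like this (pod and node objects abridged; field names per the extender v1 API):

Request (POST <urlPrefix>/filter), an extenderv1.ExtenderArgs:
{
    "pod":   { "metadata": { "name": "nginx", "namespace": "default" }, ... },
    "nodes": { "items": [ { "metadata": { "name": "node1" }, ... },
                          { "metadata": { "name": "node2" }, ... } ] }
}
Response, an extenderv1.ExtenderFilterResult:
{
    "nodes":       { "items": [ { "metadata": { "name": "node1" }, ... } ] },
    "failedNodes": { "node2": "some failure reason" },
    "error": ""
}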
Source code analysis
// The scheduler's core component genericScheduler aggregates SchedulerExtender
type genericScheduler struct {
    cache           internalcache.Cache
    schedulingQueue internalqueue.SchedulingQueue
    extenders       []SchedulerExtender
    ...
}
// k8s.io/kubernetes/pkg/scheduler/core/extender.go
type SchedulerExtender interface {
    Name() string
    Filter(pod *v1.Pod, nodes []*v1.Node) (filteredNodes []*v1.Node, failedNodesMap extenderv1.FailedNodesMap, err error)
    Prioritize(pod *v1.Pod, nodes []*v1.Node) (hostPriorities *extenderv1.HostPriorityList, weight int64, err error)
    Bind(binding *v1.Binding) error
    ...
}
// HTTPExtender is the default SchedulerExtender implementation
type HTTPExtender struct {
    extenderURL    string
    preemptVerb    string
    filterVerb     string
    prioritizeVerb string
    bindVerb       string
    weight         int64
    client         *http.Client
    ignorable      bool
}
HTTPExtender is essentially a webhook; SchedulerExtender.Filter is executed as part of genericScheduler.Schedule:
func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
    ...
    filteredNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, prof, state, pod)
    ...
}
func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.NodeToStatusMap, error) {
    filteredNodesStatuses := make(framework.NodeToStatusMap)
    // run the built-in filter plugins first ...
    filtered, err := g.findNodesThatPassFilters(ctx, prof, state, pod, filteredNodesStatuses)
    // ... then the extenders
    filtered, err = g.findNodesThatPassExtenders(pod, filtered, filteredNodesStatuses)
    return filtered, filteredNodesStatuses, nil
}
func (g *genericScheduler) findNodesThatPassExtenders(pod *v1.Pod, filtered []*v1.Node, statuses framework.NodeToStatusMap) ([]*v1.Node, error) {
    // each extender further narrows down the candidate node list
    for _, extender := range g.extenders {
        ...
        filteredList, failedMap, err := extender.Filter(pod, filtered)
        ...
        filtered = filteredList
    }
    return filtered, nil
}
Example implementation
A SchedulerExtender is first of all an HTTP server; to avoid slowing down scheduling, keep its endpoints' response times short.
package main

import (
    "log"
    "net/http"
    "github.com/julienschmidt/httprouter"
)

func main() {
    router := httprouter.New()
    router.GET("/", Index)
    router.POST("/filter", Filter)
    router.POST("/prioritize", Prioritize)
    log.Fatal(http.ListenAndServe(":8888", router))
}
Taking the Filter logic as an example, the Filter method's input and output are fixed to schedulerapi.ExtenderArgs and schedulerapi.ExtenderFilterResult:
func filter(args schedulerapi.ExtenderArgs) *schedulerapi.ExtenderFilterResult {
    var filteredNodes []v1.Node
    failedNodes := make(schedulerapi.FailedNodesMap)
    pod := args.Pod
    // keep every node the pod fits on; record a reason for every node it doesn't
    for _, node := range args.Nodes.Items {
        fits, failReasons, _ := podFitsOnNode(pod, node)
        if fits {
            filteredNodes = append(filteredNodes, node)
        } else {
            failedNodes[node.Name] = strings.Join(failReasons, ",")
        }
    }
    result := schedulerapi.ExtenderFilterResult{
        Nodes:       &v1.NodeList{Items: filteredNodes},
        FailedNodes: failedNodes,
        Error:       "",
    }
    return &result
}
func podFitsOnNode(pod *v1.Pod, node v1.Node) (bool, []string, error) {
    fits := true
    failReasons := []string{}
    // custom fit logic goes here
    return fits, failReasons, nil
}
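The Filter handler registered in main above is then just a thin JSON codec around filter. A minimal sketch (error handling deliberately simple; decode errors are surfaced through the result's Error field so the scheduler sees them):

// Filter is the HTTP handler wired to POST /filter: decode ExtenderArgs,
// delegate to filter, encode the ExtenderFilterResult back as JSON.
func Filter(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
    var args schedulerapi.ExtenderArgs
    var result *schedulerapi.ExtenderFilterResult
    if err := json.NewDecoder(r.Body).Decode(&args); err != nil {
        result = &schedulerapi.ExtenderFilterResult{Error: err.Error()}
    } else {
        result = filter(args)
    }
    w.Header().Set("Content-Type", "application/json")
    if err := json.NewEncoder(w).Encode(result); err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
    }
}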
Custom implementation
Given how many plugin extension points the scheduler exposes, we can also implement a scheduler of our own. At its core, a scheduler watches pending pods and assigns each one a node (a toy sketch of that core loop follows this list). The options:
- implement one from scratch, including your own plugin mechanism
- build one on top of controller-runtime
- build one directly with client-go, e.g. the karmada cluster scheduler (which schedules clusters, not pods)
- reuse the overall design of the k8s scheduler and only write extension plugins, e.g. cnych/sample-scheduler-framework
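A toy sketch of "watch pending pods, set a node" with client-go. The scheduler name toy-scheduler and the kubeconfig path are placeholders; a real scheduler would use informers, retries, scoring, and leader election instead of this bare watch loop:

package main

import (
    "context"
    "log"
    "math/rand"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // placeholder kubeconfig path
    config, err := clientcmd.BuildConfigFromFlags("", "/etc/kubernetes/kube-scheduler.kubeconfig")
    if err != nil {
        log.Fatal(err)
    }
    client := kubernetes.NewForConfigOrDie(config)
    ctx := context.Background()

    // watch pods that name us as their scheduler and are not yet bound
    w, err := client.CoreV1().Pods("").Watch(ctx, metav1.ListOptions{
        FieldSelector: "spec.schedulerName=toy-scheduler,spec.nodeName=",
    })
    if err != nil {
        log.Fatal(err)
    }
    for ev := range w.ResultChan() {
        pod, ok := ev.Object.(*v1.Pod)
        if !ok {
            continue
        }
        nodes, err := client.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
        if err != nil || len(nodes.Items) == 0 {
            continue
        }
        // "filtering"/"scoring" reduced to picking a random node
        node := nodes.Items[rand.Intn(len(nodes.Items))]
        // binding is the "set a node on the pod" step
        err = client.CoreV1().Pods(pod.Namespace).Bind(ctx, &v1.Binding{
            ObjectMeta: metav1.ObjectMeta{Name: pod.Name, Namespace: pod.Namespace},
            Target:     v1.ObjectReference{Kind: "Node", Name: node.Name},
        }, metav1.CreateOptions{})
        if err != nil {
            log.Printf("bind %s/%s: %v", pod.Namespace, pod.Name, err)
        }
    }
}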
Usage
Create a ConfigMap to hold the scheduler's configuration file:
apiVersion: v1
kind: ConfigMap
metadata:
  name: scheduler-config
  namespace: kube-system
data:
  scheduler-config.yaml: |
    apiVersion: kubescheduler.config.k8s.io/v1beta3
    kind: KubeSchedulerConfiguration
    leaderElection:
      leaderElect: true
      resourceName: scheduler-framework-sample   # leader-election lock object
      resourceNamespace: kube-system
    profiles:
      - schedulerName: scheduler-framework-sample
        plugins:
          filter:
            enabled:
              - name: "sample"
          preBind:
            enabled:
              - name: "sample" # a plugin only runs at an extension point if it is enabled there
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: scheduler-framework-sample
spec:
  replicas: 1
  selector:
    matchLabels:
      app: scheduler-framework-sample
  template:
    metadata:
      labels:
        app: scheduler-framework-sample
    spec:
      volumes:
        - name: scheduler-config
          configMap:
            name: scheduler-config
      containers:
        - name: scheduler-ctrl
          image: scheduler-framework-sample:latest # placeholder: your built scheduler image
          command:
            - scheduler-framework-sample
            - --config=/scheduler/scheduler-config.yaml
            - --v=3
          volumeMounts:
            - name: scheduler-config
              mountPath: /scheduler
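Pods then opt into this scheduler the same way as in the earlier example, by naming the profile in their spec:

spec:
  schedulerName: scheduler-framework-sample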
Source code analysis
sample-scheduler-framework
  /cmd
    /scheduler
      /main.go
  /pkg
    /plugins
      /sample
        /sample.go
k8s.io/kubernetes/cmd/kube-scheduler/app provides NewSchedulerCommand:
- app.WithPlugin injects a custom plugin into the default scheduler
- cmd.Execute then starts the stock scheduler code as-is
// main.go
import (
    "fmt"
    "math/rand"
    "os"
    "time"

    "github.com/angao/scheduler-framework-sample/pkg/plugins/sample"
    "k8s.io/component-base/logs"
    "k8s.io/kubernetes/cmd/kube-scheduler/app"
)

func main() {
    ...
    cmd := app.NewSchedulerCommand(
        app.WithPlugin(sample.Name, sample.New),
    )
    if err := cmd.Execute(); err != nil {
        _, _ = fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }
}
How does app.WithPlugin get the custom plugin called by the default scheduler?
// kubernetes/cmd/kube-scheduler/app/server.go
func WithPlugin(name string, factory runtime.PluginFactory) Option {
    return func(registry runtime.Registry) error {
        return registry.Register(name, factory)
    }
}

// kubernetes/pkg/scheduler/framework/runtime/registry.go
type Registry map[string]PluginFactory

func (r Registry) Register(name string, factory PluginFactory) error {
    r[name] = factory
    return nil
}
func (r Registry) Unregister(name string) error {
    delete(r, name)
    return nil
}
The custom plugin is first stored in a Registry and is eventually handed to Scheduler.Profiles via the Configurator:
// kubernetes/cmd/kube-scheduler/app/server.go
func Setup(ctx context.Context, opts *options.Options, ...) {
    ...
    outOfTreeRegistry := make(runtime.Registry)
    for _, option := range outOfTreeRegistryOptions {
        if err := option(outOfTreeRegistry); err != nil {
            return nil, nil, err
        }
    }
    sched, err := scheduler.New(cc.Client,
        ...
        scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
    )
    ...
}

// kubernetes/pkg/scheduler/scheduler.go
func WithFrameworkOutOfTreeRegistry(registry frameworkruntime.Registry) Option {
    return func(o *schedulerOptions) {
        o.frameworkOutOfTreeRegistry = registry
    }
}

func New(client clientset.Interface, ...) (*Scheduler, error) {
    ...
    // merge the in-tree plugins with the out-of-tree ones registered via WithPlugin
    registry := frameworkplugins.NewInTreeRegistry()
    if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil {
        return nil, err
    }
    ...
    configurator := &Configurator{
        registry:  registry,
        extenders: options.extenders,
        ...
    }
}
Example custom plugin implementation
// sample.go
import (
    "context"

    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/klog/v2"
    "k8s.io/kubernetes/pkg/scheduler/framework"
)

const (
    // Name is the plugin name
    Name = "sample"
)

// compile-time checks that Sample implements the extension points it claims
var _ framework.FilterPlugin = &Sample{}
var _ framework.PreBindPlugin = &Sample{}

type Sample struct {
    handle framework.Handle
}

func New(_ runtime.Object, handle framework.Handle) (framework.Plugin, error) {
    return &Sample{
        handle: handle,
    }, nil
}

func (s *Sample) Name() string {
    return Name
}

func (s *Sample) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, node *framework.NodeInfo) *framework.Status {
    klog.V(2).Infof("filter pod: %v", pod.Name)
    return framework.NewStatus(framework.Success, "")
}

func (s *Sample) PreBind(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
    nodeInfo, err := s.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
    if err != nil {
        return framework.NewStatus(framework.Error, err.Error())
    }
    klog.V(2).Infof("prebind node info: %+v", nodeInfo.Node())
    return framework.NewStatus(framework.Success, "")
}
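As written, the sample Filter approves every node. To actually filter, return framework.Unschedulable; for example, a hypothetical variant that only admits nodes carrying a label (the label key sample/enabled is made up for illustration):

// Hypothetical variant: reject any node without the sample/enabled label.
func (s *Sample) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
    if _, ok := nodeInfo.Node().Labels["sample/enabled"]; !ok {
        return framework.NewStatus(framework.Unschedulable, "node lacks label sample/enabled")
    }
    return framework.NewStatus(framework.Success, "")
}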