5 分钟阅读

简介

SchedulerCache

借一下kube-batch 设计图

SchedulerCache 将调度所需的数据缓存起来,并保持与apiserver 同步。Cache 模块还封装了对 API server 接口的调用。比如 Cache.Bind 接口,会调用 API Server 的 Bind 接口,将容器绑定到指定节点上。在 kube-batch/volcano 中,只有 cache 模块需要和 API Server 交互,其他模块只需要调用 Cache 模块接口。

type SchedulerCache struct {
   xxInformer	...	    // 各种informer
   ...
   Jobs                 map[schedulingapi.JobID]*schedulingapi.JobInfo
   Nodes                map[string]*schedulingapi.NodeInfo
   Queues               map[schedulingapi.QueueID]*schedulingapi.QueueInfo
   PriorityClasses      map[string]*v1beta1.PriorityClass
}

SchedulerCache 会持有很多informer, 初始化的 informer 注册各个 eventHandler,然后pod/podgroup等变动会被同步在 Jobs, Nodes, Queues, PriorityClasses 等几个 map 中。pg 加入jobInfo,pod 加入taskInfo。

session 与调度周期

Session 模块是将action/plugin/cache三个模块串联起来的模块。Kube-batch 在每个调度周期开始时,都会新建一个 Session 对象,这个 Session 初始化时会做以下操作:

  1. 调用 Cache.Snapshot 接口,将 Cache 中节点、任务和队列信息拷贝一份副本,之后在这个调度周期中使用这份副本进行调度。因为 Cache 的数据会不断变化,为了保持同个调度周期中的数据一致性,在一开始就拷贝一份副本。PS:在一个调度周期,基于snapshot 数据,找到当前资源可以运行的最高优先级的pod,优先调度。全局决策,也是批量调度的一个内涵。
  2. 将配置中的各个 plugin 初始化,然后调用 plugin 的 OnSessionOpen 接口。plugin 在 OnSessionOpen 中,会初始化自己需要的数据,并将一些回调函数注册到 session 中。

plugin 会根据自己的语义 注册相关的函数到 Session中,在Action.Execute 中被调用。

// kube-batch/pkg/scheduler/framework/session.go
type Session struct {
	UID types.UID
	cache cache.Cache
	Jobs    map[api.JobID]*api.JobInfo
	Nodes   map[string]*api.NodeInfo
	Queues  map[api.QueueID]*api.QueueInfo
	Backlog []*api.JobInfo
	Tiers   []conf.Tier
	plugins          map[string]Plugin
	eventHandlers    []*EventHandler   
}

Action 只有一个Session 一个入参,从 Session.jobs 等拿到数据,处理完成后写回Session.jobs 等,Session 既是数据载体,也是action 之间的信息传递渠道。比如 Enqueue action 将session.Jobs 中符合条件的job 状态从pending 改为非pending,allocate/backfill action 不处理pending状态的job。 allocate 不处理 request resource 为空的task,backfill 会处理。

type Session struct {
	jobOrderFns      map[string]api.CompareFn	// 决定哪个训练任务优先被处理(调度、回收、抢占)。
	queueOrderFns    map[string]api.CompareFn	// 决定哪个训练队列优先被处理。
	taskOrderFns     map[string]api.CompareFn	// 决定任务中哪个容器优先被处理。
	predicateFns     map[string]api.PredicateFn		// 判断某个节点是否满足容器的基本调度要求。比如容器中指定的节点的标签。
	nodePrioritizers map[string][]priorities.PriorityConfig	//  当多个节点满足容器的调度要求时,优先选择哪个节点。
	preemptableFns   map[string]api.EvictableFn		//  决定某个容器是否可以被抢占
	reclaimableFns   map[string]api.EvictableFn		// 决定某个容器是否可以被回收
	overusedFns      map[string]api.ValidateFn		// 决定某个队列使用的资源是否超过限额,是的话不再调度对队列中的任务
	jobReadyFns      map[string]api.ValidateFn		// 判断某个任务是否已经准备好,可以调用 API Server 的接口将任务的容器调度到节点。
	jobPipelinedFns  map[string]api.ValidateFn		// 判断某个任务是否处于 Pipelined 状态
	jobValidFns      map[string]api.ValidateExFn	// 判断某个任务是否有效
	
}
// kube-batch/pkg/scheduler/framework/session_plugins.go
func (ssn *Session) AddJobReadyFn(name string, vf api.ValidateFn) {...}
func (ssn *Session) JobReady(obj interface{}) bool {...jobValidFns...}
func (ssn *Session) AddJobValidFn(name string, fn api.ValidateExFn) {...}
func (ssn *Session) JobValid(obj interface{}) *api.ValidateResult {...jobReadyFns...}
// kube-batch/pkg/scheduler/framework/session.go
func (ssn *Session) Allocate(task *api.TaskInfo, hostname string) error {...}
func (ssn *Session) Pipeline(task *api.TaskInfo, hostname string) error {...}
func (ssn *Session) Evict(reclaimee *api.TaskInfo, reason string) error {...}

Session 一共有两类方法

  1. session_plugins,与plugin 相关的各种Function 注入与调用;
  2. 真正操作Pod的Allocate/Pipeline/Evict。 Action.Execute中,Action 依次遍历 pending 状态的task,根据session_plugins方法判断task 和job 状态,最终调用Pod的Allocate/Pipeline/Evict。这或许是Action 和Plugin ,机制和策略分离的一种解释。

gang-scheduler 如何实现

kube-batch 具体如何实现gang scheduler kube-batch 从代码中找出gang scheduler这个过程

gang-scheduler 非常类似分布式事务/tcc,tcc 有一个预留的动作,要实现gang-scheduler的效果,Pod 自带的Pending/Running/Succeeded/Failed/Unknown 是不够的, 为此Pod 对应struct TaskInfo 定义了Pending/Allocated/Pipelined/Binding/Bound/Running/Releasing/Succeeded/Failed/Unknown 状态,其中 Allocated 用来标记pod 已分配资源但未实际运行的状态。

当需要进行gang-scheduler 时,上层operator/controller 会将pod 的schedulerName 设置为kube-batch或volcano,并带上 annotation scheduling.k8s.io/group-name,创建 name= scheduling.k8s.io/group-name 的podgroup,即podgroup 和 pod 通过scheduling.k8s.io/group-name 关联。

一个podGroup 对应一个JobInfo,kube-batch 将pod 转换为taskInfo,每一个node对应NodeInfo,所谓 为pod分配Node:taskInfo.NodeName=nodeName,NodeInfo减去pod 标定的资源。当发现 JobInfo 下的taskInfo 符合minMember,即真正为 pod 赋值nodeName。具体代码还要再捋捋。

func (alloc *allocateAction) Execute(ssn *framework.Session) {
	...
	// 对queue和job 进行排序  queues 和jobs 都是优先级队列
	for {
		if queues.Empty() {break}
		queue := queues.Pop().(*api.QueueInfo)	// 取出优先级最高的queue
		if ssn.Overused(queue) {continue} // 某个queue 占用的资源过多,不再为其pod进行调度了
		jobs, found := jobsMap[queue.UID]	 // 取出queue 对应的jobs
		job := jobs.Pop().(*api.JobInfo)
		// 赋值api.Pending状态的task 到  pendingTasks
		tasks := pendingTasks[job.UID]
		glog.V(3).Infof("Try to allocate resource to %d tasks of Job <%v/%v>",
		tasks.Len(), job.Namespace, job.Name)
		for !tasks.Empty() {
			task := tasks.Pop().(*api.TaskInfo)
			predicateNodes := util.PredicateNodes(task, allNodes, predicateFn)  // 预选
			if len(predicateNodes) == 0 {break} 
			priorityList, err := util.PrioritizeNodes(task, predicateNodes, ssn.NodePrioritizers()) // 优选
			if err != nil {break} 
			nodeName := util.SelectBestNode(priorityList)
			node := ssn.Nodes[nodeName]
			// Allocate idle resource to the task.
			if task.InitResreq.LessEqual(node.Idle) {
				if err := ssn.Allocate(task, node.Name); err != nil {...} // 绑定task到node
			} else {  
				//store information about missing resources
				job.NodesFitDelta[node.Name] = node.Idle.Clone()
				job.NodesFitDelta[node.Name].FitDelta(task.InitResreq)
				// Allocate releasing resource to the task if any.
				if task.InitResreq.LessEqual(node.Releasing) {
					if err := ssn.Pipeline(task, node.Name); err != nil {...}
				}
			}
			// job ready(比如job一共10个 minMember=5)当前job 放在jobs的最后,还剩的5的pod 调度优先级就不高了,可以放放,暂停对这个job的调度
			if ssn.JobReady(job) && !tasks.Empty() {  
				jobs.Push(job)
				break
			}
		}
		queues.Push(queue)  // Added Queue back until no job in Queue.
	}
}

Session.Allocate ==> if ssn.JobReady(job) Session.dispatch==> Cache.Bind(task *api.TaskInfo, hostname string) error 真正 更新pod 即设定pod.nodeName。

  1. 对于jobReadyFns 来说,只有gang plugin 注册了jobReadyFns 到Session 上,Session.JobReady 默认返回true。也就是,如果不用gang plugin,则每一次 Session.Allocate,job ready默认为true,为pod 真正分配node。
  2. 用了gang plugin之后,则每次Session.Allocate,要先校验 gang.jobReadyFns,校验通过,则为pod 真正分配node,否则只是将 task 标记为 api.Allocated ,记住了taskInfo 所属的nodeInfo,并在nodeInfo 中扣掉了taskInfo的资源NodeInfo.addTask
  3. 比如job一共10个task, minMember=5。如果已经分配了4个,第5找不到合适的节点。gang plugin 向Session 注册了 ReclaimableFn/PreemptableFn(计算牺牲作业)。对于 这种状态的 job,这4个task 可以被抢占。如果已经分配了 6个,则多出来的一个task 也可以被抢占

Volcano 调度器源码分析(scheduler 03)整体上,gang plugins 的逻辑就是围绕着 jobInfos.MinAvailable 这个属性展开的,几个函数位点主要也是为了增加跟 jobInfos.MinAvailable 的比较条件。

资源配额管理

假设存在queue1/queue2,包含运行或排队的任务task1/task2/task3/task4, 在一个调度周期内,queue 和 task 是确定的,volcano 负责在这个确定的任务列表和资源约束下 调度排队的任务。

主要逻辑在哪

如何实现了Queue 的weight 资源软约束与capacity 硬约束?主要在 proportion plugin 中

在allocate.go 中,针对一个task 的调度,会先找到task 所属queue 富余的资源(包括cpu/memory/自定义),如果queue 富余资源够 task 使用,则资源申请成功,开始进入预选优选环节。

// queue 有没有资源运行task
taskRequest := task.Resreq.ResourceNames()
if underusedResources := ssn.UnderusedResources(queue); underusedResources != nil && !underusedResources.Contains(taskRequest) {
	klog.V(3).Infof("Queue <%s> is overused when considering task <%s>, ignore it.", queue.Name, task.Name)
	continue
}
// 预选优选

一个queue富余资源的计算来自ssn.UnderusedResources ,只有proportion 一个plugin 注册了 underUsedFns,所以主要工作 在 proportion plugin 里。

如何计算 queue.deserved

type proportionPlugin struct {
    // 集群总资源
	totalResource *api.Resource     
	queueOpts     map[api.QueueID]*queueAttr
	// Arguments given for the plugin
	pluginArguments framework.Arguments
}
type queueAttr struct {
	queueID api.QueueID
	name    string
	weight  int32
	share   float64

	deserved  *api.Resource        // queue 应得的资源
	allocated *api.Resource        // running task 已经申请的资源
	request   *api.Resource        // running + pending task 准备申请的资源
	// inqueue represents the resource request of the inqueue job
	inqueue    *api.Resource
	capability *api.Resource        // queue capability 配置
}

一个queue 对应 一个queueAttr,proportionPlugin.OnSessionOpen 主要工作就是 为queue 的deserved/allocated/request/capability 属性赋值

  1. 其中capability 从queue配置获取
  2. allocated/request 从queue 所属的 pod 申请的资源累加得到
  3. deserved 需要计算,分为多次循环,每次循环尝试给 queue 一点资源,直到给queue 的资源满足request,这个queue的分配就结束。这一点资源= weight * remaing,下文称weight remaing resoruce (简称wrr)

如何计算 queue.deserved

每个queue deserved 初始为0
for{
    for 每个queue{
        deserved = deserved + wrr
        1. 如果 deserved 大于 capability,则取  deserved/capability/request 较小值,不管资源够不够用,meet了,不再继续分配资源了
        2. 如果 deserved 小于 capability 大于 request,也是meet 了 退出循环
        3. 如果 deserved 小于 request ,那就先给其它queue 分配资源,若还有剩余remaining 再来一轮
    }
    更新remaining
    如果上一次remaining 和这一次 没有变化或者没有remaining了,说明queue 都分配好了或没有资源了,退出分配过程
}

deserved 计算的本质是 三者求最小,即min(deserved,capacity,request)。

假设资源总量是100,queue1和queue2 weight=1

  1. 一次分配循环给 queue 属于它weight 配额的资源(deserved += wrr),够了就算了(deserved > request),不够等别人的配额分完了再来一轮。
  2. 哪怕集群资源足够,proportion 也只是给queue 分配刚好够 request的资源。attr.deserved = helpers.Min(attr.deserved, attr.request), 假设queue1,queue2 request=30 那么最终 queue1,queue2 deserved=30
  3. 结合前2点,可能会出现一种情况:假设queue1 request=40,queue2 request=60,则第一轮queue1 deserved=40,queue2 deserved=50,第二轮queue2 deserved=60

其它:在一个调度周期内,一个queue 不会用到超过 deserved 的资源。 多轮调度周期下, t1 时刻 queue.deserve 是一个值,因为有新的task等导致 t2 时刻queue.deserved 值变小,此刻,可能会出现一个queue的 allocated 大于 deserved的情形。

如何判断queue 是否可以执行

proportion.underUsedFns 计算富余资源:session 初始化的核心是 计算Queue的 deserved ,之后每次 调度task 时,对queue deserved 与 allocated 求差值即 得到富余的资源,进而确定 富余资源是否够 task 运行。

plugin为什么要分成tiers

actions: "enqueue, allocate, backfill"
tiers:
- plugins:
	- name: priority			# 让用户自定义job、task优先级
	- name: gang				# 满足了调度过程中的“All or nothing”的调度需求
	- name: conformance			# 除了 critical pod 都ok
- plugins:
	- name: drf					# 优先占用资源小的业务
	- name: predicates			# 快速筛选出来需要GPU的进行集中调度
	- name: proportion			# 如果投递的作业量超过所属queue最大可用资源,就需要排队。
	- name: nodeorder			# 从各个维度为node打分,打分参数由用户来配置。
	- name: binpack				# binpack调度算法的目标是尽量把已有的节点填满,每种资源配置的权重值、不同的插件在计算节点分数时权重值由用户来配置

为什么要分成tiers?背景:每个plugin 注册了一堆函数,action 会在会在适当的实际调用Session.函数()执行。Session.函数() 的大体逻辑都是 遍历plugin 注册的所有函数并执行,每个plugin 只注册了跟自己逻辑有关的函数。

  1. priority, TaskOrderFn/JobOrderFn/PreemptableFn
  2. gang, JobValidFn/ReclaimableFn/PreemptableFn/JobOrderFn/JobReadyFn/JobPipelinedFn/JobStarvingFns
  3. conformance, PreemptableFn/ReclaimableFn
  4. drf, PreemptableFn/QueueOrderFn/ReclaimableFn/JobOrderFn/NamespaceOrderFn
  5. predicates, PredicateFn
  6. proportion, QueueOrderFn/ReclaimableFn/OverusedFn/UnderusedResourceFn/JobEnqueueableFn
  7. nodeorder, NodeOrderFn/BatchNodeOrderFn
  8. binpack , NodeOrderFn

Session.函数() 核心逻辑是两层循环,分为三种情况

  1. “一言不合”直接返回的
     func (ssn *Session) xx() xx{
         for _, tier := range ssn.Tiers {
             for _, plugin := range tier.Plugins {
                 if(xx){
                     return xx
                 }
             }
         }
         return xx
     }
    
  2. 所有plugin 一起配合计算的,比如给某个node 打分
     func (ssn *Session) xx() xx{
         for _, tier := range ssn.Tiers {
             for _, plugin := range tier.Plugins {
                 sum += xx
             }
         }
         return sum
     }
    
  3. tier内 的所有plugin 参与计算。比如 Reclaimable 决定回收哪些正在运行的pod,即寻找victim。如果在第一层tier 中可以找到牺牲者 就直接返回了,毕竟能牺牲少点就牺牲少点,实在不行,才会计算第二层tier。
     func (ssn *Session) xx() xx{
         var victims []*api.TaskInfo
         for _, tier := range ssn.Tiers {
             for _, plugin := range tier.Plugins {
                 // 寻找victim
             }
             if victims != nil {
                 return victims
             }	
         }
         return nil
    	
    

    前两种情况,是不需要区分两层的,此时所有的plugin 先后顺序是重要的,是不是在一个tier 里不重要,即要么立即结束要么全局聚合。第三种情况, tier内 的plugin 局部聚合,两层for 之间做判断,如果有数据则 return。以默认的配置文件scheudler.conf来说,第一个tier 更多是基于用户设置,第二个tier 是基于task 和集群的实际情况,以用户设置为优先。

其它

鲁直:做开源,我觉得首先肯定要做一个心理准备,就是说你要有一个核心部分,再在这个基础上做扩展,在维护的成本上肯定有一定的上升,但是你要接受这样的成本——我觉得这种成本是可以接受的。……项目本身要设计好,具备一定分拆的可能性。如果不具备分拆可能性,那没法做了。像微内核这样的设计方式就会比较适合——就是开源一个核心模块,然后再去扩展,各种模块是可插拔的。

  1. job plugin:ssh提供免密登陆,svc提供作业运行所需要的网络信息如host文件、headless service等,来提供计算集群参数的自动化配置。1.7版本提供了Pytorch Job强化功能插件
  2. policies字段:一旦有pod被杀死,重启还是算了?RestartJob/CompeletedJob。因为这种HPC场景、基于Gang的训练作业,如果一个worker运行失败,通常来说整个作业运行下去是没什么意义的。
  3. volcano 中代码做了优化,将很多实际操作的逻辑从 session 剥离到 statment 中。statement 支持Evict/Pipeline/Allocate/Discard/Commit,关键就是这个Discard/Commit,前面几个操作(都有两个对象的小写方法,正反向操作,比如allocate/unallocate)被调用时并没有真正执行,而是一起丢弃或提交,有点类似sql 中的事务

留下评论