技术

LLM一些探索 Agent实践 LLM预训练 向量数据库的一些考量 fastapi+sqlalchemy进行项目开发 LLM微调实践 Python协程实现 Agent Functon Calling LLamaIndex入门 Multi-Agent探索 Python虚拟机 LLM工作流编排 Python实践 下一个平台Agent 激发LLM涌现——提示工程 LLM微调理论 大佬沉思 LLM外挂知识库 LLMOps 多模态LLM Python一些比较有意思的库 Transformers源码学习 LangChain源码学习 通用分布式计算引擎Ray Python并发 go依赖注入 go collection gc的基本原理 golang性能分析及优化 数据湖 高性能计算与存储 Linux2.1.13网络源代码学习 《大数据经典论文解读》 三驾马车学习 Spark 内存管理及调优 Yarn学习 从Spark部署模式开始讲源码分析 容器狂占内存资源怎么办? 多角度理解一致性 golang io使用及优化模式 Flink学习 c++学习 学习ebpf go设计哲学 ceph学习 学习mesh kvm虚拟化 学习MQ go编译器以及defer实现 学习go 为什么要有堆栈 汇编语言 计算机组成原理 运行时和库 Prometheus client mysql 事务 mysql 事务的隔离级别 mysql 索引 坏味道 学习分布式 学习网络 学习Linux go堆内存分配 golang 系统调用与阻塞处理 Goroutine 调度过程 重新认识cpu mosn有的没的 负载均衡泛谈 单元测试的新解读 《Redis核心技术与实现》笔记 《Prometheus监控实战》笔记 Prometheus 告警学习 calico源码分析 对容器云平台的理解 Prometheus 源码分析 并发的成本 基础设施优化 hashicorp raft源码学习 docker 架构 mosn细节 与微服务框架整合 Java动态代理 编程范式 并发通信模型 《网络是怎样连接的》笔记 go channel codereview gc分析 jvm 线程实现 go打包机制 go interface及反射 如何学习Kubernetes 《编译原理之美》笔记——后端部分 《编译原理之美》笔记——前端部分 Pilot MCP协议分析 go gc 内存管理玩法汇总 软件机制 istio流量管理 Pilot源码分析 golang io 学习Spring mosn源码浅析 MOSN简介 《datacenter as a computer》笔记 学习JVM Tomcat源码分析 Linux可观测性 学习存储 学计算 Gotty源码分析 kubernetes operator kaggle泰坦尼克问题实践 kubernetes扩缩容 神经网络模型优化 直觉上理解深度学习 如何学习机器学习 TIDB源码分析 什么是云原生 Alibaba Java诊断工具Arthas TIDB存储——TIKV 《Apache Kafka源码分析》——简介 netty中的线程池 guava cache 源码分析 Springboot 启动过程分析 Spring 创建Bean的年代变迁 Linux内存管理 自定义CNI IPAM 共识算法 spring redis 源码分析 kafka实践 spring kafka 源码分析 Linux进程调度 让kafka支持优先级队列 Codis源码分析 Redis源码分析 C语言学习 《趣谈Linux操作系统》笔记 docker和k8s安全访问机制 jvm crash分析 Prometheus 学习 Kubernetes监控 Kubernetes 控制器模型 容器日志采集 容器狂占资源怎么办? Kubernetes资源调度——scheduler 时序性数据库介绍及对比 influxdb入门 maven的基本概念 《Apache Kafka源码分析》——server Kubernetes类型系统 源码分析体会 《数据结构与算法之美》——算法新解 Kubernetes源码分析——controller mananger Kubernetes源码分析——apiserver Kubernetes源码分析——kubelet Kubernetes介绍 ansible学习 Kubernetes源码分析——从kubectl开始 jib源码分析之Step实现 线程排队 jib源码分析之细节 跨主机容器通信 jib源码分析及应用 为容器选择一个合适的entrypoint kubernetes yaml配置 《持续交付36讲》笔记 mybatis学习 程序猿应该知道的 无锁数据结构和算法 CNI——容器网络是如何打通的 为什么很多业务程序猿觉得数据结构和算法没用? 串一串一致性协议 当我在说PaaS时,我在说什么 《数据结构与算法之美》——数据结构笔记 PouchContainer技术分享体会 harbor学习 用groovy 来动态化你的代码 精简代码的利器——lombok 学习 《深入剖析kubernetes》笔记 编程语言那些事儿 rxjava3——背压 rxjava2——线程切换 spring cloud 初识 《深入拆解java 虚拟机》笔记 《how tomcat works》笔记 hystrix 学习 rxjava1——概念 Redis 学习 TIDB 学习 如何分发计算 Storm 学习 AQS1——论文学习 Unsafe Spark Stream 学习 linux vfs轮廓 《自己动手写docker》笔记 java8 实践 中本聪比特币白皮书 细读 区块链泛谈 比特币 大杂烩 总纲——如何学习分布式系统 hbase 泛谈 forkjoin 泛谈 看不见摸不着的cdn是啥 《jdk8 in action》笔记 程序猿视角看网络 bgp初识 calico学习 AQS——粗略的代码分析 我们能用反射做什么 web 跨域问题 《clean code》笔记 《Elasticsearch权威指南》笔记 mockito简介及源码分析 2017软件开发小结—— 从做功能到做系统 《Apache Kafka源码分析》——clients dns隐藏的一个坑 《mysql技术内幕》笔记 log4j学习 为什么netty比较难懂? 递归、回溯、动态规划 apollo client源码分析及看待面向对象设计 学习并发 docker运行java项目的常见问题 OpenTSDB 入门 spring事务小结 分布式事务 javascript应用在哪里 《netty in action》读书笔记 netty对http2协议的解析 ssl证书是什么东西 http那些事 苹果APNs推送框架pushy apple 推送那些事儿 编写java框架的几大利器 java内存模型和jvm内存布局 java exception Linux IO学习 netty内存管理 测试环境docker化实践 netty在框架中的使用套路 Nginx简单使用 《Linux内核设计的艺术》小结 Go并发机制及语言层工具 Linux网络源代码学习——数据包的发送与接收 《docker源码分析》小结 docker namespace和cgroup zookeeper三重奏 数据库的一些知识 Spark 泛谈 链式处理的那些套路 netty回顾 Thrift基本原理与实践(二) Thrift基本原理与实践(一) 回调 异步执行抽象——Executor与Future Docker0.1.0源码分析 java gc Jedis源码分析 深度学习泛谈 Linux网络命令操作 JTA与TCC 换个角度看待设计模式 Scala初识 向Hadoop学习NIO的使用 以新的角度看数据结构 并发控制相关的硬件与内核支持 systemd 简介 quartz 源码分析 基于docker搭建测试环境(二) spring aop 实现原理简述 自己动手写spring(八) 支持AOP 自己动手写spring(七) 类结构设计调整 分析log日志 自己动手写spring(六) 支持FactoryBean 自己动手写spring(九) 总结 自己动手写spring(五) bean的生命周期管理 自己动手写spring(四) 整合xml与注解方式 自己动手写spring(三) 支持注解方式 自己动手写spring(二) 创建一个bean工厂 自己动手写spring(一) 使用digester varnish 简单使用 关于docker image的那点事儿 基于docker搭建测试环境 分布式配置系统 JVM执行 git maven/ant/gradle/make使用 再看tcp kv系统 java nio的多线程扩展 《Concurrency Models》笔记 回头看Spring IOC IntelliJ IDEA使用 Java泛型 vagrant 使用 Go常用的一些库 Python初学 Goroutine 调度模型 虚拟网络 《程序员的自我修养》小结 Kubernetes存储 访问Kubernetes上的Service Kubernetes副本管理 Kubernetes pod 组件 Go基础 JVM类加载 硬币和扑克牌问题 LRU实现 virtualbox 使用 ThreadLocal小结 docker快速入门

架构

大模型推理tips RAG向量检索与微调 dddfirework源码分析 RAG与知识图谱 大模型推理服务框架vLLM 大模型推理服务框架 模型服务化(未完成) 大模型Post-Training 大模型训练 大模型推理 从Attention到Transformer k8s设备管理 ddd从理念到代码 如何应用LLM 小鼠如何驾驭大象(LLM)? 多类型负载协调员Koordinator controller-runtime细节分析 finops学习 kubevela多集群 kubevela中cue的应用 基于k8s的工作流 kubevela源码分析 容器和CPU那些事儿 数据集管理fluid 应用管理平台kubevela karmada支持crd 多集群管理 AutoML和AutoDL 特征平台 实时训练 分布式链路追踪 K8S YAML 资源清单管理方案 tensorflow原理——python层分析 如何学习tensorflow 数据并行——allreduce 数据并行——ps embedding的原理及实践 机器学习中的python调用c 机器学习训练框架概述 tensornet源码分析 大模型训练和推理 X的生成——特征工程 tvm tensorflow原理——core层分析 模型演变 《深度学习推荐系统实战》笔记 keras 和 Estimator tensorflow分布式训练 分布式训练的一些问题 基于Volcano的弹性训练 图神经网络 pytorch弹性分布式训练 从混部到统一调度 从RNN到Attention pytorch分布式训练 CNN 《动手学深度学习》笔记 pytorch与线性回归 多活 volcano特性源码分析 推理服务 kubebuilder 学习 mpi 学习pytorch client-go学习 提高gpu 利用率 GPU与容器的结合 GPU入门 AI云平台梳理 tensorflow学习 tf-operator源码分析 k8s批处理调度/Job调度 喜马拉雅容器化实践 Kubernetes 实践 学习rpc BFF openkruise学习 可观察性和监控系统 基于Kubernetes选主及应用 《许式伟的架构课》笔记 Admission Controller 与 Admission Webhook 发布平台系统设计 k8s水平扩缩容 Scheduler如何给Node打分 Scheduler扩展 深入controller openkruise cloneset学习 controller-runtime源码分析 pv与pvc实现 csi学习 client-go informer源码分析 kubelet 组件分析 调度实践 Pod是如何被创建出来的? 《软件设计之美》笔记 mecha 架构学习 Kubernetes events学习及应用 CRI 资源调度泛谈 业务系统设计原则 grpc学习 元编程 以应用为中心 istio学习 下一代微服务Service Mesh 《实现领域驱动设计》笔记 概率论 serverless 泛谈 《架构整洁之道》笔记 处理复杂性 那些年追过的并发 服务器端编程 网络通信协议 架构大杂烩 如何学习架构 《反应式设计模式》笔记 项目的演化特点 反应式架构摸索 函数式编程的设计模式 服务化 ddd反模式——CRUD的败笔 研发效能平台 重新看面向对象设计 业务系统设计的一些体会 函数式编程 《左耳听风》笔记 业务程序猿眼中的微服务管理 DDD实践——CQRS 项目隔离——案例研究 《编程的本质》笔记 系统故障排查汇总及教训 平台支持类系统的几个点 代码腾挪的艺术 abtest 系统设计汇总 《从0开始学架构》笔记 初级权限系统设计 领域驱动理念 现有上传协议分析 移动网络下的文件上传要注意的几个问题 推送系统的几个基本问题 做配置中心要想好的几个基本问题 不同层面的异步 分层那些事儿 性能问题分析 用户认证问题 资源的分配与回收——池 消息/任务队列

标签

k8s设备管理 多类型负载协调员Koordinator controller-runtime细节分析 finops学习 kubevela多集群 kubevela中cue的应用 基于k8s的工作流 kubevela源码分析 容器和CPU那些事儿 数据集管理fluid 应用管理平台kubevela karmada支持crd 多集群管理 K8S YAML 资源清单管理方案 从混部到统一调度 volcano特性源码分析 kubebuilder 学习 client-go学习 tf-operator源码分析 k8s批处理调度/Job调度 喜马拉雅容器化实践 Kubernetes 实践 openkruise学习 基于Kubernetes选主及应用 Admission Controller 与 Admission Webhook k8s水平扩缩容 Scheduler如何给Node打分 Scheduler扩展 深入controller openkruise cloneset学习 controller-runtime源码分析 pv与pvc实现 csi学习 client-go informer源码分析 kubelet 组件分析 调度实践 Pod是如何被创建出来的? Kubernetes events学习及应用 CRI 资源调度泛谈 如何学习Kubernetes 以应用为中心 kubernetes operator kubernetes扩缩容 serverless 泛谈 什么是云原生 自定义CNI IPAM docker和k8s安全访问机制 Kubernetes监控 Kubernetes 控制器模型 Kubernetes资源调度——scheduler Kubernetes类型系统 Kubernetes源码分析——controller mananger Kubernetes源码分析——apiserver Kubernetes源码分析——kubelet Kubernetes介绍 Kubernetes源码分析——从kubectl开始 kubernetes yaml配置 CNI——容器网络是如何打通的 当我在说PaaS时,我在说什么 《深入剖析kubernetes》笔记 Kubernetes存储 访问Kubernetes上的Service Kubernetes副本管理 Kubernetes pod 组件
大模型推理tips LLM一些探索 Agent实践 LLM预训练 RAG向量检索与微调 LLM微调实践 RAG与知识图谱 大模型推理服务框架vLLM Agent Functon Calling LLamaIndex入门 Multi-Agent探索 LLM工作流编排 大模型推理服务框架 模型服务化(未完成) 大模型Post-Training 大模型训练 大模型推理 下一个平台Agent 从Attention到Transformer 激发LLM涌现——提示工程 LLM微调理论 大佬沉思 LLM外挂知识库 LLMOps 多模态LLM Transformers源码学习 LangChain源码学习 如何应用LLM 小鼠如何驾驭大象(LLM)? AutoML和AutoDL 特征平台 实时训练 tensorflow原理——python层分析 如何学习tensorflow 数据并行——allreduce 数据并行——ps embedding的原理及实践 机器学习中的python调用c 机器学习训练框架概述 tensornet源码分析 大模型训练和推理 X的生成——特征工程 tvm tensorflow原理——core层分析 模型演变 《深度学习推荐系统实战》笔记 keras 和 Estimator tensorflow分布式训练 分布式训练的一些问题 基于Volcano的弹性训练 图神经网络 pytorch弹性分布式训练 从RNN到Attention pytorch分布式训练 CNN 《动手学深度学习》笔记 pytorch与线性回归 推理服务 mpi 学习pytorch 提高gpu 利用率 GPU与容器的结合 GPU入门 AI云平台梳理 tensorflow学习 kaggle泰坦尼克问题实践 神经网络模型优化 概率论 直觉上理解深度学习 如何学习机器学习 深度学习泛谈

openkruise cloneset学习

2020年08月12日

简介

openkruise 面向自动化场景的 Kubernetes workload扩展controller,它是一组controller,可在应用程序工作负载管理上扩展和补充Kubernetes核心控制器。cloneset 在很多方面上借鉴了 statefulset ,只是没有 statefulset 的 ordinal 序号。

类似功能 的还有 Argo Rollouts Argo Rollouts is a Kubernetes controller and set of CRDs which provide advanced deployment capabilities such as blue-green, canary, canary analysis, experimentation, and progressive delivery features to Kubernetes.

CloneSet

CloneSet 控制器提供了高效管理无状态应用的能力,一个简单的 CloneSet yaml 文件如下:

apiVersion: apps.kruise.io/v1alpha1
kind: CloneSet
metadata:
  labels:
    app: sample
  name: sample
spec:
  replicas: 5
  selector:
    matchLabels:
      app: sample
  template:
    metadata:
      labels:
        app: sample
    spec:
      containers:
      - name: nginx
        image: nginx:alpine

spec.template 中定义了当前 CloneSet 中最新的 Pod 模板。 控制器会为每次更新过的 spec.template 计算一个 revision hash 值。在运行过程中,还会额外 为cloneset 管理的pod 加上 label=controller-revision-hash 标记pod 所属的revision。

运行后,describe cloneset 示例:

Name:         sample
Namespace:    default
Labels:       app=sample
Annotations:  API Version:  apps.kruise.io/v1alpha1
Kind:         CloneSet
Metadata:
  Creation Timestamp:  2020-08-17T09:39:08Z
  Generation:          1
  Resource Version:    307518
  Self Link:           /apis/apps.kruise.io/v1alpha1/namespaces/default/clonesets/sample
  UID:                 65534592-a998-4374-a2c7-56eeb1dd273b
Spec:
  ...
Status:
  Available Replicas:      5
  Collision Count:         0
  Label Selector:          app=sample
  Observed Generation:     1
  Ready Replicas:          5
  Replicas:                5
  Update Revision:         sample-5cdbb7d879
  Updated Ready Replicas:  5
  Updated Replicas:        5

原地升级nginx pod 的status 示例

Labels:         app=sample
                apps.kruise.io/cloneset-instance-id=75v7q
                controller-revision-hash=sample-549647c4b4
Annotations:    inplace-update-state:
                  {"revision":"sample-549647c4b4","updateTimestamp":"2020-08-14T11:04:25Z","lastContainerStatuses":...}
Controlled By:  CloneSet/sample
Status:       Running               // Pending/Running/Succeeded/Failed/Unknown
Readiness Gates:
  Type                 Status
  InPlaceUpdateReady   True         // 原地升级添加的自定义Readiness Gates
Conditions:
  Type                 Status
  InPlaceUpdateReady   True
  Initialized          True         // 所有的 Init 容器 都已成功启动
  Ready                False        // Pod 可以为请求提供服务,并且应该被添加到对应服务的负载均衡池中
  ContainersReady      False        // Pod 中所有容器都已就绪
  PodScheduled         True         // Pod 已经被调度到某节点

CloneSet可以对标原生的 Deployment,但 CloneSet 提供了很多增强功能,比如扩缩容时删除特定pod,尤其提供了丰富的升级策略

spec:
    // 升级功能
    updateStrategy:
        type:               // 升级方式,默认为重建升级ReCreate,支持尽可能原地升级InPlaceIfPossible 和 只能原地升级InPlaceOnly
        inPlaceUpdateStrategy:  // 原地升级策略,比如graceful period  等
        partition:          // Partition 的语义是 保留旧版本 Pod 的数量,默认为 0。 如果在发布过程中设置了 partition,则控制器只会将 (replicas - partition) 数量的 Pod 更新到最新版本。
        MaxUnavailable:     // 最大不可用的Pod数量,是一个绝对值或百分比
        MaxSurge:           // 最大弹性数量,即最多能扩出来超过 replicas 的 Pod 数量,是一个绝对值或百分比。
        priorityStrategy:   // 优先级策略,升级顺序相关
            ...
        scatterStrategy:    // 打散策略,升级顺序相关
        paused:             // 为true时发布暂停
        PreUpdate:          // 升级前钩子函数
        PostUpdate:         // 升级后钩子函数
    // 扩缩容功能
    scaleStrategy:
        podsToDelete:       // 允许用户在缩小 replicas 数量的同时,指定想要删除的 Pod 名字

CloneSet status 中的字段说明:

  1. status.replicas: Pod 总数
  2. status.readyReplicas: ready Pod 数量
  3. status.availableReplicas: ready and available Pod 数量 (满足 minReadySeconds)
  4. status.updateRevision: 最新版本的 revision hash 值
  5. status.updatedReplicas: 最新版本的 Pod 数量
  6. status.updatedReadyReplicas: 最新版本的 ready Pod 数量

Partition 的语义是保留旧版本 Pod 的数量或百分比。比如说一个 100 个副本的 CloneSet,在升级镜像时将 partition 数值阶段性改为 80 -> 60 -> 40 -> 20 -> 0,则完成了分 5 批次发布。在灰度发布的过程中,只需要前后调节 partition 数值,就能灵活得控制新旧版本的比例数量。CloneSet 所依据的 “新旧版本” 对应的是其 status 中的 updateRevision 和 currentRevision。 cloneset作者提到:cloneset partition其实是继承了原生 statefulset 的 partition 理念,只是没有 statefulset 的 ordinal 序号。Partition 的语义是 保留旧版本 Pod 的数量,笔者曾觉得有点违反直觉。但如果 partition 来表示新版本数量的话,每次全量发布、扩容时都应同步设置partition 的值(与replicas保持一致),partition 的默认值就不能是0 或不填了。

Reconcile 逻辑

在kubebuilder 把Controller 控制器模型 的代码 都自动生成之后,不同Controller 之间的逻辑差异便只剩下 Reconcile 了

整体逻辑

背景知识

  1. Kubernetes v1.7 之后添加了一个 API 对象,名叫 ControllerRevision,专门用来记录某种 crd 对象的版本
  2. CloneSet Owned 三个资源:ControllerRevision、Pod、PVC。
  3. 控制器会为每次更新过的 spec.template 计算一个 revision hash 值并上报到 CloneSet status 中
  4. 比如上文中提到的 nginx,在创建之初拥有的第一个 template 版本,会创建一个对应的 ControllerRevision。而当修改了 image 版本之后,CloneSet Controller 会创建一个新的 ControllerRevision。通过ControllerRevision,CloneSet 可以很方便地管理不同版本,还原 CloneSet
  5. Pod label 中定义的 ControllerRevision hash(label name = “controller-revision-hash”),就是 ControllerRevision 的名字
// kruise/pkg/controller/cloneset/cloneset_controller.go
func (r *ReconcileCloneSet) Reconcile(request reconcile.Request) (reconcile.Result, error) {
    // ReconcileCloneSet.reconcileFunc = ReconcileCloneSet.doReconcile
	return r.reconcileFunc(request)
}
func (r *ReconcileCloneSet) doReconcile(request reconcile.Request) (res reconcile.Result, retErr error) {
	startTime := time.Now()
	// Fetch the CloneSet instance
	instance := &appsv1alpha1.CloneSet{}
	err := r.Get(context.TODO(), request.NamespacedName, instance)
	coreControl := clonesetcore.New(instance)
	selector, err := metav1.LabelSelectorAsSelector(instance.Spec.Selector)
	// list all active Pods and PVCs belongs to cs
	filteredPods, filteredPVCs, err := r.getOwnedResource(instance)
	//release Pods ownerRef
	filteredPods, err = r.claimPods(instance, filteredPods)
	// list all revisions and sort them
    revisions, err := r.controllerHistory.ListControllerRevisions(instance, selector)
    // 排序规则 byRevision.Less 优先根据 CreationTimestamp排序 其次根据Name
	history.SortControllerRevisions(revisions)
	// get the current, and update revisions
	currentRevision, updateRevision, collisionCount, err := r.getActiveRevisions(instance, revisions, clonesetutils.GetPodsRevisions(filteredPods))
	newStatus := appsv1alpha1.CloneSetStatus{
		ObservedGeneration: instance.Generation,
		CurrentRevision:    currentRevision.Name,
		UpdateRevision:     updateRevision.Name,
		CollisionCount:     new(int32),
		LabelSelector:      selector.String(),
	}
	*newStatus.CollisionCount = collisionCount
	// scale and update pods
	delayDuration, syncErr := r.syncCloneSet(instance, &newStatus, currentRevision, updateRevision, revisions, filteredPods, filteredPVCs)
	// update new status
	if err = r.statusUpdater.UpdateCloneSetStatus(instance, &newStatus, filteredPods); err != nil {
		return reconcile.Result{}, err
	}
	if err = r.truncatePodsToDelete(instance, filteredPods); err != nil {
		klog.Warningf("Failed to truncate podsToDelete for %s: %v", request, err)
	}
	if err = r.truncateHistory(instance, filteredPods, revisions, currentRevision, updateRevision); err != nil {
		klog.Errorf("Failed to truncate history for %s: %v", request, err)
	}
	...
	return reconcile.Result{RequeueAfter: delayDuration}, syncErr
}

扩缩容

syncCloneSet 根据 cloneSet 期望状态( 由replicas 以及updateStrategy描述 )以及pod的真实状态, 执行scale update 逻辑

  1. scale逻辑 对应 scale.Interface:
    1. 需要做扩容或缩容的时候(也就是pod 实际数量不等于 replicas时),scale 通过 删除或创建特定Revision的pod 使得 新旧Revision pod 的数量符合replicas/partition/MaxSurge/maxUnavailable 要求
    2. 如果 pod 实际数量等于 replicas,scale 并不会进行处理,本次syncCloneSet 主要执行 update 逻辑。
  2. update逻辑对应 update.Interface:找到不符合 updateRevision 的pod,根据 partition/MaxSurge/maxUnavailable 以及pod 的ready 情况,计算需要更新的pod 的数量needToUpdateCount,从排序好的 pod 中选取 needToUpdateCount 个pod 执行更新逻辑。PS:选择该删的删掉,之后创建
    1. 如果配置了原地升级策略, 原地升级pod
    2. 如果是默认ReCreate 策略,按序删除pod
// kruise/pkg/controller/cloneset/cloneset_controller.go
func (r *ReconcileCloneSet) syncCloneSet(
	instance *appsv1alpha1.CloneSet, newStatus *appsv1alpha1.CloneSetStatus,
	currentRevision, updateRevision *apps.ControllerRevision, revisions []*apps.ControllerRevision,
	filteredPods []*v1.Pod, filteredPVCs []*v1.PersistentVolumeClaim,
) (time.Duration, error) {
	// 根据 ControllerRevision 还原CloneSet
	currentSet, err := r.revisionControl.ApplyRevision(instance, currentRevision)
    updateSet, err := r.revisionControl.ApplyRevision(instance, updateRevision)
	scaling, podsScaleErr = r.scaleControl.Manage(currentSet, updateSet, currentRevision.Name, updateRevision.Name, filteredPods, filteredPVCs)
	if scaling {
		return delayDuration, podsScaleErr
	}
	delayDuration, podsUpdateErr = r.updateControl.Manage(updateSet, updateRevision, revisions, filteredPods, filteredPVCs)
	return delayDuration, err
}

扩容逻辑:如果发布的时候设置了 maxSurge,控制器会先多扩出来 maxSurge 数量的 Pod(此时 Pod 总数为 (replicas+maxSurge)),然后再开始发布存量的 Pod。 然后,当新版本 Pod 数量已经满足 replicas - partition 要求之后,控制器会再把多余的 maxSurge 数量的 Pod 删除掉,保证最终的 Pod 数量符合 replicas。此外,maxSurge 还受 升级方式(type)的影响:maxSurge 不允许配合 InPlaceOnly 更新模式使用(可以认为此时maxSurge=0?)。 另外,如果是与 InPlaceIfPossible 策略配合使用,控制器会先扩出来 maxSurge 数量的 Pod,再对存量 Pod 做原地升级。

// kruise/pkg/controller/cloneset/scale/cloneset_scale.go
func (r *realControl) Manage(
	currentCS, updateCS *appsv1alpha1.CloneSet,
	currentRevision, updateRevision string,
	pods []*v1.Pod, pvcs []*v1.PersistentVolumeClaim,
) (bool, error) {
	// 获取 podsToDelete 中指定的pod
	if podsToDelete := getPodsToDelete(updateCS, pods); len(podsToDelete) > 0 {
		return r.deletePods(updateCS, podsToDelete, pvcs)
    }
    // 符合 updateRevision 的pod 为 updatedPods ,不符合的为notUpdatedPods
    updatedPods, notUpdatedPods := clonesetutils.SplitPodsByRevision(pods, updateRevision)
    // 一个CloneSet 最多允许(replicas + MaxSurge)个pod 存在,如果实际pod 小于这个数量(diff<0)则需要创建pod,否则(diff>0) 删除pod。 
    // diff 标记pod 总量是否ready;currentRevDiff 表示 currentRev 总量是否ready
	diff, currentRevDiff := calculateDiffs(updateCS, updateRevision == currentRevision, len(pods), len(notUpdatedPods))
	if diff < 0 {
		// total number of this creation
		expectedCreations := diff * -1
		// lack number of current version
		expectedCurrentCreations := 0
		if currentRevDiff < 0 {
			expectedCurrentCreations = currentRevDiff * -1
		}
		// available instance-id come from free pvc
        availableIDs := getOrGenAvailableIDs(expectedCreations, pods, pvcs)
        // pod 数量不足,就要创建expectedCreations 个pod,创建时需指明 currentRevision和updateRevision 的pod 分别创建几个
		return r.createPods(expectedCreations, expectedCurrentCreations,
			currentCS, updateCS, currentRevision, updateRevision, availableIDs.List(), existingPVCNames)
	} else if diff > 0 {
        // pod 数量较多 则选择 多余的 新老Revision pod 删除
		podsToDelete := choosePodsToDelete(diff, currentRevDiff, notUpdatedPods, updatedPods)
		return r.deletePods(updateCS, podsToDelete, pvcs)
    }
    // 如果diff = 0 什么都不做
	return false, nil
}

更新逻辑

// 处理 升级间隔,计算真正需要 更新的pod
// kruise/pkg/controller/cloneset/update/cloneset_update.go
func (c *realControl) Manage(cs *appsv1alpha1.CloneSet,
	updateRevision *apps.ControllerRevision, revisions []*apps.ControllerRevision,
	pods []*v1.Pod, pvcs []*v1.PersistentVolumeClaim,
) (time.Duration, error) {
	// 1. find currently updated and not-ready count and all pods waiting to update
	var waitUpdateIndexes []int
	for i := range pods {
        // 支持 在pod 上设置一些注解,控制升级间隔,比如inplace-update-grace,如果距离上一次pod 的grace period 还未到,则先放弃本次升级
        // 如果pod 的revision 不是最新的revision ,则加入到waitUpdateIndexes
		if clonesetutils.GetPodRevision(pods[i]) != updateRevision.Name {
			waitUpdateIndexes = append(waitUpdateIndexes, i)
		}
	}
	// 2. sort all pods waiting to update  排序规则ActivePods.Less , 越新的 pod 越靠前
	// 3. 根据 replicas/partition/MaxSurge/maxUnavailable 以及pod Status(比如not ready)等 calculate max count of pods can update
	needToUpdateCount := calculateUpdateCount(coreControl, cs.Spec.UpdateStrategy, cs.Spec.MinReadySeconds, int(*cs.Spec.Replicas), waitUpdateIndexes, pods)
	if needToUpdateCount < len(waitUpdateIndexes) {
		waitUpdateIndexes = waitUpdateIndexes[:needToUpdateCount]
	}
	// 4. update pods
	for _, idx := range waitUpdateIndexes {
		pod := pods[idx]
		if duration, err := c.updatePod(cs, coreControl, updateRevision, revisions, pod, pvcs); err != nil {...}
	}
	return requeueDuration.Get(), nil
}
// 根据CloneSet 升级策略,执行升级逻辑
func (c *realControl) updatePod(cs *appsv1alpha1.CloneSet, coreControl clonesetcore.Control,
	updateRevision *apps.ControllerRevision, revisions []*apps.ControllerRevision,
	pod *v1.Pod, pvcs []*v1.PersistentVolumeClaim,
) (time.Duration, error) {
    // 如果clone set 升级策略为 (尽量)原地升级,则进入原地升级流程
	if cs.Spec.UpdateStrategy.Type == appsv1alpha1.InPlaceIfPossibleCloneSetUpdateStrategyType ||
		cs.Spec.UpdateStrategy.Type == appsv1alpha1.InPlaceOnlyCloneSetUpdateStrategyType {
		var oldRevision *apps.ControllerRevision
		for _, r := range revisions {
			if r.Name == clonesetutils.GetPodRevision(pod) {
				oldRevision = r
				break
			}
		}
		res := c.inplaceControl.Update(pod, oldRevision, updateRevision, coreControl.GetUpdateOptions())
        ...
        return ...
    }
    // 如果不是原地升级,则本次Reconcile 删除pod,待下次Reconcile 扩容时创建pod
    if err := c.Delete(context.TODO(), pod); err != nil {
		return 0, err
	}
	// handle pvc
	return 0, nil
}

虽然 spec.updateStrategy.partition 指定了旧版的数量。但 update 逻辑的主要目的是 更新 (replicas - partition) 个 updateRevision 实例。如果连续多次灰度发布,则旧版 可能存在多个 revision(也就是说不是最新的revision 都是旧版,旧版不都是某一个revision),整个cloneset 可能存在2个以上 revision的 pod。 这与直觉上的 多版本pod管理 还是不一样的

高级特性

原地升级

如何为 Kubernetes 实现原地升级? 如何在Kubernetes中实现容器原地升级一个Pod中可能包含了主业务容器,还有不可剥离的依赖业务容器,以及SideCar组件容器等,如果因为要更新其中一个SideCar Container而继续按照ReCreate Pod的方式进行整个Pod的重建,那负担还是很大的。更新一个轻量的SideCar却导致了分钟级的单个Pod的重建过程,因此,我们迫切希望能实现,只升级Pod中的某个Container,而不用重建整个Pod,这样可以节省调度、网络分配(Pod还使用原有IP)、分配挂载远程盘(Pod使用原有PV)、拉取镜像(新镜像只需下载很少的几层layer)耗时。

原地升级核心原理:每个Node上的kubelet 会针对本机上所有Pod.spec.containers中的每个container计算一个hash值,并记录到实际创建的容器中。如果我们修改了Pod中某个container的image字段,kubectl会发现container的hash发生了变化,与机器上过去创建的容器hash不一致,而后kubelet就会把旧容器停掉,然后根据最新Pod spec中的container 来创建新的容器。

Kubernetes把容器原地升级的能力只做在Kubelet这一层,并没有暴露在Deployment、StatefulSet等Controller中直接提供给用户,原因很简单,还是建议大家把Pod作为完整的部署单元。为了实现容器原地升级,我们更改Pod.Spec中对应容器的Image,就会生成kubetypes.UPDATE类型的事件,kubelet 将容器优雅终止。旧的容器被杀死之后,kubelet启动新的容器,如此即完成Pod不重建的前提下实现容器的原地升级。

不过这样可能存在的几个风险:

  1. 容器 升级时 有一段时间服务不可用,但k8s 组件 无法感知,这用到了 readinessGatesYour application can inject extra feedback or signals into PodStatus: Pod readiness. To use this, set readinessGates in the Pod’s spec to specify a list of additional conditions that the kubelet evaluates for Pod readiness.Readiness gates are determined by the current state of status.condition fields for the Pod. If Kubernetes cannot find such a condition in the status.conditions field of a Pod, the status of the condition is defaulted to “False”. 当一个 Pod 被原地升级时,控制器会先利用 readinessGates 把 Pod status 中修改为 not-ready 状态,然后再更新 Pod spec 中的 image 字段来触发 Kubelet 重建对应的容器。
  2. 有时候 Kubelet 重建容器太快,还没等到其他控制器如 endpoints-controller 感知到 Pod not-ready,可能会导致流量受损。因此又在原地升级中提供了 graceful period 选项,作为优雅原地升级的策略。用户如果配置了 gracePeriodSeconds 这个字段,控制器在原地升级的过程中会先把 Pod status 改为 not-ready,然后等一段时间(gracePeriodSeconds),最后再去修改 Pod spec 中的镜像版本。 这样,就为 endpoints-controller 这些控制器留出了充足的时间来将 Pod 从 endpoints 端点列表中去除。
// kruise/pkg/util/inplaceupdate/inplace_utils.go
func (c *realControl) Update(pod *v1.Pod, oldRevision, newRevision *apps.ControllerRevision, opts *UpdateOptions) UpdateResult {
	// 1. calculate inplace update spec 
	var spec *UpdateSpec
	if opts == nil || opts.CustomizeSpecCalculate == nil {
        //  只对更新了 spec.containers[x].image 的pod 进行原地升级
		spec = calculateInPlaceUpdateSpec(oldRevision, newRevision)
	}
	if opts != nil && opts.GracePeriodSeconds > 0 {
		spec.GraceSeconds = opts.GracePeriodSeconds
	}
	// 2. update condition for pod with readiness-gate
	if containsReadinessGate(pod) {
		newCondition := v1.PodCondition{
			Type:               appsv1alpha1.InPlaceUpdateReady,
			LastTransitionTime: c.now(),
			Status:             v1.ConditionFalse,
			Reason:             "StartInPlaceUpdate",
		}
		if err := c.updateCondition(pod, newCondition); err != nil {
			return UpdateResult{InPlaceUpdate: true, UpdateErr: err}
		}
	}
	// 3. update container images
	if err := c.updatePodInPlace(pod, spec, opts); err != nil {
		return UpdateResult{InPlaceUpdate: true, UpdateErr: err}
	}
    ...
}
func (c *realControl) updatePodInPlace(pod *v1.Pod, spec *UpdateSpec, opts *UpdateOptions) error {
	return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
		clone, err := c.adp.getPod(pod.Namespace, pod.Name)
		// update new revision   给pod 加上revision label
		if c.revisionKey != "" {
			clone.Labels[c.revisionKey] = spec.Revision
		}
		// record old containerStatuses
		inPlaceUpdateStateJSON, _ := json.Marshal(inPlaceUpdateState)
		clone.Annotations[appsv1alpha1.InPlaceUpdateStateKey] = string(inPlaceUpdateStateJSON)
		if spec.GraceSeconds <= 0 {
			if clone, err = patchUpdateSpecToPod(clone, spec, opts); err != nil {
				return err
			}
			delete(clone.Annotations, appsv1alpha1.InPlaceUpdateGraceKey)
		} else {
			inPlaceUpdateSpecJSON, _ := json.Marshal(spec)
			clone.Annotations[appsv1alpha1.InPlaceUpdateGraceKey] = string(inPlaceUpdateSpecJSON)
        }
        // 使用k8s api 更新pod 
		return c.adp.updatePod(clone)
	})
}

计算待更新pod 的spec ,condition,container status 等数据, 加上revision label, inplace-update-grace annotation ,最终使用k8s api 更新pod 到k8s cluster

协程间同步状态

事件一直在产生 并由不同的协程处理, 如果一个协程正在对cloneset 做扩容操作,那么另一个协程需要等待一下,所以需要一个协程间的协调机制。

type ScaleExpectations interface {
	ExpectScale(controllerKey string, action ScaleAction, name string)
	ObserveScale(controllerKey string, action ScaleAction, name string)
	SatisfiedExpectations(controllerKey string) (bool, time.Duration, map[ScaleAction][]string)
	DeleteExpectations(controllerKey string)
	GetExpectations(controllerKey string) map[ScaleAction]sets.String
}
type realScaleExpectations struct {
	sync.Mutex
	// key: parent key, workload namespace/name
	controllerCache map[string]*realControllerScaleExpectations
}

一个协程进行某操作前先 ExpectScale,操作完成后再ObserveScale,另一个协程可以通过 SatisfiedExpectations 来检查 操作是否完成。

// github.com/openkruise/kruise/pkg/controller/cloneset/scale/cloneset_scale.go
func (r *realControl) createPods(...){
    // 先ExpectScale
    for _, p := range newPods {
		clonesetutils.ScaleExpectations.ExpectScale(clonesetutils.GetControllerKey(updateCS), expectations.Create, p.Name)
		podsCreationChan <- p
	}
	// 创建pod 逻辑
	...
	// 完成后 ObserveScale
	for _, pod := range newPods {
		if _, ok := successPodNames.Load(pod.Name); !ok {
			clonesetutils.ScaleExpectations.ObserveScale(clonesetutils.GetControllerKey(updateCS), expectations.Create, pod.Name)
		}
	}
}
// github.com/openkruise/kruise/pkg/controller/cloneset/cloneset_controller.go
func (r *ReconcileCloneSet) doReconcile(request reconcile.Request) (res reconcile.Result, retErr error) {
    ...
    // 另一个协程做检测
    if scaleSatisfied, unsatisfiedDuration, scaleDirtyPods := clonesetutils.ScaleExpectations.SatisfiedExpectations(request.String()); !scaleSatisfied {
		if unsatisfiedDuration >= expectations.ExpectationTimeout {
			klog.Warningf("Expectation unsatisfied overtime for %v, scaleDirtyPods=%v, overtime=%v", request.String(), scaleDirtyPods, unsatisfiedDuration)
			return reconcile.Result{}, nil
		}
		klog.V(4).Infof("Not satisfied scale for %v, scaleDirtyPods=%v", request.String(), scaleDirtyPods)
		return reconcile.Result{RequeueAfter: expectations.ExpectationTimeout - unsatisfiedDuration}, nil
	}
    ...   
}

通过操作pod 来影响cloneset的策略

apiVersion: apps.kruise.io/v1alpha1
kind: CloneSet
spec:
  # ...
  updateStrategy:
    priorityStrategy:
      weightPriority:
      - weight: 50
        matchSelector:
          matchLabels:
            test-key: foo
      - weight: 30
        matchSelector:
          matchLabels:
            test-key: bar

在操作cloneset 发布之前,为pod 打上label,则test-key=foo 会比test-key= bar的pod 先升级。

CloneSet管理的Pod有以下状态 • Normal:正常状态 • PreparingUpdate: 准备原地升级 • Updating: 原地升级中 • Updated:原地升级完成 • PreparingDelete:准备删除

apiVersion: apps.kruise.io/v1alpha1
kind: CloneSet
spec:
  # 通过 finalizer 定义 hook
  lifecycle:
    preDelete:
      finalizersHandler:
      - example.io/unready-blocker
    inPlaceUpdate:
      finalizersHandler:
      - example.io/unready-blocker
  # 或者也可以通过 label 定义
  lifecycle:
    inPlaceUpdate:
      labelsHandler:
        example.io/block-unready: "true"

如果定义了 lifecycle hook /preDelete,cloneset先只将 Pod 状态改为 PreparingDelete,当开发移除 label/finalizer后,kruise 才执行 Pod 删除,否则会直接删除pod。PS:也就是说,如果没有定义lifecycle hook /preDelete,pod 是没有PreparingDelete 状态的。