简介(未完成)
使用
apiVersion: core.oam.dev/v1beta1
kind: Application
metadata:
name: first-vela-app
spec:
components:
- name: express-server
type: webservice
properties:
image: oamdev/hello-world
ports:
- port: 8000
expose: true
traits:
- type: scaler
properties:
replicas: 1
policies:
- name: target-default
type: topology
properties:
# The cluster with name local is installed the KubeVela.
clusters: ["local"]
namespace: "default"
- name: target-prod
type: topology
properties:
clusters: ["local"]
# This namespace must be created before deploying.
namespace: "prod"
- name: deploy-ha
type: override
properties:
components:
- type: webservice
traits:
- type: scaler
properties:
replicas: 2
workflow:
steps:
- name: deploy2default
type: deploy
properties:
policies: ["target-default"]
- name: manual-approval
type: suspend
- name: deploy2prod
type: deploy
properties:
policies: ["target-prod", "deploy-ha"]
vela def list
查看涉及到的 Definition,对于webservice,kubectl get ComponentDefinition -n vela-system webservice -o yaml
或者 vela def get $defname
看下其内容
Definition 种类 | cue template内容 | 处理方式 | |
---|---|---|---|
webservice | ComponentDefinition | Deployment | 转为 Deployment并分发 |
scaler | TraitDefinition | template: { parameter: { // +usage=Specify the number of workload replicas: *1 | int } // +patchStrategy=retainKeys patch: spec: replicas: parameter.replicas } |
|
topology | PolicyDefinition | ||
override | PolicyDefinition | ||
deploy | WorkflowStepDefinition | ||
suspend | WorkflowStepDefinition |
源码结构
kubevela
/cmd
/core
/pkg
/controller
/standard.oam.dev
/v1alpha1
/rollout
/core.oam.dev
/v1alpha2
/application
/application_controller.go
/applicationconfiguration
/core
/components
/policies
/scopes
/traits
/workflow
启动
// kubevela/cmd/core/main.go
func main() {
mgr, err := ctrl.NewManager(restConfig, ctrl.Options{
...
}
oamv1alpha2.Setup(mgr, controllerArgs)
...
mgr.Start(ctrl.SetupSignalHandler())
}
// kubevela/pkg/controller/core.oam.dev/v1alpha2/setup.go
func Setup(mgr ctrl.Manager, args controller.Args) error {
switch args.OAMSpecVer {
case "all":
for _, setup := range []func(ctrl.Manager, controller.Args) error{
application.Setup,
traitdefinition.Setup,
componentdefinition.Setup,
policydefinition.Setup,
workflowstepdefinition.Setup,
applicationconfiguration.Setup,
} {
setup(mgr, args)
}
case "minimal":
case "v0.3":
case "v0.2":
}
xxDefinition 主要是管理 用户 定义的cue 模版,核心逻辑是 application 和 applicationconfiguration(在较新的版本去除了)。
引入workflow之前的处理流程
源码解读:KubeVela 是如何将 appfile 转换为 K8s 特定资源对象的
- 起点:Application
- 中点:ApplicationConfiguration, Component
- 终点:Deployment, Service
根据 Application 创建ApplicationConfiguration和Component
// kubevela/pkg/controller/core.oam.dev/v1alpha2/application/application_controller.go
// Reconcile process app event
func (r *Reconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
app := new(v1beta1.Application)
r.Get(ctx, client.ObjectKey{Name: req.Name,Namespace: req.Namespace,}, app)
handler := &appHandler{
r: r,
app: app,
logger: applog,
}
// parse template
appParser := appfile.NewApplicationParser(r.Client, r.dm, r.pd)
handler.appfile = generatedAppfile
// build template to applicationconfig & component
ac, comps, err := appParser.GenerateApplicationConfiguration(generatedAppfile, app.Namespace)
handler.apply(ctx, appRev, ac, comps)
return ctrl.Result{}, r.UpdateStatus(ctx, app)
}
Application 提供 component/trait 的名字 找到cue 模版,加上 Application 配置的properties 拼凑ApplicationConfiguration(没找到create 逻辑)和Component 并创建。
// kubevela/pkg/appfile/parser.go
// GenerateApplicationConfiguration converts an appFile to applicationConfig & Components
func (p *Parser) GenerateApplicationConfiguration(app *Appfile, ns string) (*v1alpha2.ApplicationConfiguration,
[]*v1alpha2.Component, error) {
appconfig := &v1alpha2.ApplicationConfiguration{}
var components []*v1alpha2.Component
for _, wl := range app.Workloads {
// 将 Application 转换为 ApplicationConfiguration 和 Component
comp, acComp, err = generateComponentFromCUEModule(p.client, wl, app.Name, app.RevisionName, ns)
components = append(components, comp)
appconfig.Spec.Components = append(appconfig.Spec.Components, *acComp)
}
return appconfig, components, nil
}
// kubevela/pkg/controller/core.oam.dev/v1alpha2/application/apply.go
// 在集群中创建 ApplicationConfiguration 和 Component
func (h *appHandler) apply(..., ac *v1alpha2.ApplicationConfiguration) error {
for _, comp := range comps {
newComp := comp.DeepCopy()
// newComp will be updated and return the revision name instead of the component name
revisionName, err := h.createOrUpdateComponent(ctx, newComp)
}
return nil
}
根据 ApplicationConfiguration和Component 创建workload
// kubevela/pkg/controller/core.oam.dev/v1alpha2/applicationconfiguration/applicationconfiguration.go
// Reconcile an OAM ApplicationConfigurations by rendering and instantiating its Components and Traits.
func (r *OAMApplicationReconciler) Reconcile(req reconcile.Request) (reconcile.Result, error) {
ac := &v1alpha2.ApplicationConfiguration{}
// 获取 ApplicationConfiguration
r.client.Get(ctx, req.NamespacedName, ac);
reconResult := r.ACReconcile(ctx, ac, log)
return reconResult, err
}
// ACReconcile contains all the reconcile logic of an AC, it can be used by other controller
func (r *OAMApplicationReconciler) ACReconcile(ctx context.Context, ac *v1alpha2.ApplicationConfiguration,log logging.Logger) (result reconcile.Result) {
// execute the prehooks
// 渲染
workloads, depStatus, err := r.components.Render(ctx, ac)
applyOpts := []apply.ApplyOption{apply.MustBeControllableBy(ac.GetUID()), applyOnceOnly(ac, r.applyOnceOnlyMode, log)}
// 创建 workload 和 traits 对应的 k8s 资源对象
r.workloads.Apply(ctx, ac.Status.Workloads, workloads, applyOpts...)
return reconcile.Result{RequeueAfter: waitTime}
}
渲染主要由 components 来完成,将 workload 形态从json 转换为 *unstructured.Unstructured,之后被部署到 k8s 。
// kubevela/pkg/controller/core.oam.dev/v1alpha2/applicationconfiguration/render.go
type components struct {
// indicate that if this is generated by application
client client.Reader
dm discoverymapper.DiscoveryMapper
params ParameterResolver
workload ResourceRenderer
trait ResourceRenderer
}
func (r *components) Render(..., ac *v1alpha2.ApplicationConfiguration) ([]Workload, *v1alpha2.DependencyStatus, error) {...}
func (r *components) renderComponent(..., ac *v1alpha2.ApplicationConfiguration, ) (*Workload, error) {...}
func (r *components) renderTrait(..., ac *v1alpha2.ApplicationConfiguration) (*unstructured.Unstructured, *v1alpha2.TraitDefinition, error) {
有的trait是对workload的 patch,有的trait对应的独立的crd。通过trait.spec.workloadRefPath 建立trait与workload 之间的关系。 当trait 是patch时,kubevela 负责将patch 和 workload 做merge 然后apply 到k8s。
workloads,... := components.Render(ctx context.Context, ac *v1alpha2.ApplicationConfiguration)
for _, acc := range ac.Spec.Components {
components.renderComponent
renderWorkload # 返回 unstructured.Unstructured
for _, ct := range acc.Traits {
components.renderTrait
renderTrait
}
components.renderScope
}
for i, acc := range ac.Spec.Components {
components.handleDependency(ctx, workloads[i], acc, dag, ac)
}
workloads.Apply(...,workloads,...)
for _, wl := range w {
ApplyInputRef
APIApplicator.Apply
ApplyOutputRef
for _, trait := range wl.Traits {
ApplyInputRef
APIApplicator.Apply # 当trait 本身就对应一个crd时, 会直接apply
ApplyOutputRef # 当trait 是patch 类型时,会在这里通过 workloadRefPath 拿到ref object,对其进行json merge,之后apply
}
for _, s := range wl.Scopes {
applyScope(ctx, wl, s, workloadRef)
}
}
APIApplicator.Apply 更多是在 服务端模拟了 kubeclt apply -f
的能力。
// kubevela/pkg/utils/apply/apply.go
// Apply applies new state to an object or create it if not exist
func (a *APIApplicator) Apply(ctx context.Context, desired runtime.Object, ao ...ApplyOption) error {
existing, err := a.createOrGetExisting(ctx, a.c, desired, ao...)
if existing == nil { // 说明是新建
return nil
}
// 如果是object已经存在,则发送patch
// threeWayMergePatch
patch, err := a.patcher.patch(existing, desired)
return errors.Wrapf(a.c.Patch(ctx, desired, patch), "cannot patch object")
}
引入workflow之后的处理流程
kubevela/workflow 是一个独立的工作流实现,有workflowInstance(对应一个工作流)/workflowStep/taskRunner(对应一个step)/executor等概念,将appFile 转为workflowInstance 即进入了workflow的工作流程。
// kubevela/pkg/controller/core.oam.dev/v1alpha2/application/application_controller.go
func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
app := new(v1beta1.Application)
r.Get(ctx, client.ObjectKey{Name: req.Name,Namespace: req.Namespace,}, app)
// 解析为appFile
appParser := appfile.NewApplicationParser(r.Client, r.dm, r.pd)
handler, err := NewAppHandler(logCtx, r, app, appParser)
appFile, err := appParser.GenerateAppFile(logCtx, app)
handler.PrepareCurrentAppRevision(logCtx, appFile)
handler.FinalizeAndApplyAppRevision(logCtx)
handler.ApplyPolicies(logCtx, appFile);
// appFile 转为 workflowInstance
workflowInstance, runners, err :=handler.GenerateApplicationSteps(logCtx, app, appParser, appFile, handler.currentAppRev)
// executor 执行 workflowInstance
executor := executor.New(workflowInstance, r.Client)
workflowState, err := executor.ExecuteRunners(authCtx, runners)
}
从 Application 解析为appFile,将部署计划拆分为 runners(一个runner即TaskRunner,包含一个Run方法) 由executor 驱动执行。
// kubevela/workflow/pkg/executor/workflow.go
func (w *workflowExecutor) ExecuteRunners(ctx monitorContext.Context, taskRunners []types.TaskRunner) (v1alpha1.WorkflowRunPhase, error) {
e := newEngine(ctx, wfCtx, w, status)
err = e.Run(ctx, taskRunners, dagMode)
}
// kubevela/workflow/pkg/executor/workflow.go
func (e *engine) Run(ctx monitorContext.Context, taskRunners []types.TaskRunner, dag bool) error {
if dag {
err = e.runAsDAG(ctx, taskRunners, false)
} else {
err = e.steps(ctx, taskRunners, dag)
}
return err
}
func (e *engine) steps(ctx monitorContext.Context, taskRunners []types.TaskRunner, dag bool) error {
...
for index, runner := range taskRunners {
...
status, operation, err := runner.Run(wfCtx, options)
...
}
return nil
}
引入workflow之前,主要由applicationconfiguration controller 负责创建workload 及资源下发。引入workflow 之后,workflow 接管了这个活儿。if you only use components in the Application and do not declare a workflow, KubeVela will automatically create a default workflow for deploying the components when running the Application.
Parser.GenerateAppFile(ctx context.Context, app *v1beta1.Application)
GenerateAppFileFromApp(ctx context.Context, app *v1beta1.Application)
appfile := Parser.newAppfile(appName, ns, app)
for _, comp := range app.Spec.Components {
wd, err := Parser.parseWorkload(ctx, comp)
...
}
Parser.parseWorkflowSteps(ctx, appfile)
Parser.loadWorkflowToAppfile(ctx, af)
af.WorkflowSteps, err = step.NewChainWorkflowStepGenerator(
&step.RefWorkflowStepGenerator{Client: af.WorkflowClient(p.client), Context: ctx},
&step.DeployWorkflowStepGenerator{},
&step.Deploy2EnvWorkflowStepGenerator{},
&step.ApplyComponentWorkflowStepGenerator{},
&step.DeployPreApproveWorkflowStepGenerator{},
).Generate(af.app, af.WorkflowSteps)
for _, workflowStep := range af.WorkflowSteps {
parseWorkflowStep(ctx, af, workflowStep.Type)
}
...
核心是理解 type=deploy的step,其对应的 WorkflowStepDefinition 定义如下
import (
"vela/op"
)
deploy: {
alias: ""
annotations: {}
attributes: {}
description: "A powerful and unified deploy step for components multi-cluster delivery with policies."
labels: {}
type: "workflow-step"
}
template: {
deploy: op.#Deploy & {
policies: parameter.policies
parallelism: parameter.parallelism
ignoreTerraformComponent: parameter.ignoreTerraformComponent
}
parameter: {
//+usage=If set to false, the workflow will suspend automatically before this step, default to be true.
auto: *true | bool
//+usage=Declare the policies that used for this deployment. If not specified, the components will be deployed to the hub cluster.
policies?: [...string]
//+usage=Maximum number of concurrent delivered components.
parallelism: *5 | int
//+usage=If set false, this step will apply the components with the terraform workload.
ignoreTerraformComponent: *true | bool
}
}