关于 kubectl apply 的流程分析

本文主要介绍与 kubectl apply 相关的几个概念、相关操作的主要逻辑,主要包括 Server Side ApplyClient Side Apply

环境搭建

这里其实可以直接用 kubernetes 仓库,但它里面的内容太多,如果只用 kubectl 仓库 看起来会更简洁、纯粹。

  • Kubernetes 集群
  • 克隆 kubectl 的源代码到本地用 GoLand 打开
    • git clone https://github.com/kubernetes/kubectl.git
  • 将程序入口拷贝进 kubectl 中。
    • cd kubectl
    • wget https://github.com/kubernetes/kubernetes/blob/master/cmd/kubectl/kubectl.go
  • 修改 doc.go 中包名为 main
  • 直接跑 kubectl.go 中的 main 方法来从源码编译、启动 kubectl 命令。
    • 添加如下命令参数:apply -f testdata/apply/cm.yaml --field-manager=some-controller

Client Side Apply

入口在 pkg/cmd/apply/apply.go 中的 cmdutil.CheckErr(o.Run())

① 资源读取

1
infos, err := o.GetObjects()

这个方法会将 kubectl apply 命令将要操作的对象(通常是本地的文件、文件夹等)加载到到内存中,保存在 info.Object 中,并返回 infos 列表。当把所有需要 apply 的对象读出来后,会通过 for 循环去依次做 apply 操作。

1
2
3
4
5
for _, info := range infos {
if err := o.applyOneObject(info); err != nil {
errs = append(errs, err)
}
}

所以,对每个资源的 apply 操作是分隔开来的,同时,处理的逻辑也进入到 o.applyOneObject(info) 中。

② 处理第一次 apply 操作

📢注意:此处的 info.Object 是刚从 kubectl client 端加载进来的内容。

在从 k8s 集群中进行一次查询前,会先计算出一个 modified,它表示此次 apply 结束后,在目标集群中该资源的终态。

1
modified, err := util.GetModifiedConfiguration(info.Object, true, unstructured.UnstructuredJSONScheme)

modified 的计算逻辑是先从 info.Object 中删除 kubectl.kubernetes.io/last-applied-configuration,然后将 info.Object 本身作为 kubectl.kubernetes.io/last-applied-configuration 的值塞进去。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func GetModifiedConfiguration(obj runtime.Object, annotate bool, codec runtime.Encoder) ([]byte, error) {
var modified []byte

annots, err := metadataAccessor.Annotations(obj)

original := annots[v1.LastAppliedConfigAnnotation]
delete(annots, v1.LastAppliedConfigAnnotation)
if err := metadataAccessor.SetAnnotations(obj, annots); err != nil {
return nil, err
}

// 此时 modified 等于 info.Object 的 Annotation 减去 kubectl.kubernetes.io/last-applied-configuration
modified, err = runtime.Encode(codec, obj)

if annotate {
// 将此时的 modified 作为 kubectl.kubernetes.io/last-applied-configuration 塞进 Annotation 中
annots[v1.LastAppliedConfigAnnotation] = string(modified)
if err := metadataAccessor.SetAnnotations(obj, annots); err != nil {
return nil, err
}
// 重新生成 modified
modified, err = runtime.Encode(codec, obj)
if err != nil {
return nil, err
}
}

annots[v1.LastAppliedConfigAnnotation] = original
if err := metadataAccessor.SetAnnotations(obj, annots); err != nil {
return nil, err
}

return modified, nil
}

当第一次 apply 某个资源到 k8s 集群中时,会显式地执行创建逻辑。对于是否执行创建逻辑,则通过一次对该资源的查询操作来确定。

1
2
3
4
5
6
7
8
9
10
11
12
13
if err := info.Get(); err != nil {
if !errors.IsNotFound(err) {
return ...
}
if o.DryRunStrategy != cmdutil.DryRunClient {
// Then create the resource and skip the three-way merge
obj, err := helper.Create(info.Namespace, true, info.Object)
// ...
info.Refresh(obj, true)
}
// ...
return nil
}

如果资源存在,则不会进入该 if 语句,且能够获取到集群中该资源的定义,并覆盖 info.Object 字段。

📢注意:此时的 info.Object 就变成了资源在 k8s 集群中的定义。

③ 客户端重试机制

apply 的资源在 k8s 集群中已存在时,会先创建 Patcher,然后调用 patcher.Patch() 方法来完成 patch 操作。其中该函数的入参注释如下:

  • current runtime.Object: 当前集群中该资源的定义。
  • modified []byte: 参照第②步中的解释
1
2
3
4
5
6
7
8
9
10
11
12
13
// 初始化入参
helper := resource.NewHelper(info.Client, info.Mapping).
DryRun(o.DryRunStrategy == cmdutil.DryRunServer).
WithFieldManager(o.FieldManager).
WithFieldValidation(o.ValidationDirective)

// 创建 patcher
patcher, err := newPatcher(o, info, helper)

// 执行 Patch 操作
patchBytes, patchedObject, err :=
patcher.Patch(info.Object, modified, info.Source, info.Namespace, info.Name, o.ErrOut)

进入 patcher.Patch() 之后,是一个特别熟悉的 for 循环。目的是当遇到冲突时,(默认5次)进行 maxPatchRetry 重试。执行的主体函数是 p.patchSimple()

1
2
3
4
5
6
patchBytes, patchObject, err := p.patchSimple(current, modified, source, namespace, name, errOut)
for i := 1; i <= p.Retries && errors.IsConflict(err); i++ {
// ...
current, getErr = p.Helper.Get(namespace, name)
patchBytes, patchObject, err = p.patchSimple(current, modified, source, namespace, name, errOut)
}

分析下 p.patchSimple() 的入参:

  • current -> obj : 当前集群中该资源的定义。
  • modified: 参照第②步中的解释。

④ 计算 Patch

进入 patchSimple() 后,主要做的操作就是先计算 patch 的内容,这个内容是指客户端通过计算,得出来发生变更的字段。而计算的方式是 ThreeWayMerge。该函数的定义如下:

1
2
3
4
func CreateThreeWayMergePatch(original, modified, current []byte, 
schema LookupPatchMeta,
overwrite bool,
fns ...mergepatch.PreconditionFunc) ([]byte, error) {}

前3个入参对应的含义如下(因为后面 patch 的计算,需要用到此3个入参,搞清楚他们的含义特别重要):

  • original: 当前集群中该资源的定义中 Annotations 中 kubectl.kubernetes.io/last-applied-configuration 对应的内容。

    1
    original, err := util.GetOriginalConfiguration(obj)
  • modified: 参照第②步中的解释。

  • current: 当前集群中该资源的定义。

    1
    current, err := runtime.Encode(unstructured.UnstructuredJSONScheme, obj)

基于对上面 3 个入参的认知,开始进入 patch 计算的过程,其中的 ThreeWay 应该就是指通过上面的 3 个入参。

先将 original,modified,current 三个变量转成 map[string]interface{}类型为 originalMapmodifiedMapcurrentMap,以便于后面计算:

1
2
3
4
5
6
7
originalMap := map[string]interface{}{}
if len(original) > 0 {
if err := json.Unmarshal(original, &originalMap); err != nil {
return nil, mergepatch.ErrBadJSONDoc
}
}
// ...

接下来会计算 3 组数据,分别为

  • deltaMap: currentmodified 之间的差异(除去 deletion 操作)

    1
    2
    3
    4
    5
    deltaMapDiffOptions := DiffOptions{
    IgnoreDeletions: true,
    SetElementOrder: true,
    }
    deltaMap, err := diffMaps(currentMap, modifiedMap, schema, deltaMapDiffOptions)
  • deletionsMap: originalMapmodifiedMap 之间的 deletions 操作

    1
    2
    3
    4
    5
    deletionsMapDiffOptions := DiffOptions{
    SetElementOrder: true,
    IgnoreChangesAndAdditions: true,
    }
    deletionsMap, err := diffMaps(originalMap, modifiedMap, schema, deletionsMapDiffOptions)
  • patchMap: deltaMapdeletionsMap 执行 merge 操作后得到 patchMap

    1
    2
    mergeOptions := MergeOptions{}
    patchMap, err := mergeMap(deletionsMap, deltaMap, schema, mergeOptions)

最终返回 json.Marshal(patchMap)。至此计算 patch 结束。

❓为什么 patch 的计算方法是这样的呢?

XLW8qA

⑤ 发送 PATCH 请求给 api-server

strategicpatch.CreateThreeWayMergePatch() 函数执行完后,得到 patch 的结果,然后调用 client 发送 PATCH 请求给到 api-server,如下:

1
2
3
4
5
6
7
8
9
// 此种情形得到的 apply 结果是 unchanged。
if string(patch) == "{}" {
return patch, obj, nil
}

// 调用 client 发送 PATCH 请求
patchedObj, err := p.Helper.Patch(namespace, name, patchType, patch, nil)

return patch, patchedObj, err

Server Side Apply

官网文档说明见链接

启用 Server Side Apply 需要添加 --server-side 参数、以及通过 --field-manager=some-controller 来指定 manager(可选)。如下:

1
kubectl apply --server-side --field-manager=some-controller -f testdata/apply/cm.yaml

运行完成后,可以在获取该 ConfigMap 时,添加 --show-managed-fields 参数以展示 managedFields 字段。如下:

1
kubectl get cm test1 -o json --show-managed-fields

冲突

当本次 apply 操作中想要修改的值,已经被其它 manager 管理后,出现的一种状态。主要是为了避免字段被别的用户修改。

关于 kubectl apply 的流程分析

https://eucham.me/2022/06/09/d092d4969a94.html

作者

遇寻

发布于

2022-06-09

更新于

2022-06-13

许可协议

评论