关于 kubectl apply 的流程分析
Server Side Apply资源读取Patch 计算与 APIServer 交互
本文主要介绍与kubectl apply相关的几个概念、相关操作的主要逻辑,主要包括Server Side Apply、Client Side Apply。
环境搭建
这里其实可以直接用 kubernetes 仓库,但它里面的内容太多,如果只用 kubectl 仓库 看起来会更简洁、纯粹。
- Kubernetes 集群
- 克隆
kubectl的源代码到本地用 GoLand 打开 git clone https://github.com/kubernetes/kubectl.git- 将程序入口拷贝进
kubectl中。 cd kubectlwget https://github.com/kubernetes/kubernetes/blob/master/cmd/kubectl/kubectl.go- 修改
doc.go中包名为main - 直接跑
kubectl.go中的main方法来从源码编译、启动kubectl命令。 - 添加如下命令参数:
apply -f testdata/apply/cm.yaml --field-manager=some-controller
Client Side Apply
入口在 pkg/cmd/apply/apply.go 中的 cmdutil.CheckErr(o.Run())。
① 资源读取
infos, err := o.GetObjects()
这个方法会将 kubectl apply 命令将要操作的对象(通常是本地的文件、文件夹等)加载到到内存中,保存在 info.Object 中,并返回 infos 列表。当把所有需要 apply 的对象读出来后,会通过 for 循环去依次做 apply 操作。
for _, info := range infos {
if err := o.applyOneObject(info); err != nil {
errs = append(errs, err)
}
}
所以,对每个资源的 apply 操作是分隔开来的,同时,处理的逻辑也进入到 o.applyOneObject(info) 中。
② 处理第一次 apply 操作
📢注意:此处的 info.Object 是刚从 kubectl client 端加载进来的内容。在从 k8s 集群中进行一次查询前,会先计算出一个 modified,它表示此次 apply 结束后,在目标集群中该资源的终态。
modified, err := util.GetModifiedConfiguration(info.Object, true, unstructured.UnstructuredJSONScheme)
modified 的计算逻辑是先从 info.Object 中删除 kubectl.kubernetes.io/last-applied-configuration,然后将 info.Object 本身作为 kubectl.kubernetes.io/last-applied-configuration 的值塞进去。
func GetModifiedConfiguration(obj runtime.Object, annotate bool, codec runtime.Encoder) ([]byte, error) {
var modified []byte
annots, err := metadataAccessor.Annotations(obj)
original := annots[v1.LastAppliedConfigAnnotation]
delete(annots, v1.LastAppliedConfigAnnotation)
if err := metadataAccessor.SetAnnotations(obj, annots); err != nil {
return nil, err
}
// 此时 modified 等于 info.Object 的 Annotation 减去 kubectl.kubernetes.io/last-applied-configuration
modified, err = runtime.Encode(codec, obj)
if annotate {
// 将此时的 modified 作为 kubectl.kubernetes.io/last-applied-configuration 塞进 Annotation 中
annots[v1.LastAppliedConfigAnnotation] = string(modified)
if err := metadataAccessor.SetAnnotations(obj, annots); err != nil {
return nil, err
}
// 重新生成 modified
modified, err = runtime.Encode(codec, obj)
if err != nil {
return nil, err
}
}
annots[v1.LastAppliedConfigAnnotation] = original
if err := metadataAccessor.SetAnnotations(obj, annots); err != nil {
return nil, err
}
return modified, nil
}
当第一次 apply 某个资源到 k8s 集群中时,会显式地执行创建逻辑。对于是否执行创建逻辑,则通过一次对该资源的查询操作来确定。
if err := info.Get(); err != nil {
if !errors.IsNotFound(err) {
return ...
}
if o.DryRunStrategy != cmdutil.DryRunClient {
// Then create the resource and skip the three-way merge
obj, err := helper.Create(info.Namespace, true, info.Object)
// ...
info.Refresh(obj, true)
}
// ...
return nil
}
如果资源存在,则不会进入该 if 语句,且能够获取到集群中该资源的定义,并覆盖 info.Object 字段。
📢注意:此时的 info.Object 就变成了资源在 k8s 集群中的定义。③ 客户端重试机制
当 apply 的资源在 k8s 集群中已存在时,会先创建 Patcher,然后调用 patcher.Patch() 方法来完成 patch 操作。其中该函数的入参注释如下:
current runtime.Object: 当前集群中该资源的定义。modified []byte: 参照第②步中的解释
// 初始化入参
helper := resource.NewHelper(info.Client, info.Mapping).
DryRun(o.DryRunStrategy == cmdutil.DryRunServer).
WithFieldManager(o.FieldManager).
WithFieldValidation(o.ValidationDirective)
// 创建 patcher
patcher, err := newPatcher(o, info, helper)
// 执行 Patch 操作
patchBytes, patchedObject, err :=
patcher.Patch(info.Object, modified, info.Source, info.Namespace, info.Name, o.ErrOut)
进入 patcher.Patch() 之后,是一个特别熟悉的 for 循环。目的是当遇到冲突时,(默认5次)进行 maxPatchRetry 重试。执行的主体函数是 p.patchSimple()。
patchBytes, patchObject, err := p.patchSimple(current, modified, source, namespace, name, errOut)
for i := 1; i <= p.Retries && errors.IsConflict(err); i++ {
// ...
current, getErr = p.Helper.Get(namespace, name)
patchBytes, patchObject, err = p.patchSimple(current, modified, source, namespace, name, errOut)
}
分析下 p.patchSimple() 的入参:
current -> obj: 当前集群中该资源的定义。modified: 参照第②步中的解释。
④ 计算 Patch
进入 patchSimple() 后,主要做的操作就是先计算 patch 的内容,这个内容是指客户端通过计算,得出来发生变更的字段。而计算的方式是 ThreeWayMerge。该函数的定义如下:
func CreateThreeWayMergePatch(original, modified, current []byte,
schema LookupPatchMeta,
overwrite bool,
fns ...mergepatch.PreconditionFunc) ([]byte, error) {}
前3个入参对应的含义如下(因为后面 patch 的计算,需要用到此3个入参,搞清楚他们的含义特别重要):
original: 当前集群中该资源的定义中 Annotations 中 kubectl.kubernetes.io/last-applied-configuration 对应的内容。
original, err := util.GetOriginalConfiguration(obj)
modified: 参照第②步中的解释。
current: 当前集群中该资源的定义。
current, err := runtime.Encode(unstructured.UnstructuredJSONScheme, obj)
基于对上面 3 个入参的认知,开始进入 patch 计算的过程,其中的 ThreeWay 应该就是指通过上面的 3 个入参。先将 original,modified,current 三个变量转成 map[string]interface{}类型为 originalMap、modifiedMap 、currentMap,以便于后面计算:
originalMap := map[string]interface{}{}
if len(original) > 0 {
if err := json.Unmarshal(original, &originalMap); err != nil {
return nil, mergepatch.ErrBadJSONDoc
}
}
// ...
接下来会计算 3 组数据,分别为
deltaMap: current 与 modified 之间的差异(除去 deletion 操作)
deltaMapDiffOptions := DiffOptions{
IgnoreDeletions: true,
SetElementOrder: true,
}
deltaMap, err := diffMaps(currentMap, modifiedMap, schema, deltaMapDiffOptions)
deletionsMap: originalMap 与 modifiedMap 之间的 deletions 操作
deletionsMapDiffOptions := DiffOptions{
SetElementOrder: true,
IgnoreChangesAndAdditions: true,
}
deletionsMap, err := diffMaps(originalMap, modifiedMap, schema, deletionsMapDiffOptions)
patchMap: deltaMap 和 deletionsMap 执行 merge 操作后得到 patchMap
mergeOptions := MergeOptions{}
patchMap, err := mergeMap(deletionsMap, deltaMap, schema, mergeOptions)
最终返回 json.Marshal(patchMap)。至此计算 patch 结束。
❓为什么 patch 的计算方法是这样的呢?

⑤ 发送 PATCH 请求给 api-server
在 strategicpatch.CreateThreeWayMergePatch() 函数执行完后,得到 patch 的结果,然后调用 client 发送 PATCH 请求给到 api-server,如下:
// 此种情形得到的 apply 结果是 unchanged。
if string(patch) == "{}" {
return patch, obj, nil
}
// 调用 client 发送 PATCH 请求
patchedObj, err := p.Helper.Patch(namespace, name, patchType, patch, nil)
return patch, patchedObj, err
Server Side Apply
官网文档说明见链接。
启用 Server Side Apply 需要添加 --server-side 参数、以及通过 --field-manager=some-controller 来指定 manager(可选)。如下:
kubectl apply --server-side --field-manager=some-controller -f testdata/apply/cm.yaml
运行完成后,可以在获取该 ConfigMap 时,添加 --show-managed-fields 参数以展示 managedFields 字段。如下:
kubectl get cm test1 -o json --show-managed-fields
冲突
当本次 apply 操作中想要修改的值,已经被其它 manager 管理后,出现的一种状态。主要是为了避免字段被别的用户修改。