关于 kubectl apply 的流程分析

Server Side Apply资源读取Patch 计算与 APIServer 交互

本文主要介绍与 kubectl apply 相关的几个概念、相关操作的主要逻辑,主要包括 Server Side ApplyClient Side Apply

环境搭建

这里其实可以直接用 kubernetes 仓库,但它里面的内容太多,如果只用 kubectl 仓库 看起来会更简洁、纯粹。
  • Kubernetes 集群
  • 克隆 kubectl 的源代码到本地用 GoLand 打开
  • git clone https://github.com/kubernetes/kubectl.git
  • 将程序入口拷贝进 kubectl 中。
  • cd kubectl
  • wget https://github.com/kubernetes/kubernetes/blob/master/cmd/kubectl/kubectl.go
  • 修改 doc.go 中包名为 main
  • 直接跑 kubectl.go 中的 main 方法来从源码编译、启动 kubectl 命令。
  • 添加如下命令参数:apply -f testdata/apply/cm.yaml --field-manager=some-controller

Client Side Apply

入口在 pkg/cmd/apply/apply.go 中的 cmdutil.CheckErr(o.Run())

① 资源读取

infos, err := o.GetObjects()

这个方法会将 kubectl apply 命令将要操作的对象(通常是本地的文件、文件夹等)加载到到内存中,保存在 info.Object 中,并返回 infos 列表。当把所有需要 apply 的对象读出来后,会通过 for 循环去依次做 apply 操作。

for _, info := range infos {
	if err := o.applyOneObject(info); err != nil {
		errs = append(errs, err)
	}
}

所以,对每个资源的 apply 操作是分隔开来的,同时,处理的逻辑也进入到 o.applyOneObject(info) 中。

② 处理第一次 apply 操作

📢注意:此处的 info.Object 是刚从 kubectl client 端加载进来的内容。

在从 k8s 集群中进行一次查询前,会先计算出一个 modified,它表示此次 apply 结束后,在目标集群中该资源的终态。

modified, err := util.GetModifiedConfiguration(info.Object, true, unstructured.UnstructuredJSONScheme)

modified 的计算逻辑是先从 info.Object 中删除 kubectl.kubernetes.io/last-applied-configuration,然后将 info.Object 本身作为 kubectl.kubernetes.io/last-applied-configuration 的值塞进去。

func GetModifiedConfiguration(obj runtime.Object, annotate bool, codec runtime.Encoder) ([]byte, error) {
	var modified []byte

	annots, err := metadataAccessor.Annotations(obj)

	original := annots[v1.LastAppliedConfigAnnotation]
	delete(annots, v1.LastAppliedConfigAnnotation)
	if err := metadataAccessor.SetAnnotations(obj, annots); err != nil {
		return nil, err
	}

  // 此时 modified 等于 info.Object 的 Annotation 减去 kubectl.kubernetes.io/last-applied-configuration
	modified, err = runtime.Encode(codec, obj)

	if annotate {
    // 将此时的 modified 作为 kubectl.kubernetes.io/last-applied-configuration 塞进 Annotation 中
		annots[v1.LastAppliedConfigAnnotation] = string(modified)
		if err := metadataAccessor.SetAnnotations(obj, annots); err != nil {
			return nil, err
		}
		// 重新生成 modified
		modified, err = runtime.Encode(codec, obj)
		if err != nil {
			return nil, err
		}
	}

	annots[v1.LastAppliedConfigAnnotation] = original
	if err := metadataAccessor.SetAnnotations(obj, annots); err != nil {
		return nil, err
	}

	return modified, nil
}

当第一次 apply 某个资源到 k8s 集群中时,会显式地执行创建逻辑。对于是否执行创建逻辑,则通过一次对该资源的查询操作来确定。

if err := info.Get(); err != nil {
	if !errors.IsNotFound(err) {
		return ...
	}
	if o.DryRunStrategy != cmdutil.DryRunClient {
		// Then create the resource and skip the three-way merge
		obj, err := helper.Create(info.Namespace, true, info.Object)
		// ...
		info.Refresh(obj, true)
	}
  // ...
  return nil
}

如果资源存在,则不会进入该 if 语句,且能够获取到集群中该资源的定义,并覆盖 info.Object 字段。

📢注意:此时的 info.Object 就变成了资源在 k8s 集群中的定义。

③ 客户端重试机制

apply 的资源在 k8s 集群中已存在时,会先创建 Patcher,然后调用 patcher.Patch() 方法来完成 patch 操作。其中该函数的入参注释如下:

  • current runtime.Object: 当前集群中该资源的定义。
  • modified []byte: 参照第②步中的解释
// 初始化入参
helper := resource.NewHelper(info.Client, info.Mapping).
		DryRun(o.DryRunStrategy == cmdutil.DryRunServer).
		WithFieldManager(o.FieldManager).
		WithFieldValidation(o.ValidationDirective)

// 创建 patcher
patcher, err := newPatcher(o, info, helper)

// 执行 Patch 操作
patchBytes, patchedObject, err := 
	patcher.Patch(info.Object, modified, info.Source, info.Namespace, info.Name, o.ErrOut)
		

进入 patcher.Patch() 之后,是一个特别熟悉的 for 循环。目的是当遇到冲突时,(默认5次)进行 maxPatchRetry 重试。执行的主体函数是 p.patchSimple()

patchBytes, patchObject, err := p.patchSimple(current, modified, source, namespace, name, errOut)
for i := 1; i <= p.Retries && errors.IsConflict(err); i++ {
	// ...
	current, getErr = p.Helper.Get(namespace, name)
	patchBytes, patchObject, err = p.patchSimple(current, modified, source, namespace, name, errOut)
}

分析下 p.patchSimple() 的入参:

  • current -> obj : 当前集群中该资源的定义。
  • modified: 参照第②步中的解释。

④ 计算 Patch

进入 patchSimple() 后,主要做的操作就是先计算 patch 的内容,这个内容是指客户端通过计算,得出来发生变更的字段。而计算的方式是 ThreeWayMerge。该函数的定义如下:

func CreateThreeWayMergePatch(original, modified, current []byte, 
	schema LookupPatchMeta, 
	overwrite bool, 
	fns ...mergepatch.PreconditionFunc) ([]byte, error) {}

前3个入参对应的含义如下(因为后面 patch 的计算,需要用到此3个入参,搞清楚他们的含义特别重要):

original: 当前集群中该资源的定义中 Annotations 中 kubectl.kubernetes.io/last-applied-configuration 对应的内容。

original, err := util.GetOriginalConfiguration(obj)

modified: 参照第②步中的解释。

current: 当前集群中该资源的定义。

current, err := runtime.Encode(unstructured.UnstructuredJSONScheme, obj)
基于对上面 3 个入参的认知,开始进入 patch 计算的过程,其中的 ThreeWay 应该就是指通过上面的 3 个入参。

先将 original,modified,current 三个变量转成 map[string]interface{}类型为 originalMapmodifiedMapcurrentMap,以便于后面计算:

originalMap := map[string]interface{}{}
if len(original) > 0 {
  if err := json.Unmarshal(original, &originalMap); err != nil {
    return nil, mergepatch.ErrBadJSONDoc
  }
}
// ...

接下来会计算 3 组数据,分别为

deltaMap: currentmodified 之间的差异(除去 deletion 操作)

deltaMapDiffOptions := DiffOptions{
  IgnoreDeletions: true,
  SetElementOrder: true,
}
deltaMap, err := diffMaps(currentMap, modifiedMap, schema, deltaMapDiffOptions)

deletionsMap: originalMapmodifiedMap 之间的 deletions 操作

deletionsMapDiffOptions := DiffOptions{
  SetElementOrder:           true,
  IgnoreChangesAndAdditions: true,
}
deletionsMap, err := diffMaps(originalMap, modifiedMap, schema, deletionsMapDiffOptions)

patchMap: deltaMapdeletionsMap 执行 merge 操作后得到 patchMap

mergeOptions := MergeOptions{}
patchMap, err := mergeMap(deletionsMap, deltaMap, schema, mergeOptions)

最终返回 json.Marshal(patchMap)。至此计算 patch 结束。

❓为什么 patch 的计算方法是这样的呢?
XLW8qA

⑤ 发送 PATCH 请求给 api-server

strategicpatch.CreateThreeWayMergePatch() 函数执行完后,得到 patch 的结果,然后调用 client 发送 PATCH 请求给到 api-server,如下:

// 此种情形得到的 apply 结果是 unchanged。
if string(patch) == "{}" {
	return patch, obj, nil
}

// 调用 client 发送 PATCH 请求
patchedObj, err := p.Helper.Patch(namespace, name, patchType, patch, nil)

return patch, patchedObj, err

Server Side Apply

官网文档说明见链接

启用 Server Side Apply 需要添加 --server-side 参数、以及通过 --field-manager=some-controller 来指定 manager(可选)。如下:

kubectl apply --server-side --field-manager=some-controller -f testdata/apply/cm.yaml

运行完成后,可以在获取该 ConfigMap 时,添加 --show-managed-fields 参数以展示 managedFields 字段。如下:

kubectl get cm test1 -o json --show-managed-fields

冲突

当本次 apply 操作中想要修改的值,已经被其它 manager 管理后,出现的一种状态。主要是为了避免字段被别的用户修改。

Read more

容器镜像(4):镜像的常用工具箱

容器镜像(4):镜像的常用工具箱

前几篇在讲多架构镜像时已经用过 skopeo 和 crane 做镜像复制,这篇系统整理这两个工具的完整能力,同时介绍几个日常操作镜像时同样好用的工具。 一、skopeo:不依赖 Daemon 的镜像瑞士军刀 skopeo 的核心价值是绕过 Docker daemon,直接与 Registry API 交互。上一篇用它做镜像复制和离线传输,但它的能力远不止于此。 1.1 安装 # Ubuntu / Debian sudo apt install -y skopeo skopeo --version # skopeo version 1.15.1 1.2 inspect:免拉取检查镜像元数据 docker inspect 需要先把镜像拉到本地,skopeo inspect 直接向 Registry

容器镜像(3):多架构镜像构建

容器镜像(3):多架构镜像构建

一、什么是多架构镜像 1.1 OCI Image Index 上一篇介绍了单平台镜像的结构:一个 Manifest 指向 Config 和若干 Layer blob。多架构镜像在此之上多了一层——OCI Image Index(也叫 Manifest List),是一个轻量的索引文件,把多个单平台 Manifest 组织在一起: $ docker manifest inspect golang:1.22-alpine { "schemaVersion": 2, "mediaType": "application/vnd.oci.image.index.v1+json", "manifests&

容器镜像(2):containerd 视角下的镜像

容器镜像(2):containerd 视角下的镜像

一、为什么需要了解 containerd 如果你只用 docker run 跑容器,从来不关心底层,那可以不了解 containerd。但如果你在用 Kubernetes,或者想真正理解"容器运行时"是什么,containerd 是绕不开的。 事实上,当你执行 docker run 的时候,containerd 早就在后台悄悄工作了——Docker 从 1.11 版本开始,就把核心运行时剥离出来交给 containerd 负责。 1.1 Docker 的架构演变 早期的 Docker(1.10 及之前)是一个"大一统"的单体程序:一个 dockerd