Kubernetes 中 apiserver 加载 schema 流程

在 apiserver 初始化、启动时,会加载所有能识别的 schema。加载的过程通过 import pkg 后,触发对应包下 init() 方法来加载。

入口

cmd/kube-apiserver/apiserver.go: 初始化 apiserver 命令行,并运行。

1
2
3
4
5
func main() {
command := app.NewAPIServerCommand()
code := cli.Run(command)
os.Exit(code)
}

app.NewAPIServerCommand() 中,有一长串 import,其中这两条 import 语句完成了 schema 的加载。

1
2
3
4
5
6
import (
...
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/controlplane"
...
)

初始化

导入 k8s.io/kubernetes/pkg/api/legacyscheme 后,此包下只有一个文件,名为 scheme.go ,该文件中会初始化 3 个变量。其中 Scheme 中会存储 k8s 默认能识别的 scheme。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package legacyscheme

import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
)

var (
// Scheme is the default instance of runtime.Scheme to which types in the Kubernetes API are already registered.
// NOTE: If you are copying this file to start a new api group, STOP! Copy the
// extensions group instead. This Scheme is special and should appear ONLY in
// the api group, unless you really know what you're doing.
// TODO(lavalamp): make the above error impossible.
Scheme = runtime.NewScheme()

// Codecs provides access to encoding and decoding for the scheme
Codecs = serializer.NewCodecFactory(Scheme)

// ParameterCodec handles versioning of objects that are converted to query parameters.
ParameterCodec = runtime.NewParameterCodec(Scheme)
)

导入 k8s.io/kubernetes/pkg/controlplane 后,在该包下的 import_known_versions.go 文件中,导入了所有 apiserver 支持的 API groups。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package controlplane

import (
// These imports are the API groups the API server will support.
_ "k8s.io/kubernetes/pkg/apis/admission/install"
_ "k8s.io/kubernetes/pkg/apis/admissionregistration/install"
_ "k8s.io/kubernetes/pkg/apis/apiserverinternal/install"
_ "k8s.io/kubernetes/pkg/apis/apps/install"
_ "k8s.io/kubernetes/pkg/apis/authentication/install"
_ "k8s.io/kubernetes/pkg/apis/authorization/install"
_ "k8s.io/kubernetes/pkg/apis/autoscaling/install"
_ "k8s.io/kubernetes/pkg/apis/batch/install"
_ "k8s.io/kubernetes/pkg/apis/certificates/install"
_ "k8s.io/kubernetes/pkg/apis/coordination/install"
_ "k8s.io/kubernetes/pkg/apis/core/install"
_ "k8s.io/kubernetes/pkg/apis/discovery/install"
_ "k8s.io/kubernetes/pkg/apis/events/install"
_ "k8s.io/kubernetes/pkg/apis/extensions/install"
_ "k8s.io/kubernetes/pkg/apis/flowcontrol/install"
_ "k8s.io/kubernetes/pkg/apis/imagepolicy/install"
_ "k8s.io/kubernetes/pkg/apis/networking/install"
_ "k8s.io/kubernetes/pkg/apis/node/install"
_ "k8s.io/kubernetes/pkg/apis/policy/install"
_ "k8s.io/kubernetes/pkg/apis/rbac/install"
_ "k8s.io/kubernetes/pkg/apis/scheduling/install"
_ "k8s.io/kubernetes/pkg/apis/storage/install"
)

它们 import 后的行为是基本一致的,以 k8s.io/kubernetes/pkg/apis/apps/install 为例。该包下只有一个文件,名叫 install.go,其中有一个 init() 方法,会在包导入时执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Package install installs the apps API group, making it available as
// an option to all of the API encoding/decoding machinery.
package install

import (
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/apis/apps"
"k8s.io/kubernetes/pkg/apis/apps/v1"
"k8s.io/kubernetes/pkg/apis/apps/v1beta1"
"k8s.io/kubernetes/pkg/apis/apps/v1beta2"
)

func init() {
Install(legacyscheme.Scheme)
}

// Install registers the API group and adds types to a scheme
func Install(scheme *runtime.Scheme) {
utilruntime.Must(apps.AddToScheme(scheme))
utilruntime.Must(v1beta1.AddToScheme(scheme))
utilruntime.Must(v1beta2.AddToScheme(scheme))
utilruntime.Must(v1.AddToScheme(scheme))
utilruntime.Must(scheme.SetVersionPriority(v1.SchemeGroupVersion, v1beta2.SchemeGroupVersion, v1beta1.SchemeGroupVersion))
}

这里可以这样简单理解(以 utilruntime.Must(apps.AddToScheme(scheme)) 为例):

apps 这个 gv 下所有的资源,通过 apps 包中的 AddToScheme 方法,都注册到了 legacyscheme.Scheme 中,即前面初始化的 Scheme 变量中。这样 k8s 就能识别该 gv 下资源。

对于仅仅了解大致过程,上述的流程已经足够。但是如果点开 apps.AddToScheme(scheme) 方法,会有一种非常熟悉的感觉——在开发operator,通过k8s提供的脚本,来为 CRD 生成对应 yaml 时,会生成相应的代码。

那么,这一套模板到底是怎么将 Scheme 加载进 legacyscheme.Scheme 去的呢?

One more step forward

apps.AddToScheme() 方法在 pkg/apis/apps/register.go 文件中,它有 3 个变量加上一个方法,即表示这些资源(Kind)同属于这个 GV

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
var (
// SchemeBuilder stores functions to add things to a scheme.
SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
// AddToScheme applies all stored functions t oa scheme.
AddToScheme = SchemeBuilder.AddToScheme
)

// GroupName is the group name use in this package
const GroupName = "apps"

// SchemeGroupVersion is group version used to register these objects
var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: runtime.APIVersionInternal}

// Adds the list of known types to the given scheme.
func addKnownTypes(scheme *runtime.Scheme) error {
// TODO this will get cleaned up with the scheme types are fixed
scheme.AddKnownTypes(SchemeGroupVersion,
&DaemonSet{},
&DaemonSetList{},
&Deployment{},
&DeploymentList{},
&DeploymentRollback{},
&autoscaling.Scale{},
&StatefulSet{},
&StatefulSetList{},
&ControllerRevision{},
&ControllerRevisionList{},
&ReplicaSet{},
&ReplicaSetList{},
)
return nil
}

其中执行添加操作的方式是 scheme.AddKnownTypes(),对每种资源,将会在 legacyscheme.SchemegvkToTypetypeToGVK 添加对应的值,如下:

1
2
s.gvkToType[gvk] = t
s.typeToGVK[t] = append(s.typeToGVK[t], gvk)

方法 addKnownTypes(scheme *runtime.Scheme) 是如何绕了一大圈才执行的?

  1. 变量 SchemeBuilder 的类型是 []func(*Scheme) error ,也就是说它本身是一个数组,元素类型为一个函数。即:

    1
    2
    3
    4
    5
    // SchemeBuilder collects functions that add things to a scheme. It's to allow
    // code to compile without explicitly referencing generated types. You should
    // declare one in each package that will have generated deep copy or conversion
    // functions.
    type SchemeBuilder []func(*Scheme) error
  2. 变量 SchemeBuilder 通过 runtime.NewSchemeBuilder(addKnownTypes) 初始化之后,它(数组)含有一个元素,即 addKnownTypes() 方法。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    // Register adds a scheme setup function to the list.
    func (sb *SchemeBuilder) Register(funcs ...func(*Scheme) error) {
    for _, f := range funcs {
    *sb = append(*sb, f)
    }
    }

    // NewSchemeBuilder calls Register for you.
    func NewSchemeBuilder(funcs ...func(*Scheme) error) SchemeBuilder {
    var sb SchemeBuilder
    sb.Register(funcs...)
    return sb
    }
  3. 当执行 utilruntime.Must(apps.AddToScheme(scheme)) 时,会执行 SchemeBuilder.AddToScheme 方法,这个方法中会依次取 SchemeBuilder 中的每个元素,然后执行该元素(方法),如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    // AddToScheme applies all the stored functions to the scheme. A non-nil error
    // indicates that one function failed and the attempt was abandoned.
    func (sb *SchemeBuilder) AddToScheme(s *Scheme) error {
    for _, f := range *sb {
    if err := f(s); err != nil {
    return err
    }
    }
    return nil
    }

为什么要这样设计 SchemeBuilder

  • 核心思想是让执行加载 Scheme 的逻辑不用知道具体的类型是什么
  • SchemeBuilder 设计成数组的形式,可以提高加载时的灵活性?虽然绝大多数场景下,SchemeBuilder 只有一个元素,但是有一些场景下会有两个以上的元素。
  • SchemeBuilder 的 3 个方法 AddToScheme()Register()NewSchemeBuilder() 都是基于 SchemeBuilder 是数组形式时的应有做法,分别对应执行元素(方法)、添加(数组 append())、初始化(数组)。

Kubernetes 中 apiserver 加载 schema 流程

https://eucham.me/2022/07/14/7a73b54f4876.html

作者

遇寻

发布于

2022-07-14

更新于

2022-07-14

许可协议

评论