Argo CD 创建 Application 时的逻辑分析

在 Argo CD 里面,同时提供了 HTTP 接口(Web UI)、gRPC 接口(Command CLI),但是只用到了一个端口,并且实现逻辑的代码是同一份代码,是怎么做到的呢?创建应用时都做了哪些操作逻辑?

请求跟踪

当在 Argo CD 的 Web UI 页面中,创建一个 Application 时,会发出一个 HTTP 请求,详情如下:

请求详情

翻开 Argo CD 的源代码,跟踪到服务的创建。由于是 HTTP 请求,所以入口模块为:argocd-server,即入口处为:cmd/argocd-server/commands/argocd_server.goNewCommand() 函数。可以看出其中启动一个 server 的代码在:

  • 路径:cmd/argocd-server/commands/argocd_server.go 的 154 行 argocd.Run(...)

Run() 方法中主要的代码逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 创建 gRPC 服务
grpcS := a.newGRPCServer()
// gRPC 服务被包装成了 Web 服务
grpcWebS := grpcweb.WrapServer(grpcS)
var httpS *http.Server
var httpsS *http.Server
if a.useTLS() {
httpS = newRedirectServer(port, a.RootPath)
// 竟然传递了 gRPC 包装后的 Web 服务
httpsS = a.newHTTPServer(ctx, port, grpcWebS)
} else {
httpS = a.newHTTPServer(ctx, port, grpcWebS)
}

// ...

// Cmux is used to support servicing gRPC and HTTP1.1+JSON on the same port
tcpm := cmux.New(conn)

//...

// 启动 gRPC 和 HTTP 服务
go func() { a.checkServeErr("grpcS", grpcS.Serve(grpcL)) }()
go func() { a.checkServeErr("httpS", httpS.Serve(httpL)) }()
if a.useTLS() {
go func() { a.checkServeErr("httpsS", httpsS.Serve(httpsL)) }()
go func() { a.checkServeErr("tlsm", tlsm.Serve()) }()
}

这一块代码看着有点凌乱,但是从零零总总的信息中抽离出下列信息:

  • gRPC 和 HTTP 共用了一个端口
  • 没有找到 HTTP 接口的定义处
  • 看起来 gRPC 与 HTTP 服务之间存在一个映射关系

gRPC 与 HTTP 服务之间的映射关系具体为什么样?怎么样才能搞清楚其中的关联关系? 这是当时的问题,怀着这个问题,去搜了一下相关的关键字。

搜索记录

经过一段时间的信息搜集,锁定了一个叫做 grpc-gateway 的一个项目,可以实现上面所列出的 3 个问题点。并且通过对该项目中 示例 proto 文件 的观察,初步锁定 gRPC 与 HTTP 接口之间的关联关系就在 proto 文件中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Echo service responds to incoming echo requests.
service EchoService {
// EchoBody method receives a simple message and returns it.
rpc EchoBody(SimpleMessage) returns (SimpleMessage) {
option (google.api.http) = {
post: "/v1/example/echo_body"
body: "*"
};
}
// EchoDelete method receives a simple message and returns it.
rpc EchoDelete(SimpleMessage) returns (SimpleMessage) {
option (google.api.http) = {
delete: "/v1/example/echo_delete"
};
}
// EchoPatch method receives a NonStandardUpdateRequest and returns it.
rpc EchoPatch(DynamicMessageUpdate) returns (DynamicMessageUpdate) {
option (google.api.http) = {
patch: "/v1/example/echo_patch"
body: "body"
};
}
}

因此只要找到相关 proto 的定义即可找到对应关系,也就找到了相应接口的实现代码。

grpc-gateway

项目地址:https://github.com/grpc-ecosystem/grpc-gateway

grpc-gateway 工作原理图

实现逻辑

在源码中搜索 *.proto 可以看到 application.proto,并且可以找到 /api/v1/applications 接口对应的 rpc 方法(路径为:server/application/application.proto):

1
2
3
4
5
6
7
8
9
10
11
12
message ApplicationCreateRequest {
required github.com.argoproj.argo_cd.v2.pkg.apis.application.v1alpha1.Application application = 1 [(gogoproto.nullable) = false];
optional bool upsert = 2;
optional bool validate = 3;
}
// Create creates an application
rpc Create (ApplicationCreateRequest) returns (github.com.argoproj.argo_cd.v2.pkg.apis.application.v1alpha1.Application) {
option (google.api.http) = {
post: "/api/v1/applications"
body: "application"
};
}

并且在同目录下找到相应的实现 application.go(路径为:server/application/application.go),其接口入参与返回值如下:

1
func (s *Server) Create(ctx context.Context, q *application.ApplicationCreateRequest) (*appv1.Application, error) {...}

入参和响应的结构都是 Application CRD,与 Web UI 中的请求一致。其逻辑大致如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 校验是否有权限操作
s.enf.EnforceErr(ctx.Value("claims"), rbacpolicy.ResourceApplications, rbacpolicy.ActionCreate, appRBACName(q.Application));

// ① 加锁 + 解锁
s.projectLock.RLock(q.Application.Spec.Project)
defer s.projectLock.RUnlock(q.Application.Spec.Project)

// 校验用户输入是否合法
err := s.validateAndNormalizeApp(ctx, &a, validate)

// 创建 Application CRD
created, err := s.appclientset.ArgoprojV1alpha1().Applications(s.ns).Create(ctx, &a, metav1.CreateOptions{})

// 产生 Event
s.logAppEvent(created, ctx, argo.EventReasonResourceCreated, "created application")

// ② 等待缓存刷新
s.waitSync(created)

① 加、解锁时是 RWLock,非分布式锁

② 等待缓存刷新时,是通过一个 死循环 + deadline 来实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (s *Server) waitSync(app *appv1.Application) {
logCtx := log.WithField("application", app.Name)
deadline := time.Now().Add(informerSyncTimeout)
minVersion, err := strconv.Atoi(app.ResourceVersion)
if err != nil {
logCtx.Warnf("waitSync failed: could not parse resource version %s", app.ResourceVersion)
time.Sleep(50 * time.Millisecond) // sleep anyways
return
}
for {
if currApp, err := s.appLister.Get(app.Name); err == nil {
currVersion, err := strconv.Atoi(currApp.ResourceVersion)
if err == nil && currVersion >= minVersion {
return
}
}
if time.Now().After(deadline) {
break
}
time.Sleep(20 * time.Millisecond)
}
logCtx.Warnf("waitSync failed: timed out")
}

此时,kubernetes 中便存在了一个名为 first-app Application CRD。

Application CRD

Reference

Argo CD 创建 Application 时的逻辑分析

https://eucham.me/2021/06/17/2461a3728194.html

作者

遇寻

发布于

2021-06-17

更新于

2021-06-18

许可协议

评论