Argo CD Sync 同步操作逻辑梳理
sync 子命令
做的事情就两件:从 git repo 拉取 Manifest、然后执行 kubectl apply
。
入口:app.go
-> NewApplicationSyncCommand
执行命令:argocd app sync aaa
命令行客户端
先创建一个
client
1
acdClient := argocdclient.NewClientOrDie(clientOpts)
其中
client
的属性如下1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18type client struct {
ServerAddr string
PlainText bool
Insecure bool
CertPEMData []byte
ClientCert *tls.Certificate
AuthToken string
RefreshToken string
UserAgent string
GRPCWeb bool
GRPCWebRootPath string
Headers []string
proxyMutex *sync.Mutex
proxyListener net.Listener
proxyServer *grpc.Server
proxyUsersCount int
}再通过
acdClient
创建一个applicationpkg.ApplicationServiceClient
1
conn, appIf := acdClient.NewApplicationClientOrDie()
appIf
它包含一个 grpc 连接,如下:1
2
3type applicationServiceClient struct {
cc *grpc.ClientConn
}其中初始化
ApplicationServiceClient
之前,会先与 grpc 服务端建立连接,并将这个连接传递给到ApplicationServiceClient
。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16func (c *client) NewApplicationClient() (io.Closer, applicationpkg.ApplicationServiceClient, error) {
conn, closer, err := c.newConn()
...
appIf := applicationpkg.NewApplicationServiceClient(conn)
return closer, appIf, nil
}
func (c *client) newConn() (*grpc.ClientConn, io.Closer, error) {
...
conn, e := grpc_util.BlockingDial(ctx, network, serverAddr, creds, dialOpts...)
return conn, argoio.NewCloser(...), e
}
func NewApplicationServiceClient(cc *grpc.ClientConn) ApplicationServiceClient {
return &applicationServiceClient{cc}
}设置发送请求的参数
applicationpkg.ApplicationSyncRequest
。主要包括两个方面参数的处理:selector。这个主要去筛选满足 selector 要求的 Application,并将 Application 的 Name 添加到待处理的 application 列表中。
selectedLabels。这个主要针对 Application 下面的各项资源。当 Application 下各项资源的 Label 中含有符合对应的 labels 时,会将该资源以 GVK 的形式追加在
resources
后。命令行发送 Sync 请求前、所有的参数如下:
1
2
3
4
5
6
7
8
9syncReq := applicationpkg.ApplicationSyncRequest{
Name: &appName,
DryRun: dryRun,
Revision: revision, // ""
Resources: selectedResources,// 由 resource 转换而来
Prune: prune,
Manifests: localObjsStrings, // []
Infos: getInfos(infos),
}
然后就是发送 gRPC 请求。由于 appIf 其实是
applicationServiceClient
的一个实例,因此可以从applicationServiceClient
的相关定义中,找到 Sync 的实现,如下:1
2
3
4
5
6
7
8
9
10_, err := appIf.Sync(ctx, &syncReq)
func (c *applicationServiceClient) Sync(ctx context.Context, in *ApplicationSyncRequest, opts ...grpc.CallOption) (*v1alpha1.Application, error) {
out := new(v1alpha1.Application)
err := c.cc.Invoke(ctx, "/application.ApplicationService/Sync", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
gRPC 服务端逻辑
服务端的逻辑从 gRPC 对 /application.ApplicationService/Sync
处理开始。其中对 /application.ApplicationService/Sync
的关联在 application.pb.go 的 _ApplicationService_Sync_Handler
方法中,如下:
1 | func _ApplicationService_Sync_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { |
通过 gRPC 的注册过程,不难发现,最终 Sync()
的实现来自 server/application/application.go
的 Server
结构体,如下:
1 | func (s *Server) Sync(ctx context.Context, syncReq *application.ApplicationSyncRequest) (*appv1.Application, error) { |
可以看到当 gRPC server(在 pod argocd-server 中) 收到请求后,带过来的参数如下:
name:\"aaa\" revision:\"\" dryRun:false prune:false strategy:<hook:<syncStrategyApply:<force:false > > >
下面有几个比较关键的逻辑:
从 k8s 集群中获取名为
aaa
的 application CRD。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15appIf := s.appclientset.ArgoprojV1alpha1().Applications(s.ns)
a, err := appIf.Get(ctx, *syncReq.Name, metav1.GetOptions{})
// a
func (c *applications) Get(ctx context.Context, name string, options v1.GetOptions) (result *v1alpha1.Application, err error) {
result = &v1alpha1.Application{}
err = c.client.Get().
Namespace(c.ns).
Resource("applications").
Name(name).
VersionedParams(&options, scheme.ParameterCodec).
Do(ctx).
Into(result)
return
}获取 revision、displayRevision
1
revision, displayRevision, err := s.resolveRevision(ctx, a, syncReq)
由于 gRPC 请求携带的 revision 为空,所以这里会默认取 HEAD commit。如果是 revision 是 branch、tag,则会从 Git Repo 里面查询出其所对应的 commit id。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22ambiguousRevision := syncReq.Revision
if ambiguousRevision == "" {
ambiguousRevision = app.Spec.Source.TargetRevision
}
...
if git.IsCommitSHA(ambiguousRevision) {
// If it's already a commit SHA, then no need to look it up
return ambiguousRevision, ambiguousRevision, nil
}
repo, err := s.db.GetRepository(ctx, app.Spec.Source.RepoURL)
if err != nil {
return "", "", err
}
gitClient, err := git.NewClient(repo.Repo, repo.GetGitCreds(), repo.IsInsecure(), repo.IsLFSEnabled())
if err != nil {
return "", "", err
}
revision, err = gitClient.LsRemote(ambiguousRevision)
if err != nil {
return "", "", err
}
return revision, fmt.Sprintf("%s (%s)", ambiguousRevision, revision), nil得到目标 commit id 之后,开始组装请求
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15op := appv1.Operation{
Sync: &appv1.SyncOperation{
Revision: revision,
Prune: syncReq.Prune,
DryRun: syncReq.DryRun,
SyncOptions: syncOptions,
SyncStrategy: syncReq.Strategy,
Resources: syncReq.Resources,
Manifests: syncReq.Manifests,
},
InitiatedBy: appv1.OperationInitiator{Username: session.Username(ctx)},
Info: syncReq.Infos,
}
a, err = argo.SetAppOperation(appIf, *syncReq.Name, &op)请求发出来后、会开启一个死循环、来执行 Update 操作、直到操作成功,以此来解决冲突问题(?)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17// SetAppOperation updates an application with the specified operation, retrying conflict errors
func SetAppOperation(appIf v1alpha1.ApplicationInterface, appName string, op *argoappv1.Operation) (*argoappv1.Application, error) {
for {
a, err := appIf.Get(context.Background(), appName, metav1.GetOptions{})
...
a.Operation = op
a.Status.OperationState = nil
a, err = appIf.Update(context.Background(), a, metav1.UpdateOptions{})
if op.Sync == nil {
return nil, status.Errorf(codes.InvalidArgument, "Operation unspecified")
}
if err == nil {
return a, nil
}
...
}
}其中
appIf.Get()
在前面已经介绍过了、就是从 k8s 集群中查找相应名称的 Application CRD,而appIf.Update()
顾名思义、与 Get 类似,只是对 k8s 集群做更新操作:1
2
3
4
5
6
7
8
9
10
11
12
13// Update takes the representation of a application and updates it. Returns the server's representation of the application, and an error, if there is any.
func (c *applications) Update(ctx context.Context, application *v1alpha1.Application, opts v1.UpdateOptions) (result *v1alpha1.Application, err error) {
result = &v1alpha1.Application{}
err = c.client.Put().
Namespace(c.ns).
Resource("applications").
Name(application.Name).
VersionedParams(&opts, scheme.ParameterCodec).
Body(application).
Do(ctx).
Into(result)
return
}而整个 Sync 步骤已经完成了,更新完 CRD 之后,后续所需要作出来的更新操作,应该放到了对应的 CRD controller 里面去。
以下是当 cli 工具执行命令时,argocd-server 所打印的日志,可以在一定程度上佐证上述分析:
1 | time="2021-04-17T10:03:38Z" level=info msg="finished unary call with code OK" grpc.code=OK grpc.method=Version grpc.service=version.VersionService grpc.start_time="2021-04-17T10:03:38Z" grpc.time_ms=0.737 span.kind=server system=grpc |
CRD Controller
入口:argocd_application_controller.go
-> NewCommand()
,主要步骤如下:
1 | ... |
首先,创建的 appController
包含两个 Informer:
- appInformer 监听
Application
CRD,事件根据不同类型按需送往appRefreshQueue
、appComparisonTypeRefreshQueue
、appOperationQueue
。 - projInformer 监听
AppProject
CRD,事件全部送往projectRefreshQueue
其中 appInformer
中所涉及到的三个 Queue 的区别如下:
appRefreshQueue
:key 格式为:namespace/name- AddFunc 添加事件
- UpdateFunc 添加事件(不一定)
- DeleteFunc 添加事件
appComparisonTypeRefreshQueue
:key 格式为:namespace/name/(int)- UpdateFunc 添加事件
appOperationQueue
:key 格式为:namespace/name- AddFunc 添加事件
- UpdateFunc 添加事件
其次,appController.Run()
做得内容比较直观,所做的内容可以分成两类。
其一是将 Informer 新开一个协程跑起来。
1 | go ctrl.appInformer.Run(ctx.Done()) |
其二是监听上面所涉及到的 4 个 Queue,当有新元素入队后、会立即调用相应的处理逻辑。
1 | for ctrl.processAppRefreshQueueItem() {} |
执行一次 argocd app sync aaa
后,controller pod 打印的日志如下:
1 | time="2021-04-17T15:53:04Z" level=info msg="updated 'aaa' operation (phase: Running)" |
根据上面的日志内容,可以大致推测 argocd 对更新 Application CRD 后的处理逻辑。
从日志的第一、二行可以看出,appOperationQueue
被新增了 key。即 processAppOperationQueueItem
从 Queue 中 Get 的操作,脱离堵塞状态,开始执行对 Queue 中事件的处理逻辑。获取到被更新的 Application 的 key(namespace/name)之后,先从 Indexer 中获取到完整的 Application 定义,再开始对它的处理。
1 | func (ctrl *ApplicationController) processAppOperationQueueItem() (processNext bool) { |
在其后processRequestedAppOperation
的处理中、会先把 Application CRD 的状态以 PATCH 的方式设置成 RUNNING
。
1 | state = &appv1.OperationState{ |
然后开始同步状态
1 | ctrl.appStateManager.SyncAppState(app, state) |
再设置为同步之后的状态
1 | ctrl.setOperationState(app, state) |
SyncAppState
在这一步、基本上将同步所做的事情干完了。
GetAppProject
获取 AppProject CRD 用来作为 CompareAppState 的入参。
CompareAppState
对比 Git 仓库中 yaml 与 k8s 中在跑资源的 yaml。这里会返回一个 comparisonResult,其中包含 DiffResultList、ReconciliationResult。
调用 gRPC 接口获取指定 commit id 的 yaml,得到一个 targetObjs,内容为 []*unstructured.Unstructured 由 [] string 转换而来。
在获取到 Git 仓库最新的 yaml 后,会对 yaml 中的资源进行一次去重,重复的判定标准为:G/K/Namespace/Name
,去重之后,保证相同的标准的资源的数量为1。
1 | targetObjs, dedupConditions, err := |
在资源去重后、会根据过滤规则、再对资源进行一次过滤。
再从 k8s 集群(本地缓存)中获取当前应用的资源。得到一个 liveObjByKey,它是一个 Map,结构为:map[kube.ResourceKey]*unstructured.Unstructured
。
1 | liveObjByKey, err := m.liveStateCache.GetManagedLiveObjs(app, targetObjs) |
过滤掉liveObjByKey
中不允许出现的资源
Reconcile
产生了一个规律比较奇怪的数据结构、可能是后续使用到这个数据结构的时候、也会遵照某种规律。其实主要是用于 Diff,可以简单理解:对新增的资源,live 中没有,因此对比的时候它的值应该为 nil,而 target 中有具体值,这样就能显示出 git diff 中新增文件时的展示效果:一边有值、一边没有值。
- 将 liveObjByKey 中的 uid 相同的资源全部删除、只留1个。(前提是:targetObjs中也含有相同的资源)
- target 增加几个 nil 值,个数为 target 中存在、但是 live 里面不存在的资源个数。
- managedLiveObj 基本等同 liveObj、具体为 target 中有、但 live 中没有。
返回的数据结构:
1 | return ReconciliationResult{ |
Diff
这里有一个分支、但不论走哪个分支,都会调用 diff.Diff()
方法。
1 | if noCache || specChanged || revisionChanged || m.cache.GetAppManagedResources(app.Name, &cachedDiff) != nil { |
在 Diff() 方法中,存在 ThreeWayDiff 和 TwoWayDiff 这两种方式。看 label 来决定。最终结果得到一个类似于 git diff 的结果列表(每个资源有一个 Diff)。
sync.NewSyncContext
这个方法里面初始化了一个 syncContext 其中它的 resource 值是通过一个 roupResources(reconciliationResult)
方法获取到。它的处理逻辑如下:
1 | func groupResources(reconciliationResult ReconciliationResult) map[kubeutil.ResourceKey]reconciledResource { |
返回的是一个以 G/K/NS/NAME 为键,以 reconciledResource
为值的 Map,也就是说此 map 中同时含有修改前、修改后的 yaml 内容。
syncCtx.Sync()
获取到 task。task 中包含 syncTask 列表,包含 targetObj 与 liveObj。
对每个 task 执行 dry-run,来检查 yaml 是否存在错误。失败则直接退出。
启动所有任务
1
runState := sc.runTasks(tasks, false)
- 先执行 prene 任务(task.targetObj 为空),最终调用
kubectl delete
,并等待所有任务执行完成。 - 在执行应用最新的 yaml 前,会将部分符合要求的待更新资源(task.liveObj)先执行删除操作、在进行 apply。
部分符合要求的待更新资源:
1
2
3func (t *syncTask) deleteBeforeCreation() bool {
return t.liveObj != nil && t.pending() && t.hasHookDeletePolicy(common.HookDeletePolicyBeforeHookCreation)
}- 先执行 prene 任务(task.targetObj 为空),最终调用
Argo CD Sync 同步操作逻辑梳理