A Walkthrough of the Argo CD Sync Operation Logic

The sync subcommand

It does just two things: pull the manifests from the Git repo, then run kubectl apply.

Entry point: app.go -> NewApplicationSyncCommand

Command executed: argocd app sync aaa

Command-line client

  1. First, create a client.

    acdClient := argocdclient.NewClientOrDie(clientOpts)

    The client struct has the following fields:

    type client struct {
        ServerAddr      string
        PlainText       bool
        Insecure        bool
        CertPEMData     []byte
        ClientCert      *tls.Certificate
        AuthToken       string
        RefreshToken    string
        UserAgent       string
        GRPCWeb         bool
        GRPCWebRootPath string
        Headers         []string

        proxyMutex      *sync.Mutex
        proxyListener   net.Listener
        proxyServer     *grpc.Server
        proxyUsersCount int
    }
  2. Next, use acdClient to create an applicationpkg.ApplicationServiceClient.

    conn, appIf := acdClient.NewApplicationClientOrDie()

    appIf wraps a gRPC connection:

    type applicationServiceClient struct {
        cc *grpc.ClientConn
    }

    Before the ApplicationServiceClient is initialized, a connection to the gRPC server is established, and that connection is then passed to the ApplicationServiceClient:

    func (c *client) NewApplicationClient() (io.Closer, applicationpkg.ApplicationServiceClient, error) {
        conn, closer, err := c.newConn()
        ...
        appIf := applicationpkg.NewApplicationServiceClient(conn)
        return closer, appIf, nil
    }

    func (c *client) newConn() (*grpc.ClientConn, io.Closer, error) {
        ...
        conn, e := grpc_util.BlockingDial(ctx, network, serverAddr, creds, dialOpts...)
        return conn, argoio.NewCloser(...), e
    }

    func NewApplicationServiceClient(cc *grpc.ClientConn) ApplicationServiceClient {
        return &applicationServiceClient{cc}
    }
  3. Set the request parameters, applicationpkg.ApplicationSyncRequest. Two kinds of parameters are processed:

    1. selector. This filters the Applications that satisfy the selector and adds each matching Application's Name to the list of applications to process.

    2. selectedLabels. This applies to the individual resources under an Application. When a resource's labels match the given labels, that resource is appended to resources in GVK form.

    3. Just before the CLI sends the Sync request, the full set of parameters is:

      syncReq := applicationpkg.ApplicationSyncRequest{
          Name:      &appName,
          DryRun:    dryRun,
          Revision:  revision,          // ""
          Resources: selectedResources, // converted from resource
          Prune:     prune,
          Manifests: localObjsStrings,  // []
          Infos:     getInfos(infos),
      }
  4. Then send the gRPC request. Since appIf is an instance of applicationServiceClient, the implementation of Sync can be found in that type's definition:

    _, err := appIf.Sync(ctx, &syncReq)

    func (c *applicationServiceClient) Sync(ctx context.Context, in *ApplicationSyncRequest, opts ...grpc.CallOption) (*v1alpha1.Application, error) {
        out := new(v1alpha1.Application)
        err := c.cc.Invoke(ctx, "/application.ApplicationService/Sync", in, out, opts...)
        if err != nil {
            return nil, err
        }
        return out, nil
    }

gRPC server-side logic

The server-side logic starts where gRPC handles /application.ApplicationService/Sync. That method name is wired to its handler in _ApplicationService_Sync_Handler in application.pb.go:

func _ApplicationService_Sync_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
    in := new(ApplicationSyncRequest)
    if err := dec(in); err != nil {
        return nil, err
    }
    if interceptor == nil {
        return srv.(ApplicationServiceServer).Sync(ctx, in)
    }
    info := &grpc.UnaryServerInfo{
        Server:     srv,
        FullMethod: "/application.ApplicationService/Sync",
    }
    handler := func(ctx context.Context, req interface{}) (interface{}, error) {
        return srv.(ApplicationServiceServer).Sync(ctx, req.(*ApplicationSyncRequest))
    }
    return interceptor(ctx, in, info, handler)
}

Following the gRPC registration code, it is easy to see that Sync() is ultimately implemented by the Server struct in server/application/application.go:

func (s *Server) Sync(ctx context.Context, syncReq *application.ApplicationSyncRequest) (*appv1.Application, error) {
    ...
}

When the gRPC server (running in the argocd-server pod) receives the request, it carries the following parameters:

name:"aaa" revision:"" dryRun:false prune:false strategy:<hook:<syncStrategyApply:<force:false > > >

The key steps that follow are:

  1. Fetch the Application custom resource named aaa from the k8s cluster.

    appIf := s.appclientset.ArgoprojV1alpha1().Applications(s.ns)
    a, err := appIf.Get(ctx, *syncReq.Name, metav1.GetOptions{})

    // implementation of Get
    func (c *applications) Get(ctx context.Context, name string, options v1.GetOptions) (result *v1alpha1.Application, err error) {
        result = &v1alpha1.Application{}
        err = c.client.Get().
            Namespace(c.ns).
            Resource("applications").
            Name(name).
            VersionedParams(&options, scheme.ParameterCodec).
            Do(ctx).
            Into(result)
        return
    }
  2. Resolve revision and displayRevision.

    revision, displayRevision, err := s.resolveRevision(ctx, a, syncReq)

    Because the revision carried by the gRPC request is empty, the code falls back to the Application's spec.source.targetRevision, which is HEAD in this run. If the revision is a branch or tag, the corresponding commit ID is looked up in the Git repo.

    ambiguousRevision := syncReq.Revision
    if ambiguousRevision == "" {
        ambiguousRevision = app.Spec.Source.TargetRevision
    }
    ...
    if git.IsCommitSHA(ambiguousRevision) {
        // If it's already a commit SHA, then no need to look it up
        return ambiguousRevision, ambiguousRevision, nil
    }
    repo, err := s.db.GetRepository(ctx, app.Spec.Source.RepoURL)
    if err != nil {
        return "", "", err
    }
    gitClient, err := git.NewClient(repo.Repo, repo.GetGitCreds(), repo.IsInsecure(), repo.IsLFSEnabled())
    if err != nil {
        return "", "", err
    }
    revision, err = gitClient.LsRemote(ambiguousRevision)
    if err != nil {
        return "", "", err
    }
    return revision, fmt.Sprintf("%s (%s)", ambiguousRevision, revision), nil
  3. With the target commit ID in hand, assemble the operation.

    op := appv1.Operation{
        Sync: &appv1.SyncOperation{
            Revision:     revision,
            Prune:        syncReq.Prune,
            DryRun:       syncReq.DryRun,
            SyncOptions:  syncOptions,
            SyncStrategy: syncReq.Strategy,
            Resources:    syncReq.Resources,
            Manifests:    syncReq.Manifests,
        },
        InitiatedBy: appv1.OperationInitiator{Username: session.Username(ctx)},
        Info:        syncReq.Infos,
    }

    a, err = argo.SetAppOperation(appIf, *syncReq.Name, &op)

    SetAppOperation then loops indefinitely, repeating the Get and the Update until the Update succeeds. As its doc comment says, this is how conflict errors are retried.

    // SetAppOperation updates an application with the specified operation, retrying conflict errors
    func SetAppOperation(appIf v1alpha1.ApplicationInterface, appName string, op *argoappv1.Operation) (*argoappv1.Application, error) {
        for {
            a, err := appIf.Get(context.Background(), appName, metav1.GetOptions{})
            ...
            a.Operation = op
            a.Status.OperationState = nil
            a, err = appIf.Update(context.Background(), a, metav1.UpdateOptions{})
            if op.Sync == nil {
                return nil, status.Errorf(codes.InvalidArgument, "Operation unspecified")
            }
            if err == nil {
                return a, nil
            }
            ...
        }
    }

    appIf.Get() was covered above: it looks up the Application custom resource with the given name in the k8s cluster. appIf.Update(), as the name suggests, is analogous to Get, except that it updates the resource in the cluster:

    // Update takes the representation of a application and updates it. Returns the server's representation of the application, and an error, if there is any.
    func (c *applications) Update(ctx context.Context, application *v1alpha1.Application, opts v1.UpdateOptions) (result *v1alpha1.Application, err error) {
        result = &v1alpha1.Application{}
        err = c.client.Put().
            Namespace(c.ns).
            Resource("applications").
            Name(application.Name).
            VersionedParams(&opts, scheme.ParameterCodec).
            Body(application).
            Do(ctx).
            Into(result)
        return
    }

    At this point the server-side Sync handling is complete. Once the Application resource has been updated, the actual follow-up work is presumably left to the corresponding controller.
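The Get/Update loop in SetAppOperation is a form of optimistic concurrency: re-read the latest object, modify it, and try the write again if somebody else updated it in the meantime. The sketch below models that with an in-memory store. The store and app types and the errConflict error are hypothetical, and it assumes the elided part of the loop retries only on conflict errors, as the function's doc comment suggests.

```go
package main

import (
	"errors"
	"fmt"
)

var errConflict = errors.New("conflict: resource version changed")

// app and store are hypothetical stand-ins for the Application resource and
// the API server that holds it.
type app struct {
	ResourceVersion int
	Operation       string
}

type store struct {
	current         app
	conflictsToFake int // number of concurrent writers to simulate
}

func (s *store) Get() app { return s.current }

// Update rejects writes that were based on a stale ResourceVersion.
func (s *store) Update(a app) (app, error) {
	if s.conflictsToFake > 0 {
		s.conflictsToFake--
		s.current.ResourceVersion++ // somebody else wrote first
	}
	if a.ResourceVersion != s.current.ResourceVersion {
		return app{}, errConflict
	}
	a.ResourceVersion++
	s.current = a
	return a, nil
}

// setOperation re-reads and retries on conflicts, and returns any other
// error immediately. It also reports how many attempts were needed.
func setOperation(s *store, op string) (app, int, error) {
	for attempts := 1; ; attempts++ {
		a := s.Get()
		a.Operation = op
		updated, err := s.Update(a)
		if err == nil {
			return updated, attempts, nil
		}
		if !errors.Is(err, errConflict) {
			return app{}, attempts, err
		}
	}
}

func main() {
	s := &store{conflictsToFake: 2}
	a, attempts, err := setOperation(s, "sync")
	fmt.Println(a.Operation, attempts, err)
}
```

Re-reading inside the loop matters: retrying the Update with the same stale object would fail with a conflict every time.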

The following are the logs printed by argocd-server when the CLI runs the command; they corroborate the analysis above to some extent:

time="2021-04-17T10:03:38Z" level=info msg="finished unary call with code OK" grpc.code=OK grpc.method=Version grpc.service=version.VersionService grpc.start_time="2021-04-17T10:03:38Z" grpc.time_ms=0.737 span.kind=server system=grpc
time="2021-04-17T10:03:38Z" level=info msg="received unary call /application.ApplicationService/Sync" grpc.method=Sync grpc.request.claims="{\"exp\":1618734626,\"iat\":1618648226,\"iss\":\"argocd\",\"jti\":\"18520eac-1579-47b7-b12d-cba5c24a6c23\",\"nbf\":1618648226,\"sub\":\"admin\"}" grpc.request.content="name:\"aaa\" revision:\"\" dryRun:false prune:false strategy:<hook:<syncStrategyApply:<force:false > > > " grpc.service=application.ApplicationService grpc.start_time="2021-04-17T10:03:38Z" span.kind=server system=grpc
time="2021-04-17T10:03:38Z" level=info msg="admin initiated sync to HEAD (5acebc82a613b73b46a09be1b023a1720ea3f7e9)" application=aaa dest-namespace=cc dest-server="https://kubernetes.default.svc" reason=OperationStarted type=Normal
time="2021-04-17T10:03:38Z" level=info msg="finished unary call with code OK" grpc.code=OK grpc.method=Sync grpc.service=application.ApplicationService grpc.start_time="2021-04-17T10:03:38Z" grpc.time_ms=268.642 span.kind=server system=grpc
time="2021-04-17T10:03:38Z" level=info msg="received unary call /application.ApplicationService/Get" grpc.method=Get grpc.request.claims="{\"exp\":1618734626,\"iat\":1618648226,\"iss\":\"argocd\",\"jti\":\"18520eac-1579-47b7-b12d-cba5c24a6c23\",\"nbf\":1618648226,\"sub\":\"admin\"}" grpc.request.content="name:\"aaa\" resourceVersion:\"\" selector:\"\" repo:\"\" " grpc.service=application.ApplicationService grpc.start_time="2021-04-17T10:03:38Z" span.kind=server system=grpc
time="2021-04-17T10:03:38Z" level=info msg="finished unary call with code OK" grpc.code=OK grpc.method=Get grpc.service=application.ApplicationService grpc.start_time="2021-04-17T10:03:38Z" grpc.time_ms=19.895 span.kind=server system=grpc
time="2021-04-17T10:03:38Z" level=info msg="received streaming call /application.ApplicationService/Watch" grpc.method=Watch grpc.request.claims="{\"exp\":1618734626,\"iat\":1618648226,\"iss\":\"argocd\",\"jti\":\"18520eac-1579-47b7-b12d-cba5c24a6c23\",\"nbf\":1618648226,\"sub\":\"admin\"}" grpc.request.content="name:\"aaa\" resourceVersion:\"39352\" selector:\"\" repo:\"\" " grpc.service=application.ApplicationService grpc.start_time="2021-04-17T10:03:38Z" span.kind=server system=grpc
time="2021-04-17T10:03:40Z" level=info msg="received unary call /application.ApplicationService/Get" grpc.method=Get grpc.request.claims="{\"exp\":1618734626,\"iat\":1618648226,\"iss\":\"argocd\",\"jti\":\"18520eac-1579-47b7-b12d-cba5c24a6c23\",\"nbf\":1618648226,\"sub\":\"admin\"}" grpc.request.content="name:\"aaa\" refresh:\"normal\" resourceVersion:\"\" selector:\"\" repo:\"\" " grpc.service=application.ApplicationService grpc.start_time="2021-04-17T10:03:40Z" span.kind=server system=grpc
time="2021-04-17T10:03:40Z" level=info msg="Requested app 'aaa' refresh"
time="2021-04-17T10:03:41Z" level=info msg="finished unary call with code OK" grpc.code=OK grpc.method=Get grpc.service=application.ApplicationService grpc.start_time="2021-04-17T10:03:40Z" grpc.time_ms=937.08 span.kind=server system=grpc
time="2021-04-17T10:03:41Z" level=info msg="received unary call /cluster.SettingsService/Get" grpc.method=Get grpc.request.claims="{\"exp\":1618734626,\"iat\":1618648226,\"iss\":\"argocd\",\"jti\":\"18520eac-1579-47b7-b12d-cba5c24a6c23\",\"nbf\":1618648226,\"sub\":\"admin\"}" grpc.request.content= grpc.service=cluster.SettingsService grpc.start_time="2021-04-17T10:03:41Z" span.kind=server system=grpc
time="2021-04-17T10:03:41Z" level=info msg="Ignore status for CustomResourceDefinitions"
time="2021-04-17T10:03:41Z" level=info msg="finished unary call with code OK" grpc.code=OK grpc.method=Get grpc.service=cluster.SettingsService grpc.start_time="2021-04-17T10:03:41Z" grpc.time_ms=2.212 span.kind=server system=grpc
time="2021-04-17T10:03:41Z" level=info msg="finished streaming call with code OK" grpc.code=OK grpc.method=Watch grpc.service=application.ApplicationService grpc.start_time="2021-04-17T10:03:38Z" grpc.time_ms=2771.24 span.kind=server system=grpc
time="2021-04-17T10:03:47Z" level=warning msg="Failed to resync revoked tokens. retrying again in 1 minute: dial tcp 10.103.83.103:6379: connect: connection refused"
time="2021-04-17T10:04:47Z" level=warning msg="Failed to resync revoked tokens. retrying again in 1 minute: dial tcp 10.103.83.103:6379: connect: connection refused"

CRD Controller

Entry point: argocd_application_controller.go -> NewCommand(). The main steps are:

...
appController, err := controller.NewApplicationController(
    namespace,
    settingsMgr,
    kubeClient,
    appClient,
    repoClientset,
    cache,
    kubectl,
    resyncDuration,
    time.Duration(selfHealTimeoutSeconds)*time.Second,
    metricsPort,
    metricsCacheExpiration,
    kubectlParallelismLimit,
    clusterFilter)
...
go appController.Run(ctx, statusProcessors, operationProcessors)

First, the appController that gets created contains two Informers:

  1. appInformer watches Application resources; depending on their type, events are sent as needed to appRefreshQueue, appComparisonTypeRefreshQueue, or appOperationQueue.
  2. projInformer watches AppProject resources; all events are sent to projectRefreshQueue.

The three queues used by appInformer differ as follows:

  1. appRefreshQueue: key format is namespace/name
    1. AddFunc enqueues events
    2. UpdateFunc enqueues events (not always)
    3. DeleteFunc enqueues events
  2. appComparisonTypeRefreshQueue: key format is namespace/name/(int)
    1. UpdateFunc enqueues events
  3. appOperationQueue: key format is namespace/name
    1. AddFunc enqueues events
    2. UpdateFunc enqueues events
Second, what appController.Run() does is fairly straightforward and falls into two categories.

The first is to start each Informer in its own goroutine.

go ctrl.appInformer.Run(ctx.Done())
go ctrl.projInformer.Run(ctx.Done())

The second is to watch the four queues mentioned above; as soon as a new item is enqueued, the corresponding handler is called.

for ctrl.processAppRefreshQueueItem() {}
...
for ctrl.processAppOperationQueueItem() {}
...
for ctrl.processAppComparisonTypeQueueItem() {}
...
for ctrl.processProjectQueueItem() {}
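The empty-bodied for loops work because each process function blocks on the queue and returns false only when the queue shuts down. The following sketch shows that worker pattern with a channel-backed queue. The queue and controller types here are hypothetical simplifications, not the workqueue used by the real controller.

```go
package main

import "fmt"

// queue is a hypothetical, channel-backed stand-in for a work queue: Get
// blocks until an item arrives and reports shutdown once the channel is
// closed and drained.
type queue struct{ items chan string }

func (q *queue) Get() (item string, shutdown bool) {
	item, ok := <-q.items
	return item, !ok
}

type controller struct {
	opQueue   *queue
	processed []string
}

// processItem handles one key. It returns false only when the queue has
// shut down, which is what ends a `for c.processItem() {}` loop.
func (c *controller) processItem() bool {
	key, shutdown := c.opQueue.Get()
	if shutdown {
		return false
	}
	c.processed = append(c.processed, key)
	return true
}

func main() {
	c := &controller{opQueue: &queue{items: make(chan string, 2)}}
	c.opQueue.items <- "argocd/aaa"
	c.opQueue.items <- "argocd/bbb"
	close(c.opQueue.items) // simulate shutdown after two items

	for c.processItem() {
	}
	fmt.Println(c.processed)
}
```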

After running argocd app sync aaa once, the controller pod prints the following logs:

time="2021-04-17T15:53:04Z" level=info msg="updated 'aaa' operation (phase: Running)"
time="2021-04-17T15:53:04Z" level=info msg="Initialized new operation: {&SyncOperation{Revision:5acebc82a613b73b46a09be1b023a1720ea3f7e9,Prune:false,DryRun:false,SyncStrategy:&SyncStrategy{Apply:nil,Hook:&SyncStrategyHook{SyncStrategyApply:SyncStrategyApply{Force:false,},},},Resources:[]SyncOperationResource{},Source:nil,Manifests:[],SyncOptions:[],} {admin false} [] {0 nil}}" application=aaa
time="2021-04-17T15:53:04Z" level=info msg="Ignore status for CustomResourceDefinitions"
time="2021-04-17T15:53:04Z" level=info msg="Comparing app state (cluster: https://kubernetes.default.svc, namespace: cc)" application=aaa
time="2021-04-17T15:53:05Z" level=info msg="getRepoObjs stats" application=aaa build_options_ms=0 helm_ms=0 plugins_ms=0 repo_ms=0 time_ms=560 unmarshal_ms=560 version_ms=0
time="2021-04-17T15:53:05Z" level=info msg="Ignore status for CustomResourceDefinitions"
time="2021-04-17T15:53:05Z" level=info msg=Syncing application=aaa skipHooks=false started=false syncId=00005-oqqyx
time="2021-04-17T15:53:05Z" level=info msg="sync/terminate complete" application=aaa duration=327.6219ms syncId=00005-oqqyx
time="2021-04-17T15:53:05Z" level=info msg="updated 'aaa' operation (phase: Failed)"
time="2021-04-17T15:53:05Z" level=info msg="Sync operation to 5acebc82a613b73b46a09be1b023a1720ea3f7e9 failed: one or more objects failed to apply" application=aaa dest-namespace=cc dest-server="https://kubernetes.default.svc" reason=OperationCompleted type=Warning
time="2021-04-17T15:53:05Z" level=info msg="Refreshing app status (controller refresh requested), level (2)" application=aaa
time="2021-04-17T15:53:05Z" level=info msg="Ignore status for CustomResourceDefinitions"
time="2021-04-17T15:53:05Z" level=info msg="Comparing app state (cluster: https://kubernetes.default.svc, namespace: cc)" application=aaa
time="2021-04-17T15:53:06Z" level=warning msg="Failed to save clusters info: dial tcp 10.103.83.103:6379: connect: connection refused"
time="2021-04-17T15:53:06Z" level=info msg="getRepoObjs stats" application=aaa build_options_ms=0 helm_ms=0 plugins_ms=0 repo_ms=0 time_ms=696 unmarshal_ms=695 version_ms=0
time="2021-04-17T15:53:06Z" level=error msg="Failed to cache app resources: dial tcp 10.103.83.103:6379: connect: connection refused" application=aaa dedup_ms=0 diff_ms=64 git_ms=696 health_ms=0 live_ms=0 settings_ms=0 sync_ms=0
time="2021-04-17T15:53:06Z" level=info msg="Update successful" application=aaa
time="2021-04-17T15:53:06Z" level=info msg="Reconciliation completed" application=aaa dedup_ms=0 dest-name= dest-namespace=cc dest-server="https://kubernetes.default.svc" diff_ms=64 fields.level=2 git_ms=696 health_ms=0 live_ms=0 settings_ms=0 sync_ms=0 time_ms=891
time="2021-04-17T15:53:06Z" level=info msg="Refreshing app status (normal refresh requested), level (2)" application=aaa
time="2021-04-17T15:53:06Z" level=info msg="Ignore status for CustomResourceDefinitions"
time="2021-04-17T15:53:06Z" level=info msg="Comparing app state (cluster: https://kubernetes.default.svc, namespace: cc)" application=aaa
time="2021-04-17T15:53:07Z" level=info msg="getRepoObjs stats" application=aaa build_options_ms=0 helm_ms=0 plugins_ms=0 repo_ms=0 time_ms=745 unmarshal_ms=744 version_ms=0
time="2021-04-17T15:53:07Z" level=error msg="Failed to cache app resources: dial tcp 10.103.83.103:6379: connect: connection refused" application=aaa dedup_ms=0 diff_ms=0 git_ms=745 health_ms=0 live_ms=0 settings_ms=0 sync_ms=0
time="2021-04-17T15:53:07Z" level=info msg="Update successful" application=aaa
time="2021-04-17T15:53:07Z" level=info msg="Reconciliation completed" application=aaa dedup_ms=0 dest-name= dest-namespace=cc dest-server="https://kubernetes.default.svc" diff_ms=0 fields.level=2 git_ms=745 health_ms=0 live_ms=0 settings_ms=0 sync_ms=0 time_ms=858

From the logs above, we can roughly infer how argocd handles an Application resource after it has been updated.

The first two log lines show that a key was added to appOperationQueue. The Get call inside processAppOperationQueueItem therefore unblocks and starts processing the queued event. After obtaining the key (namespace/name) of the updated Application, it first fetches the full Application definition from the Indexer and then processes it.

func (ctrl *ApplicationController) processAppOperationQueueItem() (processNext bool) {
    appKey, shutdown := ctrl.appOperationQueue.Get()
    ...
    obj, exists, err := ctrl.appInformer.GetIndexer().GetByKey(appKey.(string))
    ...
    origApp, ok := obj.(*appv1.Application)
    ...
    app := origApp.DeepCopy()

    if app.Operation != nil {
        ctrl.processRequestedAppOperation(app)
    } else if app.DeletionTimestamp != nil && app.CascadedDeletion() {
        ...
    }
    return
}

In the subsequent processRequestedAppOperation, the operation state of the Application resource is first set to Running via a PATCH:

state = &appv1.OperationState{
    Phase:     synccommon.OperationRunning,
    Operation: *app.Operation,
    StartedAt: metav1.Now(),
}
ctrl.setOperationState(app, state)
logCtx.Infof("Initialized new operation: %v", *app.Operation)

Then the actual sync starts:

ctrl.appStateManager.SyncAppState(app, state)

Finally, the state resulting from the sync is written back:

ctrl.setOperationState(app, state)
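The sequence above (persist Running, run the sync, persist whatever the sync left behind) can be sketched as follows. The opState type, the phase constants and the persist callback are simplified, hypothetical stand-ins for appv1.OperationState and setOperationState. The failure message is taken from the controller log shown earlier.

```go
package main

import "fmt"

type phase string

const (
	running   phase = "Running"
	succeeded phase = "Succeeded"
	failed    phase = "Failed"
)

// opState is a simplified stand-in for the operation state.
type opState struct {
	Phase   phase
	Message string
}

// runOperation records Running first, lets sync mutate the state in place,
// then records the final state. persist stands in for setOperationState.
func runOperation(sync func(*opState), persist func(opState)) {
	state := &opState{Phase: running}
	persist(*state)
	sync(state)
	persist(*state)
}

func main() {
	var history []phase
	runOperation(
		func(s *opState) {
			// Simulate the failed sync seen in the controller log.
			s.Phase = failed
			s.Message = "one or more objects failed to apply"
		},
		func(s opState) { history = append(history, s.Phase) },
	)
	fmt.Println(history)
}
```

The two persisted phases correspond to the two "updated 'aaa' operation (phase: ...)" lines in the controller log.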

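SyncAppState

This step does essentially all of the work of a sync.

GetAppProject

Fetches the AppProject resource, which is passed to CompareAppState as an argument.

CompareAppState

Compares the YAML in the Git repo with the YAML of the resources running in k8s. It returns a comparisonResult, which contains a DiffResultList and a ReconciliationResult.

A gRPC call fetches the YAML at the given commit ID, yielding targetObjs, a []*unstructured.Unstructured converted from []string.

After fetching the latest YAML from the Git repo, the resources in it are deduplicated. Two resources count as duplicates when their Group/Kind/Namespace/Name match; after deduplication, at most one resource remains per such key.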

targetObjs, dedupConditions, err :=
    DeduplicateTargetObjects(app.Spec.Destination.Namespace, targetObjs, infoProvider)
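As an illustration of deduplicating by Group/Kind/Namespace/Name, the sketch below keeps one object per key and reports a condition for every key that appeared more than once. The types and the dedup function are hypothetical. Keeping the first occurrence is an assumption made for simplicity; the real DeduplicateTargetObjects may choose differently.

```go
package main

import "fmt"

// resourceKey identifies a resource by Group/Kind/Namespace/Name.
type resourceKey struct{ Group, Kind, Namespace, Name string }

// object is a simplified stand-in for *unstructured.Unstructured.
type object struct {
	Key  resourceKey
	Spec string
}

// dedup keeps the first object seen for each key (an assumption) and
// returns a message for every key that occurred more than once.
func dedup(objs []object) ([]object, []string) {
	seen := make(map[resourceKey]int)
	var unique []object
	for _, o := range objs {
		seen[o.Key]++
		if seen[o.Key] == 1 {
			unique = append(unique, o)
		}
	}
	var conditions []string
	for _, o := range unique {
		if n := seen[o.Key]; n > 1 {
			conditions = append(conditions,
				fmt.Sprintf("%s/%s appeared %d times", o.Key.Kind, o.Key.Name, n))
		}
	}
	return unique, conditions
}

func main() {
	cm := resourceKey{Kind: "ConfigMap", Namespace: "cc", Name: "settings"}
	dep := resourceKey{Group: "apps", Kind: "Deployment", Namespace: "cc", Name: "web"}
	unique, conditions := dedup([]object{
		{Key: cm, Spec: "v1"},
		{Key: dep, Spec: "v1"},
		{Key: cm, Spec: "v2"},
	})
	fmt.Println(len(unique), conditions)
}
```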

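After deduplication, the resources are filtered once more according to the filtering rules.

Next, the application's current resources are fetched from the k8s cluster (via a local cache). The result is liveObjByKey, a map of type map[kube.ResourceKey]*unstructured.Unstructured:

liveObjByKey, err := m.liveStateCache.GetManagedLiveObjs(app, targetObjs)

Resources that are not permitted are then filtered out of liveObjByKey.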

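Reconcile

Reconcile produces a data structure whose layout looks odd at first; the code that consumes it presumably relies on the same convention. It mainly exists to support the diff. Put simply: a newly added resource does not exist in live, so its live value is nil while its target value is set. This mirrors how git diff shows a newly added file: content on one side, nothing on the other.

  1. Remove from liveObjByKey all but one of the resources that share the same uid (provided targetObjs also contains the same resource).
  2. Append nil values to target, one for each resource that exists in live but not in target.
  3. managedLiveObj is essentially liveObj aligned index by index with target: its entry is nil for each resource that is in target but not in live.

The returned data structure:

return ReconciliationResult{
    Target: targetObjs,
    Hooks:  hooks,
    Live:   managedLiveObj,
}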
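A compact way to see the index-aligned layout is to build it by hand. The sketch below pairs each target with its live counterpart (or nil), then appends live-only resources with a nil target. The key and object types and the reconcile function are hypothetical simplifications that ignore hooks and uid deduplication.

```go
package main

import "fmt"

// resKey and object are simplified stand-ins for kube.ResourceKey and
// *unstructured.Unstructured.
type resKey struct{ Kind, Name string }

type object struct{ Key resKey }

// reconcile returns two slices of equal length where target[i] and live[i]
// describe the same resource; a nil on either side means "absent there".
func reconcile(targets []*object, liveByKey map[resKey]*object) (target, live []*object) {
	remaining := make(map[resKey]*object, len(liveByKey))
	for k, v := range liveByKey {
		remaining[k] = v
	}
	for _, t := range targets {
		target = append(target, t)
		live = append(live, remaining[t.Key]) // nil when the resource is new
		delete(remaining, t.Key)
	}
	for _, l := range remaining {
		target = append(target, nil) // exists only in the cluster
		live = append(live, l)
	}
	return target, live
}

func main() {
	a := &object{Key: resKey{"ConfigMap", "a"}}
	b := &object{Key: resKey{"Deployment", "b"}}
	c := &object{Key: resKey{"Service", "c"}}

	target, live := reconcile(
		[]*object{a, b},                        // a is new, b exists already
		map[resKey]*object{b.Key: b, c.Key: c}, // c is no longer in Git
	)
	for i := range target {
		fmt.Println(target[i] != nil, live[i] != nil)
	}
}
```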

Diff

There is a branch here, but whichever branch is taken, diff.Diff() ends up being called.

if noCache || specChanged || revisionChanged || m.cache.GetAppManagedResources(app.Name, &cachedDiff) != nil {
    // (rare) cache miss
    diffResults, err = diff.DiffArray(reconciliation.Target, reconciliation.Live, diffOpts...)
} else {
    diffResults, err = m.diffArrayCached(reconciliation.Target, reconciliation.Live, cachedDiff, diffOpts...)
}

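Inside Diff() there are two strategies, ThreeWayDiff and TwoWayDiff. Which one is used depends on the live object's metadata (the kubectl.kubernetes.io/last-applied-configuration annotation). The final result is a list resembling git diff output, with one Diff per resource.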

sync.NewSyncContext

This method initializes a syncContext whose resources value comes from groupResources(reconciliationResult), which works as follows:

func groupResources(reconciliationResult ReconciliationResult) map[kubeutil.ResourceKey]reconciledResource {
    resources := make(map[kube.ResourceKey]reconciledResource)
    for i := 0; i < len(reconciliationResult.Target); i++ {
        res := reconciledResource{
            Target: reconciliationResult.Target[i],
            Live:   reconciliationResult.Live[i],
        }
        ...
        resources[kube.GetResourceKey(obj)] = res
    }
    return resources
}

It returns a map keyed by Group/Kind/Namespace/Name with reconciledResource values, so each entry holds both the pre-change (live) and post-change (target) YAML.
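The grouping step can be sketched as a runnable example. The types are simplified stand-ins, and the way the key object is chosen (the target if present, otherwise the live object) is an assumption filling in for the part elided in the excerpt above.

```go
package main

import "fmt"

// resourceKey identifies a resource by Group/Kind/Namespace/Name.
type resourceKey struct{ Group, Kind, Namespace, Name string }

// object is a simplified stand-in for *unstructured.Unstructured.
type object struct {
	Key  resourceKey
	Spec string
}

// reconciled pairs the desired and the current version of one resource.
type reconciled struct{ Target, Live *object }

// group turns two index-aligned slices into a map keyed by resource.
func group(target, live []*object) map[resourceKey]reconciled {
	out := make(map[resourceKey]reconciled)
	for i := range target {
		res := reconciled{Target: target[i], Live: live[i]}
		obj := res.Target
		if obj == nil {
			obj = res.Live // assumption: fall back to live for the key
		}
		if obj == nil {
			continue // nothing on either side, nothing to key by
		}
		out[obj.Key] = res
	}
	return out
}

func main() {
	web := resourceKey{Group: "apps", Kind: "Deployment", Namespace: "cc", Name: "web"}
	cfg := resourceKey{Kind: "ConfigMap", Namespace: "cc", Name: "cfg"}

	target := []*object{{Key: web, Spec: "v2"}, {Key: cfg, Spec: "v1"}}
	live := []*object{{Key: web, Spec: "v1"}, nil} // cfg does not exist yet

	resources := group(target, live)
	fmt.Println(resources[web].Live.Spec, "->", resources[web].Target.Spec)
	fmt.Println(resources[cfg].Live == nil)
}
```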

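syncCtx.Sync()

  1. Obtain the tasks: a list of syncTask values, each holding a targetObj and a liveObj.

  2. Run a dry-run for every task to check the YAML for errors. If it fails, exit immediately.

  3. Start all tasks.

    runState := sc.runTasks(tasks, false)

    1. First run the prune tasks (those whose task.targetObj is nil), which ultimately call kubectl delete, and wait for all of them to finish.
    2. Before the latest YAML is applied, some qualifying to-be-updated resources (task.liveObj) are deleted first and then applied.

    The resources that qualify are those for which the following returns true:

    func (t *syncTask) deleteBeforeCreation() bool {
        return t.liveObj != nil && t.pending() && t.hasHookDeletePolicy(common.HookDeletePolicyBeforeHookCreation)
    }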

https://eucham.me/2021/05/16/65dbfe09c7bd.html

Author: 遇寻

Published: 2021-05-16

Updated: 2021-05-17
