3 | Spinnaker 如何执行 pipeline

本篇文章记录以发布单的形式启动的一个流水线的执行过程,并对相关敏感信息做了删除,但不影响对整体流程的介绍。通过 POST 调用 gate 服务的 pipelines/v2/{application}/{pipeline}@POST("pipelines/v2/{application}/{

本篇文章记录以发布单的形式启动的一个流水线的执行过程,并对相关敏感信息做了删除,但不影响对整体流程的介绍。

通过 POST 调用 gate 服务的 pipelines/v2/{application}/{pipeline}

@POST("pipelines/v2/{application}/{pipeline}")
Call<Map<String, Object>> postPipeline(@Path("application") String application,
                                       @Path("pipeline") String pipeline,
                                       @Body TaskPipeline taskPipeline);

进入 gate

gate

请求:/v2/{application}/{pipelineNameOrId:.+}

位置:gate-web/src/main/groovy/com/netflix/spinnaker/gate/controllers/PipelineController.groovy

请求进入 gate 之后,会先校验当前用户是否有此流水线的执行权限,然后调用 echo 触发流水线。

@POST('/')
String postEvent(@Body Map event)

此时的参数 eventMap 为:

{
    "content": {
        "application": "artifacturlteam1",
        "pipelineNameOrId": "ccbae2c9-ba68-43f3-87ba-669b1bb27831",
        "trigger": {
            "buildNumber": "0",
            "type": "manual",
            "dryRun": false,
            "parameters": {},
            "user": "vskycorpltd",
            "artifacts": [
                {
                    "artifactAccount": "generic-repo::1",
                    "customKind": false,
                    "id": "94141c3b-1f0f-4674-99e1-cb138905ac25",
                    "name": "vskycorpltdcorp-generic.pkg.vskycorpltd.com/vsky/generic-repo/abc/efg.txt",
                    "parentType": "generic",
                    "pkgId": 2,
                    "projectId": 1,
                    "projectName": "vsky",
                    "reference": "",
                    "repoName": "generic-repo",
                    "type": "vskycorpltd_artifact/generic",
                    "uriName": "vsky"
                }
            ],
            "customName": "20201016-artifacturl-p1",
            "customDescription": "",
            "projectUrlName": "vsky",
            "vskycorpltdNickname": "vskycorpltd",
            "eventId": "2796229f-fbea-4da8-a8af-fd3de334f765",
            "executionId": "01EN7GSM1KA2XHV9SX956YGKCD"
        },
        "user": "vskycorpltd"
    },
    "details": {
        "type": "manual"
    }
}

进入 echo

echo

位置:echo-web/src/main/java/com/netflix/spinnaker/echo/history/HistoryController.java

在 echo 中,上面的参数被转换成一个 echoEvent,然后将此事件,交给事件监听者来处理,

@RequestMapping(value = "/", method = RequestMethod.POST)
public void saveHistory(@RequestBody Event event) {
  propagator.processEvent(event);
}

public void processEvent(Event event) {
  Observable.from(listeners)
      .map(
          listener ->
              AuthenticatedRequest.propagate(
                  () -> {
                    listener.processEvent(event);
                    return null;
                  }))
      .observeOn(scheduler)
      .subscribe(
          callable -> {
            try {
              callable.call();
            } catch (Exception e) {
              log.error("failed processing event: {}", event.content, e);
            }
          });
}

listeners 来在容器中所有的 EchoEventListener bean。

@Bean
public EventPropagator propagator() {
  EventPropagator instance = new EventPropagator();
  for (EchoEventListener e : context.getBeansOfType(EchoEventListener.class).values()) {
    instance.addListener(e);
  }
  return instance;
}

此处有 5 个监听者,他们的继承关系如下(观察者模式):

监听者继承关系

此处仅关注 TriggerEventListener,它将是触发流水线的入口。随后 TriggerEventListener 会将事件交给 所有注册进来的 triggerMonitors 处理。

public void processEvent(Event event) {
  for (TriggerMonitor triggerMonitor : triggerMonitors) {
    triggerMonitor.processEvent(event);
  }
}

每个 TriggerMonitor 包含一个 TriggerEventHandler,并且在初始化 TriggerEventListener 时,从容器中将所有的 TriggerEventHandler 都捞出来,用 TriggerMonitor 包裹一层,放入 triggerMonitors 中。

public TriggerEventListener(
    @NonNull PipelineCache pipelineCache,
    @NonNull PipelineInitiator pipelineInitiator,
    @NonNull Registry registry,
    @NonNull PipelinePostProcessorHandler pipelinePostProcessorHandler,
    @NonNull List<TriggerEventHandler<?>> eventHandlers) {
  this.triggerMonitors =
      eventHandlers.stream()
          .map(
              e ->
                  new TriggerMonitor<>(
                      pipelineCache,
                      pipelineInitiator,
                      registry,
                      pipelinePostProcessorHandler,
                      e))
          .collect(Collectors.toList());
}

但是并不是所有的 triggerMonitor 都处理这个事件,他们只负责处理各自能处理的事件。

public void processEvent(Event event) {
  validateEvent(event);
  if (eventHandler.handleEventType(event.getDetails().getType())) {
    recordMetrics();
    T triggerEvent = eventHandler.convertEvent(event);
    triggerMatchingPipelines(triggerEvent);
  }
}

因此有若干个 TriggerEventHandler ,他们分别对应若干触发方式,如下:

此处我们是以手动的方式启动的,在 Event 的 detail 中,type 记录为 manual。

并且判断一个 TriggerEventHandler 能否处理某个事件的方式为:子类覆盖 handleEventType() 方法,并将相应能处理的事件的类型与当前 event.detail.type 对比,以 DockerEventHandler 为例:

@Override
public boolean handleEventType(String eventType) {
  return eventType.equalsIgnoreCase(DockerEvent.TYPE);
}

public class DockerEvent extends TriggerEvent {
  public static final String TYPE = "DOCKER";
  // ...
}

因此,我们最终的入口便是 ManualEventHandler 。开始触发流水线。最终在 echo  PipelineInitiator

triggerPipelineImpl() 方法中,通过 triggerWithRetries() 实现对 orca 的 HTTP 调用。

orca

接口:/orchestrate

位置:OperationsController

经过一番折腾后,边开始执行 pipeline,并返回一个 id 给到 echo。其间有几个比较重要的步骤:

  1. 执行parseAndValidatePipeline(pipeline),解决制品问题。最终调用的是:ArtifactResolver.resolveArtifacts()
  2. 进行了一次 contextParameterProcessor.process() ,对 pipeline 中的表达式进行计算。
  3. 将 pipeline 信息转化成 Execution,然后再将其转化成一个 StartExecution 后,存入到一个 Queue 里面。
override fun start(execution: Execution) = queue.push(StartExecution(execution))
    
@JsonTypeName("startExecution")
data class StartExecution(
override val executionType: ExecutionType,
override val executionId: String,
override val application: String
) : Message(), ExecutionLevel {
constructor(source: Execution) :
  this(source.type, source.id, source.application)
}

其中,StartExecution 的值如下:

{
    "kind": "startExecution",
    "attributes": [],
    "ackTimeoutMs": null,
    "executionType": "PIPELINE",
    "executionId": "01ENCS32B8B8AP83FAHYTCCKHN",
    "application": "asgteam1"
}

发布&获取事件

Queue 的实现为 RedisQueue,它 push 的代码逻辑如下:

  override fun push(message: Message, delay: TemporalAmount) {
    pool.resource.use { redis ->
      redis.firstFingerprint(queueKey, message.fingerprint()).also { fingerprint ->
        if (fingerprint != null) {
          // ...
          redis.zadd(queueKey, score(delay), fingerprint, zAddParams().xx())
          fire(MessageDuplicate(message))
        } else {
          redis.queueMessage(message, delay)
          fire(MessagePushed(message))
        }
      }
    }
  }

先将 message 即 StartExecution 的摘要信息存入到一个有序集合中,集合的键为 orca.task.queue.queue,可以查看 redis 中相应的值,结果如下:

ZznQhe

然后通过 fire 函数,发布一个事件,这里应该是利用了 spring 的事件发布机制。

  @Bean
  fun queueEventPublisher(
    applicationEventPublisher: ApplicationEventPublisher
  ) = object : EventPublisher {
    override fun publishEvent(event: QueueEvent) {
      applicationEventPublisher.publishEvent(event)
    }
  }

ApplicationEventPublisher 是 spring-context 包下的一个接口。它是一个AnnotationConfigServletWebServerApplicationContext 的实例,且继承自AbstractApplicationContext ,它有一个  publishEvent() 方法。

获取消息

位置:com/netflix/spinnaker/q/QueueProcessor.kt

可以参考:https://spinnaker.io/guides/developer/service-overviews/orca/

简而言之:不同的消息类型,对应不同的 handler。例如:StartExecution 消息,对应 StartExecutionHandler。

StartExecutionHandler

initialStages.forEach { queue.push(StartStage(it)) }

fun Execution.initialStages() =
  stages
    .filter { it.isInitial() }

fun Stage.isInitial(): Boolean =
  requisiteStageRefIds.isEmpty()

往 Queue 中塞若干个 StartStage,这些 Stage 必须不依赖其他 stage (配置除外),这里是并行执行多个 Stage 的入口。

StartStageHandler

stage.plan() 函数执行逻辑:

private fun Stage.plan() {
  builder().let { builder ->
    // if we have a top level stage, ensure that context expressions are processed
    val mergedStage = if (this.parentStageId == null) this.withMergedContext() else this
    builder.addContextFlags(mergedStage)
    builder.buildTasks(mergedStage)
    builder.buildBeforeStages(mergedStage) { it: Stage ->
      repository.addStage(it.withMergedContext())
    }
  }
}
  • 根据 stage 的定义将 Task 加入到 stage 的 tasks 列表中。
fun StageDefinitionBuilder.buildTasks(stage: Stage) {
  buildTaskGraph(stage)
    .listIterator()
    .forEachWithMetadata { processTaskNode(stage, it) }
}

default @Nonnull TaskGraph buildTaskGraph(@Nonnull Stage stage) {
  Builder graphBuilder = Builder(FULL);
  taskGraph(stage, graphBuilder);
  return graphBuilder.build();
}

// 在 Stage 中自定义的 taskGraph()
@Override
void taskGraph(Stage stage, TaskNode.Builder builder) {
  if (isTopLevelStage(stage)) {
    builder
      .withTask("completeParallel", CompleteParallelBakeTask)
  } else {
    builder
      .withTask("createBake", CreateBakeTask)
      .withTask("monitorBake", MonitorBakeTask)
      .withTask("completedBake", CompletedBakeTask)
      .withTask("bindProducedArtifacts", BindProducedArtifactsTask)
  }
}

withTask() 方法将会将 taskName,taskImp类,封装到  TaskDefinition 中,并保存在 builder 的 graph 中。

public Builder withTask(
    String name, Class<? extends com.netflix.spinnaker.orca.Task> implementingClass) {
  graph.add(new TaskDefinition(name, implementingClass));
  return this;
}

然后通过 processTaskNode() 将所有的 task 保存到 stage 的 tasks 中。

private fun processTaskNode(
  stage: Stage,
  element: IteratorElement<TaskNode>,
  isSubGraph: Boolean = false
) {
  element.apply {
    when (value) {
      is TaskDefinition -> {
        val task = Task()
        task.id = (stage.tasks.size + 1).toString()
        task.name = value.name
        task.implementingClass = value.implementingClass.name
        if (isSubGraph) {
          task.isLoopStart = isFirst
          task.isLoopEnd = isLast
        } else {
          task.isStageStart = isFirst
          task.isStageEnd = isLast
        }
        stage.tasks.add(task)
      }
      is TaskGraph -> {
        // 递归执行...
      }
    }
  }
}
  • 让一个 Stage 包含两个 Stage

简单说就是在一个 Bake 阶段,似乎产生了两个 Bake,并且每个 Bake 的 task 还不一样,但他们都在一个 Bake 阶段里面,如下:

zn6xx5

它的实现只需要在自定义 Stage 中实现 buildBeforeStage() 方法,并在这个方法中,在 graph 中,添加一个新的 Stage,它的本质是为 Bake A 阶段,添加了一个前置阶段 Bake in ap-beijing

@Override
void beforeStages(@Nonnull Stage parent, @Nonnull StageGraphBuilder graph) {
  if (isTopLevelStage(parent)) {
    parallelContexts(parent)
      .collect({ context ->
        newStage(parent.execution, type, "Bake in ${context.region}", context, parent, STAGE_BEFORE)
      })
      .forEach({Stage s -> graph.add(s) })
  }
}

在搞定整个 Stage 的构造之后,开始 stage.start(),此处,会将上面创建的两个 Stage 分开运行。

private fun Stage.start() {
  val beforeStages = firstBeforeStages()
  if (beforeStages.isEmpty()) {
    val task = firstTask()
    if (task == null) {
      // TODO: after stages are no longer planned at this point. We could skip this
      val afterStages = firstAfterStages()
      if (afterStages.isEmpty()) {
        queue.push(CompleteStage(this))
      } else {
        afterStages.forEach {
          queue.push(StartStage(it))
        }
      }
    } else {
      queue.push(StartTask(this, task.id))
    }
  } else {
    beforeStages.forEach {
      queue.push(StartStage(it))
    }
  }
}

第一步的 val beforeStages = firstBeforeStages() 会先获取到创建的 Bake in ap-beijing 阶段,并通过 queue.push(StartStage(it)) 先执行 Bake in ap-beijing 。所以,只执行完 queue.push(StartStage(it)) ,代码的处理逻辑,又会进入 StartStageHandler中,如下:

2PjQP0

接着通过 queue.push(StartTask(this, task.id)),开始执行第一个 Task,即 createBake。

V66UMR

StartTaskHandler

message.withTask { stage, task ->
  task.status = RUNNING
  task.startTime = clock.millis()
  val mergedContextStage = stage.withMergedContext()
  repository.storeStage(mergedContextStage)

  queue.push(RunTask(message, task.id, task.type))

  publisher.publishEvent(TaskStarted(this, mergedContextStage, task))
}

这里记录了 task 的开始时间、并保存到数据库中,同时通过 queue.push(RunTask(message, task.id, task.type)) 开启 task。

这里的 message.withTask 里面的套了好几层逻辑,与后续的 RunTask 有类似之处。

RunTaskHandler

在 handle() 方法中,通过解析,最终得到了实际的 task,即 CreateBakeTask。

vKZN8f

CreateBakeTask

在此 Task 中,主要是对 bake 所需参数进行拼凑,主要步骤如下:

根据 account 获取 secret_id 和 secret_key。

根据 pipeline 中的设置,整理出 packer 使用的配置。

调用 rosco,并等待请求返回。

返回 task 执行成功。

NLtcbu

task 执行完成

当 CreateBakeTask 返回的状态为成功时,后续会触发 CompleteTask 事件,也就是对应 CompleteTaskHandler。

wDuUiF

CompleteTaskHandler

当一个 task 结束时,会判断 task 所属 stage 是否结束、是否已被手动跳过、已经进行触发下个 task。

mergedContextStage.nextTask(task).let {
  if (it == null) {
    queue.push(NoDownstreamTasks(message))
  } else {
    queue.push(StartTask(message, it.id))
  }
}

其中查找下一个 task 的逻辑只是将下标 +1

fun Stage.nextTask(task: Task): Task? =
  if (task.isStageEnd) {
    null
  } else {
    val index = tasks.indexOf(task)
    tasks[index + 1]
  }

接着,便开始了一个新的循环,分别开始 MonitorBakeTask、CompletedBakeTask、BindProducedArtifactsTask。每个 task 都会像 CreateBakeTask 一样,经历下面的 3 个阶段:

  • StartTaskHandler
  • RunTaskHandler
  • CompleteTaskHandler

当最后一个 task 执行完了之后,仍然会到达 CompleteTaskHandler 的 handle() 方法,此时会触发 CompleteStage。

CompleteStageHandler

主要逻辑分两个,一个是对 afterStage 的处理,一个是对 nextStage 的处理。

  1. 与前面的 beforeStage 对应,还有一个 afterStage,逻辑应该与 beforeStage 类似,当执行完一个 stage 之后,再执行它的 afterStage。
  2. stage.startNext()

获取接下来的 stages 的逻辑:并发执行所有接下来的 stage

fun Stage.startNext() {
  execution.let { execution ->
    val downstreamStages = downstreamStages()
    val phase = syntheticStageOwner
    if (downstreamStages.isNotEmpty()) {
      downstreamStages.forEach {
        queue.push(StartStage(it))
      }
    } else if (phase != null) {
      queue.ensure(ContinueParentStage(parent(), phase), Duration.ZERO)
    } else {
      queue.push(CompleteExecution(execution))
    }
  }
}

如何确定接下来的 stages

根据当前 stage 的 refId,到 pipeline 中去遍历所有的 stage,只要该 stage 的 requisiteStageRefIds 里面有当前 stage 的 refId,那么它就将要被执行。

@JsonIgnore
public List<Stage> downstreamStages() {
  return getExecution().getStages().stream()
      .filter(it -> it.getRequisiteStageRefIds().contains(getRefId()))
      .collect(toList());
}

Read more

容器镜像(4):镜像的常用工具箱

容器镜像(4):镜像的常用工具箱

前几篇在讲多架构镜像时已经用过 skopeo 和 crane 做镜像复制,这篇系统整理这两个工具的完整能力,同时介绍几个日常操作镜像时同样好用的工具。 一、skopeo:不依赖 Daemon 的镜像瑞士军刀 skopeo 的核心价值是绕过 Docker daemon,直接与 Registry API 交互。上一篇用它做镜像复制和离线传输,但它的能力远不止于此。 1.1 安装 # Ubuntu / Debian sudo apt install -y skopeo skopeo --version # skopeo version 1.15.1 1.2 inspect:免拉取检查镜像元数据 docker inspect 需要先把镜像拉到本地,skopeo inspect 直接向 Registry

容器镜像(3):多架构镜像构建

容器镜像(3):多架构镜像构建

一、什么是多架构镜像 1.1 OCI Image Index 上一篇介绍了单平台镜像的结构:一个 Manifest 指向 Config 和若干 Layer blob。多架构镜像在此之上多了一层——OCI Image Index(也叫 Manifest List),是一个轻量的索引文件,把多个单平台 Manifest 组织在一起: $ docker manifest inspect golang:1.22-alpine { "schemaVersion": 2, "mediaType": "application/vnd.oci.image.index.v1+json", "manifests&

容器镜像(2):containerd 视角下的镜像

容器镜像(2):containerd 视角下的镜像

一、为什么需要了解 containerd 如果你只用 docker run 跑容器,从来不关心底层,那可以不了解 containerd。但如果你在用 Kubernetes,或者想真正理解"容器运行时"是什么,containerd 是绕不开的。 事实上,当你执行 docker run 的时候,containerd 早就在后台悄悄工作了——Docker 从 1.11 版本开始,就把核心运行时剥离出来交给 containerd 负责。 1.1 Docker 的架构演变 早期的 Docker(1.10 及之前)是一个"大一统"的单体程序:一个 dockerd