本篇文章记录以发布单的形式启动的一个流水线的执行过程,并对相关敏感信息做了删除 ,但不影响对整体流程的介绍。
通过 POST 调用 gate 服务的 pipelines/v2/{application}/{pipeline}
1 2 3 4 @POST("pipelines/v2/{application}/{pipeline}") Call<Map<String, Object>> postPipeline (@Path("application") String application, @Path("pipeline") String pipeline, @Body TaskPipeline taskPipeline) ;
进入 gate
gate 请求:/v2/{application}/{pipelineNameOrId:.+}
位置:gate-web/src/main/groovy/com/netflix/spinnaker/gate/controllers/PipelineController.groovy
请求进入 gate 之后,会先校验当前用户是否有此流水线的执行权限,然后调用 echo 触发流水线。
1 2 @POST ('/' )String postEvent(@Body Map event)
此时的参数 eventMap 为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 { "content" : { "application" : "artifacturlteam1" , "pipelineNameOrId" : "ccbae2c9-ba68-43f3-87ba-669b1bb27831" , "trigger" : { "buildNumber" : "0" , "type" : "manual" , "dryRun" : false , "parameters" : { } , "user" : "vskycorpltd" , "artifacts" : [ { "artifactAccount" : "generic-repo::1" , "customKind" : false , "id" : "94141c3b-1f0f-4674-99e1-cb138905ac25" , "name" : "vskycorpltdcorp-generic.pkg.vskycorpltd.com/vsky/generic-repo/abc/efg.txt" , "parentType" : "generic" , "pkgId" : 2 , "projectId" : 1 , "projectName" : "vsky" , "reference" : "" , "repoName" : "generic-repo" , "type" : "vskycorpltd_artifact/generic" , "uriName" : "vsky" } ] , "customName" : "20201016-artifacturl-p1" , "customDescription" : "" , "projectUrlName" : "vsky" , "vskycorpltdNickname" : "vskycorpltd" , "eventId" : "2796229f-fbea-4da8-a8af-fd3de334f765" , "executionId" : "01EN7GSM1KA2XHV9SX956YGKCD" } , "user" : "vskycorpltd" } , "details" : { "type" : "manual" } }
进入 echo
echo 位置:echo-web/src/main/java/com/netflix/spinnaker/echo/history/HistoryController.java
在 echo 中,上面的参数被转换成一个 echoEvent,然后将此事件,交给事件监听者来处理,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @RequestMapping(value = "/", method = RequestMethod.POST) public void saveHistory (@RequestBody Event event) { propagator.processEvent(event); } public void processEvent (Event event) { Observable.from(listeners) .map( listener -> AuthenticatedRequest.propagate( () -> { listener.processEvent(event); return null ; })) .observeOn(scheduler) .subscribe( callable -> { try { callable.call(); } catch (Exception e) { log.error("failed processing event: {}" , event.content, e); } }); }
listeners 来在容器中所有的 EchoEventListener bean。
1 2 3 4 5 6 7 8 @Bean public EventPropagator propagator () { EventPropagator instance = new EventPropagator (); for (EchoEventListener e : context.getBeansOfType(EchoEventListener.class).values()) { instance.addListener(e); } return instance; }
此处有 5 个监听者,他们的继承关系如下(观察者模式):
此处仅关注 TriggerEventListener
,它将是触发流水线的入口。随后 TriggerEventListener
会将事件交给 所有注册进来的 triggerMonitors
处理。
1 2 3 4 5 public void processEvent (Event event) { for (TriggerMonitor triggerMonitor : triggerMonitors) { triggerMonitor.processEvent(event); } }
每个 TriggerMonitor
包含一个 TriggerEventHandler
,并且在初始化 TriggerEventListener
时,从容器中将所有的 TriggerEventHandler
都捞出来,用 TriggerMonitor
包裹一层,放入 triggerMonitors
中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public TriggerEventListener ( @NonNull PipelineCache pipelineCache, @NonNull PipelineInitiator pipelineInitiator, @NonNull Registry registry, @NonNull PipelinePostProcessorHandler pipelinePostProcessorHandler, @NonNull List<TriggerEventHandler<?>> eventHandlers) { this .triggerMonitors = eventHandlers.stream() .map( e -> new TriggerMonitor <>( pipelineCache, pipelineInitiator, registry, pipelinePostProcessorHandler, e)) .collect(Collectors.toList()); }
但是 并不是所有的 triggerMonitor 都处理这个事件,他们只负责处理各自能处理的事件。
1 2 3 4 5 6 7 8 public void processEvent (Event event) { validateEvent(event); if (eventHandler.handleEventType(event.getDetails().getType())) { recordMetrics(); T triggerEvent = eventHandler.convertEvent(event); triggerMatchingPipelines(triggerEvent); } }
因此有若干个 TriggerEventHandler
,他们分别对应若干触发方式,如下:
此处我们是以手动的方式启动的,在 Event 的 detail 中,type 记录为 manual。
并且判断一个 TriggerEventHandler
能否处理某个事件的方式为:子类覆盖 handleEventType()
方法,并将相应能处理的事件的类型与当前 event.detail.type 对比,以 DockerEventHandler
为例:
1 2 3 4 5 6 7 8 9 @Override public boolean handleEventType (String eventType) { return eventType.equalsIgnoreCase(DockerEvent.TYPE); } public class DockerEvent extends TriggerEvent { public static final String TYPE = "DOCKER" ; }
因此,我们最终的入口便是 ManualEventHandler
。开始触发流水线。最终在 echo PipelineInitiator
的
triggerPipelineImpl()
方法中,通过 triggerWithRetries()
实现对 orca 的 HTTP 调用。
orca 接口:/orchestrate
位置:OperationsController
经过一番折腾后,边开始执行 pipeline,并返回一个 id 给到 echo。其间有几个比较重要的步骤:
执行parseAndValidatePipeline(pipeline)
,解决制品问题。最终调用的是:ArtifactResolver.resolveArtifacts()
。
进行了一次 contextParameterProcessor.process()
,对 pipeline 中的表达式进行计算。
将 pipeline 信息转化成 Execution
,然后再将其转化成一个 StartExecution
后,存入到一个 Queue 里面。
1 2 3 4 5 6 7 8 9 10 11 override fun start (execution: Execution ) = queue.push(StartExecution(execution)) @JsonTypeName("startExecution" ) data class StartExecution (override val executionType: ExecutionType,override val executionId: String,override val application: String) : Message(), ExecutionLevel { constructor (source: Execution) : this (source.type, source.id, source.application) }
其中,StartExecution 的值如下:
1 2 3 4 5 6 7 8 { "kind" : "startExecution" , "attributes" : [ ] , "ackTimeoutMs" : null , "executionType" : "PIPELINE" , "executionId" : "01ENCS32B8B8AP83FAHYTCCKHN" , "application" : "asgteam1" }
发布&获取事件 Queue 的实现为 RedisQueue,它 push 的代码逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 override fun push (message: Message , delay: TemporalAmount ) { pool.resource.use { redis -> redis.firstFingerprint(queueKey, message.fingerprint()).also { fingerprint -> if (fingerprint != null ) { redis.zadd(queueKey, score(delay), fingerprint, zAddParams().xx()) fire(MessageDuplicate(message)) } else { redis.queueMessage(message, delay) fire(MessagePushed(message)) } } } }
先将 message 即 StartExecution 的摘要信息存入到一个有序集合中,集合的键为 orca.task.queue.queue
,可以查看 redis 中相应的值,结果如下:
然后通过 fire 函数,发布一个事件,这里应该是利用了 spring 的事件发布机制。
1 2 3 4 5 6 7 8 @Bean fun queueEventPublisher ( applicationEventPublisher: ApplicationEventPublisher ) = object : EventPublisher { override fun publishEvent (event: QueueEvent ) { applicationEventPublisher.publishEvent(event) } }
ApplicationEventPublisher 是 spring-context 包下的一个接口。它是一个AnnotationConfigServletWebServerApplicationContext
的实例,且继承自AbstractApplicationContext
,它有一个 publishEvent()
方法。
获取消息
位置:com/netflix/spinnaker/q/QueueProcessor.kt
可以参考:https://spinnaker.io/guides/developer/service-overviews/orca/
简而言之:不同的消息类型,对应不同的 handler。例如:StartExecution 消息,对应 StartExecutionHandler。
StartExecutionHandler 1 2 3 4 5 6 7 8 initialStages.forEach { queue.push(StartStage(it)) } fun Execution.initialStages () = stages .filter { it.isInitial() } fun Stage.isInitial () : Boolean = requisiteStageRefIds.isEmpty()
往 Queue 中塞若干个 StartStage,这些 Stage 必须不依赖其他 stage (配置除外),这里是并行执行多个 Stage 的入口。
StartStageHandler stage.plan()
函数执行逻辑:
1 2 3 4 5 6 7 8 9 10 11 private fun Stage.plan () { builder().let { builder -> val mergedStage = if (this .parentStageId == null ) this .withMergedContext() else this builder.addContextFlags(mergedStage) builder.buildTasks(mergedStage) builder.buildBeforeStages(mergedStage) { it: Stage -> repository.addStage(it.withMergedContext()) } } }
根据 stage 的定义将 Task 加入到 stage 的 tasks 列表中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 fun StageDefinitionBuilder.buildTasks (stage: Stage ) { buildTaskGraph(stage) .listIterator() .forEachWithMetadata { processTaskNode(stage, it) } } default @Nonnull TaskGraph buildTaskGraph(@Nonnull Stage stage) { Builder graphBuilder = Builder(FULL); taskGraph(stage, graphBuilder); return graphBuilder.build(); } @Override void taskGraph(Stage stage, TaskNode.Builder builder) { if (isTopLevelStage(stage)) { builder .withTask("completeParallel" , CompleteParallelBakeTask) } else { builder .withTask("createBake" , CreateBakeTask) .withTask("monitorBake" , MonitorBakeTask) .withTask("completedBake" , CompletedBakeTask) .withTask("bindProducedArtifacts" , BindProducedArtifactsTask) } }
withTask() 方法将会将 taskName,taskImp类,封装到 TaskDefinition 中,并保存在 builder 的 graph 中。
1 2 3 4 5 public Builder withTask ( String name, Class<? extends com.netflix.spinnaker.orca.Task> implementingClass) { graph.add(new TaskDefinition (name, implementingClass)); return this ; }
然后通过 processTaskNode() 将所有的 task 保存到 stage 的 tasks 中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 private fun processTaskNode ( stage: Stage , element: IteratorElement <TaskNode >, isSubGraph: Boolean = false ) { element.apply { when (value) { is TaskDefinition -> { val task = Task() task.id = (stage.tasks.size + 1 ).toString() task.name = value.name task.implementingClass = value.implementingClass.name if (isSubGraph) { task.isLoopStart = isFirst task.isLoopEnd = isLast } else { task.isStageStart = isFirst task.isStageEnd = isLast } stage.tasks.add(task) } is TaskGraph -> { } } } }
简单说就是在一个 Bake 阶段,似乎产生了两个 Bake,并且每个 Bake 的 task 还不一样,但他们都在一个 Bake 阶段里面 ,如下:
它的实现只需要在自定义 Stage 中实现 buildBeforeStage()
方法,并在这个方法中,在 graph 中,添加一个新的 Stage,它的本质是为 Bake A
阶段,添加了一个前置阶段 Bake in ap-beijing
。
1 2 3 4 5 6 7 8 9 10 @Override void beforeStages(@Nonnull Stage parent, @Nonnull StageGraphBuilder graph) { if (isTopLevelStage(parent)) { parallelContexts(parent) .collect({ context -> newStage(parent.execution, type, "Bake in ${context.region}" , context, parent, STAGE_BEFORE) }) .forEach({Stage s -> graph.add(s) }) } }
在搞定整个 Stage 的构造之后,开始 stage.start()
,此处,会将上面创建的两个 Stage 分开运行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 private fun Stage.start () { val beforeStages = firstBeforeStages() if (beforeStages.isEmpty()) { val task = firstTask() if (task == null ) { val afterStages = firstAfterStages() if (afterStages.isEmpty()) { queue.push(CompleteStage(this )) } else { afterStages.forEach { queue.push(StartStage(it)) } } } else { queue.push(StartTask(this , task.id)) } } else { beforeStages.forEach { queue.push(StartStage(it)) } } }
第一步的 val beforeStages = firstBeforeStages()
会先获取到创建的 Bake in ap-beijing
阶段,并通过 queue.push(StartStage(it))
先执行 Bake in ap-beijing
。所以,只执行完 queue.push(StartStage(it))
,代码的处理逻辑,又会进入 StartStageHandler
中,如下:
接着通过 queue.push(StartTask(this, task.id))
,开始执行第一个 Task,即 createBake。
StartTaskHandler 1 2 3 4 5 6 7 8 9 10 message.withTask { stage, task -> task.status = RUNNING task.startTime = clock.millis() val mergedContextStage = stage.withMergedContext() repository.storeStage(mergedContextStage) queue.push(RunTask(message, task.id, task.type)) publisher.publishEvent(TaskStarted(this , mergedContextStage, task)) }
这里记录了 task 的开始时间、并保存到数据库中,同时通过 queue.push(RunTask(message, task.id, task.type))
开启 task。
这里的 message.withTask
里面的套了好几层逻辑,与后续的 RunTask 有类似之处。
RunTaskHandler 在 handle() 方法中,通过解析,最终得到了实际的 task,即 CreateBakeTask。
CreateBakeTask 在此 Task 中,主要是对 bake 所需参数进行拼凑,主要步骤如下:
根据 account 获取 secret_id 和 secret_key。
根据 pipeline 中的设置,整理出 packer 使用的配置。
调用 rosco,并等待请求返回。
返回 task 执行成功。
task 执行完成 当 CreateBakeTask 返回的状态为成功时,后续会触发 CompleteTask 事件,也就是对应 CompleteTaskHandler。
CompleteTaskHandler 当一个 task 结束时,会判断 task 所属 stage 是否结束、是否已被手动跳过、已经进行触发下个 task。
1 2 3 4 5 6 7 mergedContextStage.nextTask(task).let { if (it == null ) { queue.push(NoDownstreamTasks(message)) } else { queue.push(StartTask(message, it.id)) } }
其中查找下一个 task 的逻辑只是将下标 +1
1 2 3 4 5 6 7 fun Stage.nextTask (task: Task ) : Task? = if (task.isStageEnd) { null } else { val index = tasks.indexOf(task) tasks[index + 1 ] }
接着,便开始了一个新的循环,分别开始 MonitorBakeTask、CompletedBakeTask、BindProducedArtifactsTask。每个 task 都会像 CreateBakeTask 一样,经历下面的 3 个阶段:
StartTaskHandler
RunTaskHandler
CompleteTaskHandler
当最后一个 task 执行完了之后,仍然会到达 CompleteTaskHandler 的 handle() 方法,此时会触发 CompleteStage。
CompleteStageHandler 主要逻辑分两个,一个是对 afterStage 的处理,一个是对 nextStage 的处理。
与前面的 beforeStage 对应,还有一个 afterStage ,逻辑应该与 beforeStage 类似,当执行完一个 stage 之后,再执行它的 afterStage。
stage.startNext()
获取接下来的 stages 的逻辑 :并发执行所有接下来的 stage
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 fun Stage.startNext () { execution.let { execution -> val downstreamStages = downstreamStages() val phase = syntheticStageOwner if (downstreamStages.isNotEmpty()) { downstreamStages.forEach { queue.push(StartStage(it)) } } else if (phase != null ) { queue.ensure(ContinueParentStage(parent(), phase), Duration.ZERO) } else { queue.push(CompleteExecution(execution)) } } }
如何确定接下来的 stages
根据当前 stage 的 refId,到 pipeline 中去遍历所有的 stage,只要该 stage 的 requisiteStageRefIds 里面有当前 stage 的 refId,那么它就将要被执行。
1 2 3 4 5 6 @JsonIgnore public List<Stage> downstreamStages () { return getExecution().getStages().stream() .filter(it -> it.getRequisiteStageRefIds().contains(getRefId())) .collect(toList()); }