3 | Spinnaker 如何执行 pipeline

本篇文章记录以发布单的形式启动的一个流水线的执行过程,并对相关敏感信息做了删除,但不影响对整体流程的介绍。

通过 POST 调用 gate 服务的 pipelines/v2/{application}/{pipeline}

1
2
3
4
@POST("pipelines/v2/{application}/{pipeline}")
Call<Map<String, Object>> postPipeline(@Path("application") String application,
@Path("pipeline") String pipeline,
@Body TaskPipeline taskPipeline);

进入 gate

gate

请求:/v2/{application}/{pipelineNameOrId:.+}

位置:gate-web/src/main/groovy/com/netflix/spinnaker/gate/controllers/PipelineController.groovy

请求进入 gate 之后,会先校验当前用户是否有此流水线的执行权限,然后调用 echo 触发流水线。

1
2
@POST('/')
String postEvent(@Body Map event)

此时的参数 eventMap 为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
{
"content": {
"application": "artifacturlteam1",
"pipelineNameOrId": "ccbae2c9-ba68-43f3-87ba-669b1bb27831",
"trigger": {
"buildNumber": "0",
"type": "manual",
"dryRun": false,
"parameters": {},
"user": "vskycorpltd",
"artifacts": [
{
"artifactAccount": "generic-repo::1",
"customKind": false,
"id": "94141c3b-1f0f-4674-99e1-cb138905ac25",
"name": "vskycorpltdcorp-generic.pkg.vskycorpltd.com/vsky/generic-repo/abc/efg.txt",
"parentType": "generic",
"pkgId": 2,
"projectId": 1,
"projectName": "vsky",
"reference": "",
"repoName": "generic-repo",
"type": "vskycorpltd_artifact/generic",
"uriName": "vsky"
}
],
"customName": "20201016-artifacturl-p1",
"customDescription": "",
"projectUrlName": "vsky",
"vskycorpltdNickname": "vskycorpltd",
"eventId": "2796229f-fbea-4da8-a8af-fd3de334f765",
"executionId": "01EN7GSM1KA2XHV9SX956YGKCD"
},
"user": "vskycorpltd"
},
"details": {
"type": "manual"
}
}

进入 echo

echo

位置:echo-web/src/main/java/com/netflix/spinnaker/echo/history/HistoryController.java

在 echo 中,上面的参数被转换成一个 echoEvent,然后将此事件,交给事件监听者来处理,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@RequestMapping(value = "/", method = RequestMethod.POST)
public void saveHistory(@RequestBody Event event) {
propagator.processEvent(event);
}

public void processEvent(Event event) {
Observable.from(listeners)
.map(
listener ->
AuthenticatedRequest.propagate(
() -> {
listener.processEvent(event);
return null;
}))
.observeOn(scheduler)
.subscribe(
callable -> {
try {
callable.call();
} catch (Exception e) {
log.error("failed processing event: {}", event.content, e);
}
});
}

listeners 来在容器中所有的 EchoEventListener bean。

1
2
3
4
5
6
7
8
@Bean
public EventPropagator propagator() {
EventPropagator instance = new EventPropagator();
for (EchoEventListener e : context.getBeansOfType(EchoEventListener.class).values()) {
instance.addListener(e);
}
return instance;
}

此处有 5 个监听者,他们的继承关系如下(观察者模式):

监听者继承关系

此处仅关注 TriggerEventListener,它将是触发流水线的入口。随后 TriggerEventListener 会将事件交给 所有注册进来的 triggerMonitors 处理。

1
2
3
4
5
public void processEvent(Event event) {
for (TriggerMonitor triggerMonitor : triggerMonitors) {
triggerMonitor.processEvent(event);
}
}

每个 TriggerMonitor 包含一个 TriggerEventHandler,并且在初始化 TriggerEventListener 时,从容器中将所有的 TriggerEventHandler 都捞出来,用 TriggerMonitor 包裹一层,放入 triggerMonitors 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public TriggerEventListener(
@NonNull PipelineCache pipelineCache,
@NonNull PipelineInitiator pipelineInitiator,
@NonNull Registry registry,
@NonNull PipelinePostProcessorHandler pipelinePostProcessorHandler,
@NonNull List<TriggerEventHandler<?>> eventHandlers) {
this.triggerMonitors =
eventHandlers.stream()
.map(
e ->
new TriggerMonitor<>(
pipelineCache,
pipelineInitiator,
registry,
pipelinePostProcessorHandler,
e))
.collect(Collectors.toList());
}

但是并不是所有的 triggerMonitor 都处理这个事件,他们只负责处理各自能处理的事件。

1
2
3
4
5
6
7
8
public void processEvent(Event event) {
validateEvent(event);
if (eventHandler.handleEventType(event.getDetails().getType())) {
recordMetrics();
T triggerEvent = eventHandler.convertEvent(event);
triggerMatchingPipelines(triggerEvent);
}
}

因此有若干个 TriggerEventHandler ,他们分别对应若干触发方式,如下:

此处我们是以手动的方式启动的,在 Event 的 detail 中,type 记录为 manual。

并且判断一个 TriggerEventHandler 能否处理某个事件的方式为:子类覆盖 handleEventType() 方法,并将相应能处理的事件的类型与当前 event.detail.type 对比,以 DockerEventHandler 为例:

1
2
3
4
5
6
7
8
9
@Override
public boolean handleEventType(String eventType) {
return eventType.equalsIgnoreCase(DockerEvent.TYPE);
}

public class DockerEvent extends TriggerEvent {
public static final String TYPE = "DOCKER";
// ...
}

因此,我们最终的入口便是 ManualEventHandler 。开始触发流水线。最终在 echo PipelineInitiator

triggerPipelineImpl() 方法中,通过 triggerWithRetries() 实现对 orca 的 HTTP 调用。

orca

接口:/orchestrate

位置:OperationsController

经过一番折腾后,边开始执行 pipeline,并返回一个 id 给到 echo。其间有几个比较重要的步骤:

  1. 执行parseAndValidatePipeline(pipeline),解决制品问题。最终调用的是:ArtifactResolver.resolveArtifacts()
  2. 进行了一次 contextParameterProcessor.process() ,对 pipeline 中的表达式进行计算。
  3. 将 pipeline 信息转化成 Execution,然后再将其转化成一个 StartExecution 后,存入到一个 Queue 里面。
1
2
3
4
5
6
7
8
9
10
11
override fun start(execution: Execution) = queue.push(StartExecution(execution))

@JsonTypeName("startExecution")
data class StartExecution(
override val executionType: ExecutionType,
override val executionId: String,
override val application: String
) : Message(), ExecutionLevel {
constructor(source: Execution) :
this(source.type, source.id, source.application)
}

其中,StartExecution 的值如下:

1
2
3
4
5
6
7
8
{
"kind": "startExecution",
"attributes": [],
"ackTimeoutMs": null,
"executionType": "PIPELINE",
"executionId": "01ENCS32B8B8AP83FAHYTCCKHN",
"application": "asgteam1"
}

发布&获取事件

Queue 的实现为 RedisQueue,它 push 的代码逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
override fun push(message: Message, delay: TemporalAmount) {
pool.resource.use { redis ->
redis.firstFingerprint(queueKey, message.fingerprint()).also { fingerprint ->
if (fingerprint != null) {
// ...
redis.zadd(queueKey, score(delay), fingerprint, zAddParams().xx())
fire(MessageDuplicate(message))
} else {
redis.queueMessage(message, delay)
fire(MessagePushed(message))
}
}
}
}

先将 message 即 StartExecution 的摘要信息存入到一个有序集合中,集合的键为 orca.task.queue.queue,可以查看 redis 中相应的值,结果如下:

ZznQhe

然后通过 fire 函数,发布一个事件,这里应该是利用了 spring 的事件发布机制。

1
2
3
4
5
6
7
8
@Bean
fun queueEventPublisher(
applicationEventPublisher: ApplicationEventPublisher
) = object : EventPublisher {
override fun publishEvent(event: QueueEvent) {
applicationEventPublisher.publishEvent(event)
}
}

ApplicationEventPublisher 是 spring-context 包下的一个接口。它是一个AnnotationConfigServletWebServerApplicationContext 的实例,且继承自AbstractApplicationContext ,它有一个 publishEvent() 方法。

获取消息

位置:com/netflix/spinnaker/q/QueueProcessor.kt

可以参考:https://spinnaker.io/guides/developer/service-overviews/orca/

简而言之:不同的消息类型,对应不同的 handler。例如:StartExecution 消息,对应 StartExecutionHandler。

StartExecutionHandler

1
2
3
4
5
6
7
8
initialStages.forEach { queue.push(StartStage(it)) }

fun Execution.initialStages() =
stages
.filter { it.isInitial() }

fun Stage.isInitial(): Boolean =
requisiteStageRefIds.isEmpty()

往 Queue 中塞若干个 StartStage,这些 Stage 必须不依赖其他 stage (配置除外),这里是并行执行多个 Stage 的入口。

StartStageHandler

stage.plan() 函数执行逻辑:

1
2
3
4
5
6
7
8
9
10
11
private fun Stage.plan() {
builder().let { builder ->
// if we have a top level stage, ensure that context expressions are processed
val mergedStage = if (this.parentStageId == null) this.withMergedContext() else this
builder.addContextFlags(mergedStage)
builder.buildTasks(mergedStage)
builder.buildBeforeStages(mergedStage) { it: Stage ->
repository.addStage(it.withMergedContext())
}
}
}
  • 根据 stage 的定义将 Task 加入到 stage 的 tasks 列表中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
fun StageDefinitionBuilder.buildTasks(stage: Stage) {
buildTaskGraph(stage)
.listIterator()
.forEachWithMetadata { processTaskNode(stage, it) }
}

default @Nonnull TaskGraph buildTaskGraph(@Nonnull Stage stage) {
Builder graphBuilder = Builder(FULL);
taskGraph(stage, graphBuilder);
return graphBuilder.build();
}

// 在 Stage 中自定义的 taskGraph()
@Override
void taskGraph(Stage stage, TaskNode.Builder builder) {
if (isTopLevelStage(stage)) {
builder
.withTask("completeParallel", CompleteParallelBakeTask)
} else {
builder
.withTask("createBake", CreateBakeTask)
.withTask("monitorBake", MonitorBakeTask)
.withTask("completedBake", CompletedBakeTask)
.withTask("bindProducedArtifacts", BindProducedArtifactsTask)
}
}

withTask() 方法将会将 taskName,taskImp类,封装到 TaskDefinition 中,并保存在 builder 的 graph 中。

1
2
3
4
5
public Builder withTask(
String name, Class<? extends com.netflix.spinnaker.orca.Task> implementingClass) {
graph.add(new TaskDefinition(name, implementingClass));
return this;
}

然后通过 processTaskNode() 将所有的 task 保存到 stage 的 tasks 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private fun processTaskNode(
stage: Stage,
element: IteratorElement<TaskNode>,
isSubGraph: Boolean = false
) {
element.apply {
when (value) {
is TaskDefinition -> {
val task = Task()
task.id = (stage.tasks.size + 1).toString()
task.name = value.name
task.implementingClass = value.implementingClass.name
if (isSubGraph) {
task.isLoopStart = isFirst
task.isLoopEnd = isLast
} else {
task.isStageStart = isFirst
task.isStageEnd = isLast
}
stage.tasks.add(task)
}
is TaskGraph -> {
// 递归执行...
}
}
}
}
  • 让一个 Stage 包含两个 Stage

简单说就是在一个 Bake 阶段,似乎产生了两个 Bake,并且每个 Bake 的 task 还不一样,但他们都在一个 Bake 阶段里面,如下:

zn6xx5

它的实现只需要在自定义 Stage 中实现 buildBeforeStage() 方法,并在这个方法中,在 graph 中,添加一个新的 Stage,它的本质是为 Bake A 阶段,添加了一个前置阶段 Bake in ap-beijing

1
2
3
4
5
6
7
8
9
10
@Override
void beforeStages(@Nonnull Stage parent, @Nonnull StageGraphBuilder graph) {
if (isTopLevelStage(parent)) {
parallelContexts(parent)
.collect({ context ->
newStage(parent.execution, type, "Bake in ${context.region}", context, parent, STAGE_BEFORE)
})
.forEach({Stage s -> graph.add(s) })
}
}

在搞定整个 Stage 的构造之后,开始 stage.start(),此处,会将上面创建的两个 Stage 分开运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private fun Stage.start() {
val beforeStages = firstBeforeStages()
if (beforeStages.isEmpty()) {
val task = firstTask()
if (task == null) {
// TODO: after stages are no longer planned at this point. We could skip this
val afterStages = firstAfterStages()
if (afterStages.isEmpty()) {
queue.push(CompleteStage(this))
} else {
afterStages.forEach {
queue.push(StartStage(it))
}
}
} else {
queue.push(StartTask(this, task.id))
}
} else {
beforeStages.forEach {
queue.push(StartStage(it))
}
}
}

第一步的 val beforeStages = firstBeforeStages() 会先获取到创建的 Bake in ap-beijing 阶段,并通过 queue.push(StartStage(it)) 先执行 Bake in ap-beijing 。所以,只执行完 queue.push(StartStage(it)) ,代码的处理逻辑,又会进入 StartStageHandler中,如下:

2PjQP0

接着通过 queue.push(StartTask(this, task.id)),开始执行第一个 Task,即 createBake。

V66UMR

StartTaskHandler

1
2
3
4
5
6
7
8
9
10
message.withTask { stage, task ->
task.status = RUNNING
task.startTime = clock.millis()
val mergedContextStage = stage.withMergedContext()
repository.storeStage(mergedContextStage)

queue.push(RunTask(message, task.id, task.type))

publisher.publishEvent(TaskStarted(this, mergedContextStage, task))
}

这里记录了 task 的开始时间、并保存到数据库中,同时通过 queue.push(RunTask(message, task.id, task.type)) 开启 task。

这里的 message.withTask 里面的套了好几层逻辑,与后续的 RunTask 有类似之处。

RunTaskHandler

在 handle() 方法中,通过解析,最终得到了实际的 task,即 CreateBakeTask。

vKZN8f

CreateBakeTask

在此 Task 中,主要是对 bake 所需参数进行拼凑,主要步骤如下:

  1. 根据 account 获取 secret_id 和 secret_key。

  2. 根据 pipeline 中的设置,整理出 packer 使用的配置。

  3. 调用 rosco,并等待请求返回。

  4. 返回 task 执行成功。

    NLtcbu

task 执行完成

当 CreateBakeTask 返回的状态为成功时,后续会触发 CompleteTask 事件,也就是对应 CompleteTaskHandler。

wDuUiF

CompleteTaskHandler

当一个 task 结束时,会判断 task 所属 stage 是否结束、是否已被手动跳过、已经进行触发下个 task。

1
2
3
4
5
6
7
mergedContextStage.nextTask(task).let {
if (it == null) {
queue.push(NoDownstreamTasks(message))
} else {
queue.push(StartTask(message, it.id))
}
}

其中查找下一个 task 的逻辑只是将下标 +1

1
2
3
4
5
6
7
fun Stage.nextTask(task: Task): Task? =
if (task.isStageEnd) {
null
} else {
val index = tasks.indexOf(task)
tasks[index + 1]
}

接着,便开始了一个新的循环,分别开始 MonitorBakeTask、CompletedBakeTask、BindProducedArtifactsTask。每个 task 都会像 CreateBakeTask 一样,经历下面的 3 个阶段:

  • StartTaskHandler
  • RunTaskHandler
  • CompleteTaskHandler

当最后一个 task 执行完了之后,仍然会到达 CompleteTaskHandler 的 handle() 方法,此时会触发 CompleteStage。

CompleteStageHandler

主要逻辑分两个,一个是对 afterStage 的处理,一个是对 nextStage 的处理。

  1. 与前面的 beforeStage 对应,还有一个 afterStage,逻辑应该与 beforeStage 类似,当执行完一个 stage 之后,再执行它的 afterStage。
  2. stage.startNext()

获取接下来的 stages 的逻辑:并发执行所有接下来的 stage

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
fun Stage.startNext() {
execution.let { execution ->
val downstreamStages = downstreamStages()
val phase = syntheticStageOwner
if (downstreamStages.isNotEmpty()) {
downstreamStages.forEach {
queue.push(StartStage(it))
}
} else if (phase != null) {
queue.ensure(ContinueParentStage(parent(), phase), Duration.ZERO)
} else {
queue.push(CompleteExecution(execution))
}
}
}

如何确定接下来的 stages

根据当前 stage 的 refId,到 pipeline 中去遍历所有的 stage,只要该 stage 的 requisiteStageRefIds 里面有当前 stage 的 refId,那么它就将要被执行。

1
2
3
4
5
6
@JsonIgnore
public List<Stage> downstreamStages() {
return getExecution().getStages().stream()
.filter(it -> it.getRequisiteStageRefIds().contains(getRefId()))
.collect(toList());
}

3 | Spinnaker 如何执行 pipeline

https://eucham.me/2020/10/31/6ce2d2364228.html

作者

遇寻

发布于

2020-10-31

更新于

2022-04-20

许可协议

评论