Spinnaker orca 禁止流水线并发执行的 bug 修复记录

orcaSpinnaker 这个开源 CD 服务的心脏模块,负责 CD 流水线的编排、执行,是大脑一般的存在。

在 Spinnaker 这个产品中,每次部署操作都可以抽象成一条 流水线,当 流水线 触发时,会产生一次 执行。当该 流水线 已存在一次正在运行中的 执行 时,新的 执行 是否能够执行,取决于 流水线 中的一个配置项:limitConcurrent

当此选项为 true 时,新的 执行 将进入等待状态,待正在运行的 执行 完成后,才会继续执行;当次选项为 false 时,新的 执行 将立刻执行

用法 & 产生此问题的前提

coding.net 的部署控制台中,点击应用,进入流水线编辑,在基础配置中:

  1. 勾选 禁止本流程并行执行 选项,打钩则意味着同一时间一个流水线只能有一个执行在运行。

  2. 勾选 不要自动取消在排队状态的部署执行任务,如果由于勾选 禁止本流程并行执行 而产生的处于排队等待状态的执行,不会被自动取消,直至进入运行状态或手动取消。

rFhcDR

现象

流水线在多次触发后,由于禁止本流程并行执行,所以有一些 执行 进入等待状态

3MsUHM

但当正在运行的 执行 跑完后,后触发的 执行 并未进入运行状态,一直处于等待中。

qf2g7y

此时如果再触发一次流水线

pWEomD

可以看到最新的一次执行完成后,之前处于等待状态的执行进入了运行中的状态。

IjM8vR

原因

禁止并发执行依赖于 Redis 中一个叫做 orca.pipeline.queue.${pipeline_config_id} 的 key,它是 list 类型,其中存储的内容(部分)是 StartExecution 类型的 Message,如下:

kxhILp

该 list 在 orca 代码中主要由 PendingExecutionService 来操作。主要涉及下面两个操作:

  • enqueue(左入,头插)
  • popOldest(右出,尾取)

往该 list 存东西的时机只有 2 处:

FV2SpP

其中在 startExecution 时,会判断 如果禁用并行执行 & 该流水线有在执行的 execution ,如果满足要求,就会安排入队,其中入队的信息是 StartExecution Message。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
override fun handle(message: StartExecution) {
message.withExecution { execution ->
if (execution.status == NOT_STARTED && !execution.isCanceled) {
if (execution.shouldQueue()) {
execution.pipelineConfigId?.let {
log.info("Queueing {} {} {}", execution.application, execution.name, execution.id)
pendingExecutionService.enqueue(it, message)
}
} else {
start(execution)
}
} else {
terminate(execution)
}
}
}

这里的 enqueue 不会做任何校验,会直接入队。

1
2
3
4
5
override fun enqueue(pipelineConfigId: String, message: Message) {
pool.resource.use { redis ->
redis.lpush(listName(pipelineConfigId), mapper.writeValueAsString(message))
}
}

如果存在两个一模一样的 StartExecution 被送到 StartExectionHandler 执行,那么就会入队两次。

往该 list 中取东西的时机只有 1 处,即在 StartWaitingExecutionsHandler 中,如下:

Jcs2v8

1
2
3
4
5
6
override fun popOldest(pipelineConfigId: String): Message? =
pool.resource.use { redis ->
redis
.rpop(listName(pipelineConfigId))
?.let { mapper.readValue(it) }
}

StartWaitingExecutions Message 被插入 Redis 中的时机主要看 CompleteExecutionHandler 中,如下:

O6lKLh也就是说,当一次 execution 完成后,会尝试执行因并发策略而排队的执行

至此,完整的逻辑链路梳理完毕。

客户问题日志验证

此次 execution 01FSC58BRQW5GH19AM8D45SN9E,用户等待了 30min 以上,再真正被执行,期间还进行了多次触发。此次 execution 的日志如下:

jeQ0AS

可以看出此次 execution 在 2022-01-14 19:23:39 被提交,但在 2022-01-14 19:23:41 之后,就再也没有任何动静了,直到 2022-01-14 20:00:54 才真正执行。
提交后没有动静,是因为此时有流水线正在执行。可以看到查询数据库获得此时的执行记录如下:

qwsC2o

按照前面的逻辑,当正在执行的 execution 完成后,会依次执行排队在中的 execution,直到此次 execution 01FSC58BRQW5GH19AM8D45SN9E。但是可以观察到,在此次 execution 01FSC58BRQW5GH19AM8D45SN9E 的前面,有 3 次 execution,其中出现异常的是第 2 次,即 01FSC3YKGFYX68X2DDXT3W9MD6,它在 2022-01-14 19:45:49 执行完之后,并没有去执行后续队列中的 execution,直到 2022-01-14 19:59:52 开始了一次新的 execution 01FSC7AMB7SDMT3KE2SNER4KBT,注意它是用户在发现很久没有触发后,再一次手动触发的。它触发完成后,才轮到 01FSC58BRQW5GH19AM8D45SN9E 执行,此时已过去 30 min 多。

出现问题的 execution 可以通过日志发现他被入队了两次。

JVofaY

可以从上面的代码中发现,只要是打了 Queueing xxx 日志的地方,那次 execution 一定会入队。所以出现两次,会入队两次。且上面的日志,来自于两个不同的 pod。

8CR61W

为啥不能入队两次

假设入队了两次,就会在获取到第一个消息时,成功/失败执行该 execution,等到该 execution 执行失败后,会再次从该队列中去取,取到了,但是是同一个流水线,且启动流水线时,对流水线的状态时有要求的,正常执行需要 execution.status == NOT_STARTED && !execution.isCanceled 即:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
override fun handle(message: StartExecution) {
message.withExecution { execution ->
if (execution.status == NOT_STARTED && !execution.isCanceled) {
if (execution.shouldQueue()) {
execution.pipelineConfigId?.let {
log.info("Queueing {} {} {}", execution.application, execution.name, execution.id)
pendingExecutionService.enqueue(it, message)
}
} else {
start(execution)
}
} else {
terminate(execution)
}
}
}

如果不满足状态要求,会执行终止该 execution,即:

1
2
3
4
5
6
7
8
9
10
11
12
private fun terminate(execution: Execution) {
if (execution.status == CANCELED || execution.isCanceled) {
publisher.publishEvent(ExecutionComplete(this, execution.type, execution.id, execution.status))
execution.pipelineConfigId?.let {
queue.push(StartWaitingExecutions(it, purgeQueue = !execution.isKeepWaitingPipelines))
}
} else {
log.warn("Execution (type: ${execution.type}, id: {}, status: ${execution.status}, application: {})" +
" cannot be started unless state is NOT_STARTED. Ignoring StartExecution message.",
value("executionId", execution.id),
value("application", execution.application))
}

这里最主要的问题是不会再从 orca.pipeline.queue.${pipeline_config_id} 中取,也就是说,如果队列中还有排队的 execution,将得不到执行。所以可以从日志中看到,01FSC3YKGFYX68X2DDXT3W9MD6 执行完成后,又从队列中取到了一模一样的消息,再次启动该流水线时发现状态不符合要求,也不再执行等待队列中的其他 execution。

8LiDF1

修复思路

在尝试启动排队的 execution 时,对该 execution 的状态做判断,不满足要求,直接 push(StartWaitExecution),此时,后续的执行又会从 StartWaitExecutionHandler 的入口开始,类似于一个 for 循环;满足要求的 execution,直接 push(StartExecution),尝试执行下一个 execution。

修复后的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
@Component
class StartWaitingExecutionsHandler(
override val queue: Queue,
override val repository: ExecutionRepository,
private val pendingExecutionService: PendingExecutionService
) : OrcaMessageHandler<StartWaitingExecutions> {

private val log: Logger get() = LoggerFactory.getLogger(javaClass)

override val messageType = StartWaitingExecutions::class.java

override fun handle(message: StartWaitingExecutions) {
if (message.purgeQueue) {
// when purging the queue, run the latest message and discard the rest
pendingExecutionService.popNewest(message.pipelineConfigId)
.also { _ ->
pendingExecutionService.purge(message.pipelineConfigId) { purgedMessage ->
when (purgedMessage) {
is StartExecution -> {
log.info("Dropping queued pipeline {} {}", purgedMessage.application, purgedMessage.executionId)
queue.push(CancelExecution(purgedMessage))
}
is RestartStage -> {
log.info("Cancelling restart of {} {}", purgedMessage.application, purgedMessage.executionId)
// don't need to do anything else
}
}
}
}
} else {
// when not purging the queue, run the messages in the order they came in
pendingExecutionService.popOldest(message.pipelineConfigId)
}
?.let {
when (it) {
is StartExecution -> {
try {
val startedExecution: Execution = repository.retrieve(it.executionType, it.executionId)
if (startedExecution.status != ExecutionStatus.NOT_STARTED || startedExecution.isCanceled) {
log.info(
"Dropping queued pipeline {} {}, because its status was {}, which can not be stated",
startedExecution.application, startedExecution.id, startedExecution.status
)
queue.push(message)
return
}
} catch (enfe: ExecutionNotFoundException) {
log.debug("queued pipeline {} {} not found, and can be started later", it.application, it.executionId)
}
}
}
// spoiler, it always is!
if (it is ExecutionLevel) {
log.info("Starting queued pipeline {} {} {}", it.application, it.executionId)
}
queue.push(it)
}
}
}

Spinnaker orca 禁止流水线并发执行的 bug 修复记录

https://eucham.me/2022/04/20/beca913cb6e2.html

作者

遇寻

发布于

2022-04-20

更新于

2022-04-20

许可协议

评论