orca 是 Spinnaker 这个开源 CD 服务的心脏模块,负责 CD 流水线的编排、执行,是大脑一般的存在。
在 Spinnaker 这个产品中,每次部署操作都可以抽象成一条 流水线 ,当 流水线 触发时,会产生一次 执行 。当该 流水线 已存在一次正在运行中的 执行 时,新的 执行 是否能够执行,取决于 流水线 中的一个配置项:limitConcurrent
。
当此选项为 true
时,新的 执行 将进入等待状态,待正在运行的 执行 完成后,才会继续执行;当次选项为 false
时,新的 执行 将立刻执行
用法 & 产生此问题的前提 在 coding.net 的部署控制台中,点击应用,进入流水线编辑,在基础配置中:
勾选 禁止本流程并行执行
选项,打钩则意味着同一时间一个流水线 只能有一个执行 在运行。
勾选 不要自动取消在排队状态的部署执行任务
,如果由于勾选 禁止本流程并行执行
而产生的处于排队等待状态的执行 ,不会被自动取消,直至进入运行状态或手动取消。
现象 流水线在多次触发后,由于禁止本流程并行执行 ,所以有一些 执行 进入等待状态
但当正在运行的 执行 跑完后,后触发的 执行 并未进入运行状态,一直处于等待中。
此时如果再触发一次流水线
可以看到最新的一次执行 完成后,之前处于等待状态的执行 进入了运行中的状态。
原因 禁止并发执行依赖于 Redis 中一个叫做 orca.pipeline.queue.${pipeline_config_id}
的 key,它是 list 类型,其中存储的内容(部分)是 StartExecution
类型的 Message,如下:
该 list 在 orca 代码中主要由 PendingExecutionService
来操作。主要涉及下面两个操作:
enqueue(左入,头插)
popOldest(右出,尾取)
往该 list 存东西的时机只有 2 处:
其中在 startExecution 时,会判断 如果禁用并行执行 & 该流水线有在执行的 execution
,如果满足要求,就会安排入队,其中入队的信息是 StartExecution Message。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 override fun handle (message: StartExecution ) { message.withExecution { execution -> if (execution.status == NOT_STARTED && !execution.isCanceled) { if (execution.shouldQueue()) { execution.pipelineConfigId?.let { log.info("Queueing {} {} {}" , execution.application, execution.name, execution.id) pendingExecutionService.enqueue(it, message) } } else { start(execution) } } else { terminate(execution) } } }
这里的 enqueue 不会做任何校验,会直接入队。
1 2 3 4 5 override fun enqueue (pipelineConfigId: String , message: Message ) { pool.resource.use { redis -> redis.lpush(listName(pipelineConfigId), mapper.writeValueAsString(message)) } }
如果存在两个一模一样的 StartExecution 被送到 StartExectionHandler 执行,那么就会入队两次。
往该 list 中取东西的时机只有 1 处,即在 StartWaitingExecutionsHandler
中,如下:
1 2 3 4 5 6 override fun popOldest (pipelineConfigId: String ) : Message? = pool.resource.use { redis -> redis .rpop(listName(pipelineConfigId)) ?.let { mapper.readValue(it) } }
StartWaitingExecutions
Message 被插入 Redis 中的时机主要看 CompleteExecutionHandler
中,如下:
也就是说,当一次 execution 完成后,会尝试执行因并发策略而排队的执行
。
至此,完整的逻辑链路梳理完毕。
客户问题日志验证 此次 execution 01FSC58BRQW5GH19AM8D45SN9E
,用户等待了 30min 以上,再真正被执行,期间还进行了多次触发。此次 execution 的日志如下:
可以看出此次 execution 在 2022-01-14 19:23:39
被提交,但在 2022-01-14 19:23:41
之后,就再也没有任何动静了,直到 2022-01-14 20:00:54 才真正执行。 提交后没有动静,是因为此时有流水线正在执行。可以看到查询数据库获得此时的执行记录如下:
按照前面的逻辑,当正在执行的 execution 完成后,会依次执行排队在中的 execution,直到此次 execution 01FSC58BRQW5GH19AM8D45SN9E
。但是可以观察到,在此次 execution 01FSC58BRQW5GH19AM8D45SN9E
的前面,有 3 次 execution,其中出现异常的是第 2 次,即 01FSC3YKGFYX68X2DDXT3W9MD6
,它在 2022-01-14 19:45:49 执行完之后,并没有去执行后续队列中的 execution,直到 2022-01-14 19:59:52 开始了一次新的 execution 01FSC7AMB7SDMT3KE2SNER4KBT
,注意它是用户在发现很久没有触发后,再一次手动触发的。它触发完成后,才轮到 01FSC58BRQW5GH19AM8D45SN9E
执行,此时已过去 30 min 多。
出现问题的 execution 可以通过日志发现他被入队了两次。
可以从上面的代码中发现,只要是打了 Queueing xxx
日志的地方,那次 execution 一定会入队。所以出现两次,会入队两次。且上面的日志,来自于两个不同的 pod。
为啥不能入队两次 假设入队了两次,就会在获取到第一个消息时,成功/失败执行该 execution,等到该 execution 执行失败后,会再次从该队列中去取,取到了,但是是同一个流水线,且启动流水线时,对流水线的状态时有要求的,正常执行需要 execution.status == NOT_STARTED && !execution.isCanceled
即:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 override fun handle (message: StartExecution ) { message.withExecution { execution -> if (execution.status == NOT_STARTED && !execution.isCanceled) { if (execution.shouldQueue()) { execution.pipelineConfigId?.let { log.info("Queueing {} {} {}" , execution.application, execution.name, execution.id) pendingExecutionService.enqueue(it, message) } } else { start(execution) } } else { terminate(execution) } } }
如果不满足状态要求,会执行终止该 execution,即:
1 2 3 4 5 6 7 8 9 10 11 12 private fun terminate (execution: Execution ) { if (execution.status == CANCELED || execution.isCanceled) { publisher.publishEvent(ExecutionComplete(this , execution.type, execution.id, execution.status)) execution.pipelineConfigId?.let { queue.push(StartWaitingExecutions(it, purgeQueue = !execution.isKeepWaitingPipelines)) } } else { log.warn("Execution (type: ${execution.type} , id: {}, status: ${execution.status} , application: {})" + " cannot be started unless state is NOT_STARTED. Ignoring StartExecution message." , value("executionId" , execution.id), value("application" , execution.application)) }
这里最主要的问题是不会再从 orca.pipeline.queue.${pipeline_config_id}
中取,也就是说,如果队列中还有排队的 execution,将得不到执行。所以可以从日志中看到,01FSC3YKGFYX68X2DDXT3W9MD6
执行完成后,又从队列中取到了一模一样的消息,再次启动该流水线时发现状态不符合要求,也不再执行等待队列中的其他 execution。
修复思路 在尝试启动排队的 execution 时,对该 execution 的状态做判断,不满足要求,直接 push(StartWaitExecution),此时,后续的执行又会从 StartWaitExecutionHandler 的入口开始,类似于一个 for 循环;满足要求的 execution,直接 push(StartExecution),尝试执行下一个 execution。
修复后的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 @Component class StartWaitingExecutionsHandler ( override val queue: Queue, override val repository: ExecutionRepository, private val pendingExecutionService: PendingExecutionService ) : OrcaMessageHandler<StartWaitingExecutions> { private val log: Logger get () = LoggerFactory.getLogger(javaClass) override val messageType = StartWaitingExecutions::class .java override fun handle (message: StartWaitingExecutions ) { if (message.purgeQueue) { pendingExecutionService.popNewest(message.pipelineConfigId) .also { _ -> pendingExecutionService.purge(message.pipelineConfigId) { purgedMessage -> when (purgedMessage) { is StartExecution -> { log.info("Dropping queued pipeline {} {}" , purgedMessage.application, purgedMessage.executionId) queue.push(CancelExecution(purgedMessage)) } is RestartStage -> { log.info("Cancelling restart of {} {}" , purgedMessage.application, purgedMessage.executionId) } } } } } else { pendingExecutionService.popOldest(message.pipelineConfigId) } ?.let { when (it) { is StartExecution -> { try { val startedExecution: Execution = repository.retrieve(it.executionType, it.executionId) if (startedExecution.status != ExecutionStatus.NOT_STARTED || startedExecution.isCanceled) { log.info( "Dropping queued pipeline {} {}, because its status was {}, which can not be stated" , startedExecution.application, startedExecution.id, startedExecution.status ) queue.push(message) return } } catch (enfe: ExecutionNotFoundException) { log.debug("queued pipeline {} {} not found, and can be started later" , it.application, it.executionId) } } } if (it is ExecutionLevel) { log.info("Starting queued pipeline {} {} {}" , it.application, it.executionId) } queue.push(it) } } }