Spinnaker orca 禁止流水线并发执行的 bug 修复记录

orca 是 Spinnaker 这个开源 CD 服务的心脏模块,负责 CD 流水线的编排、执行,是大脑一般的存在。在 Spinnaker 这个产品中,每次部署操作都可以抽象成一条 流水线,当 流水线 触发时,会产生一次 执行。当该 流水线 已存在一次正在运行中的 执行 时,新的 执行 是否能够执行

orcaSpinnaker 这个开源 CD 服务的心脏模块,负责 CD 流水线的编排、执行,是大脑一般的存在。
在 Spinnaker 这个产品中,每次部署操作都可以抽象成一条 流水线,当 流水线 触发时,会产生一次 执行。当该 流水线 已存在一次正在运行中的 执行 时,新的 执行 是否能够执行,取决于 流水线 中的一个配置项:limitConcurrent
当此选项为 true 时,新的 执行 将进入等待状态,待正在运行的 执行 完成后,才会继续执行;当次选项为 false 时,新的 执行 将立刻执行

用法 & 产生此问题的前提

coding.net 的部署控制台中,点击应用,进入流水线编辑,在基础配置中:

勾选 禁止本流程并行执行 选项,打钩则意味着同一时间一个流水线只能有一个执行在运行。

勾选 不要自动取消在排队状态的部署执行任务,如果由于勾选 禁止本流程并行执行 而产生的处于排队等待状态的执行,不会被自动取消,直至进入运行状态或手动取消。

rFhcDR

现象

流水线在多次触发后,由于禁止本流程并行执行,所以有一些 执行 进入等待状态

3MsUHM

但当正在运行的 执行 跑完后,后触发的 执行 并未进入运行状态,一直处于等待中。

qf2g7y

此时如果再触发一次流水线

pWEomD

可以看到最新的一次执行完成后,之前处于等待状态的执行进入了运行中的状态。

IjM8vR

原因

禁止并发执行依赖于 Redis 中一个叫做 orca.pipeline.queue.${pipeline_config_id} 的 key,它是 list 类型,其中存储的内容(部分)是 StartExecution 类型的 Message,如下:

kxhILp

该 list 在 orca 代码中主要由 PendingExecutionService 来操作。主要涉及下面两个操作:

  • enqueue(左入,头插)
  • popOldest(右出,尾取)

往该 list 存东西的时机只有 2 处:

FV2SpP

其中在 startExecution 时,会判断 如果禁用并行执行 & 该流水线有在执行的 execution ,如果满足要求,就会安排入队,其中入队的信息是 StartExecution Message。

override fun handle(message: StartExecution) {
  message.withExecution { execution ->
    if (execution.status == NOT_STARTED && !execution.isCanceled) {
      if (execution.shouldQueue()) {
        execution.pipelineConfigId?.let {
          log.info("Queueing {} {} {}", execution.application, execution.name, execution.id)
          pendingExecutionService.enqueue(it, message)
        }
      } else {
        start(execution)
      }
    } else {
      terminate(execution)
    }
  }
}

这里的 enqueue 不会做任何校验,会直接入队。

override fun enqueue(pipelineConfigId: String, message: Message) {
  pool.resource.use { redis ->
    redis.lpush(listName(pipelineConfigId), mapper.writeValueAsString(message))
  }
}

如果存在两个一模一样的 StartExecution 被送到 StartExectionHandler 执行,那么就会入队两次。

往该 list 中取东西的时机只有 1 处,即在 StartWaitingExecutionsHandler 中,如下:

Jcs2v8
override fun popOldest(pipelineConfigId: String): Message? =
  pool.resource.use { redis ->
    redis
      .rpop(listName(pipelineConfigId))
      ?.let { mapper.readValue(it) }
  }

StartWaitingExecutions Message 被插入 Redis 中的时机主要看 CompleteExecutionHandler 中,如下:

O6lKLh

也就是说,当一次 execution 完成后,会尝试执行因并发策略而排队的执行

至此,完整的逻辑链路梳理完毕。

客户问题日志验证

此次 execution 01FSC58BRQW5GH19AM8D45SN9E,用户等待了 30min 以上,再真正被执行,期间还进行了多次触发。此次 execution 的日志如下:

jeQ0AS

可以看出此次 execution 在 2022-01-14 19:23:39 被提交,但在 2022-01-14 19:23:41 之后,就再也没有任何动静了,直到 2022-01-14 20:00:54 才真正执行。 提交后没有动静,是因为此时有流水线正在执行。可以看到查询数据库获得此时的执行记录如下:

qwsC2o

按照前面的逻辑,当正在执行的 execution 完成后,会依次执行排队在中的 execution,直到此次 execution 01FSC58BRQW5GH19AM8D45SN9E。但是可以观察到,在此次 execution 01FSC58BRQW5GH19AM8D45SN9E 的前面,有 3 次 execution,其中出现异常的是第 2 次,即 01FSC3YKGFYX68X2DDXT3W9MD6,它在 2022-01-14 19:45:49 执行完之后,并没有去执行后续队列中的 execution,直到 2022-01-14 19:59:52 开始了一次新的 execution 01FSC7AMB7SDMT3KE2SNER4KBT,注意它是用户在发现很久没有触发后,再一次手动触发的。它触发完成后,才轮到 01FSC58BRQW5GH19AM8D45SN9E 执行,此时已过去 30 min 多。

出现问题的 execution 可以通过日志发现他被入队了两次。

JVofaY

可以从上面的代码中发现,只要是打了 Queueing xxx 日志的地方,那次 execution 一定会入队。所以出现两次,会入队两次。且上面的日志,来自于两个不同的 pod。

8CR61W

为啥不能入队两次

假设入队了两次,就会在获取到第一个消息时,成功/失败执行该 execution,等到该 execution 执行失败后,会再次从该队列中去取,取到了,但是是同一个流水线,且启动流水线时,对流水线的状态时有要求的,正常执行需要 execution.status == NOT_STARTED && !execution.isCanceled 即:

override fun handle(message: StartExecution) {
  message.withExecution { execution ->
    if (execution.status == NOT_STARTED && !execution.isCanceled) {
      if (execution.shouldQueue()) {
        execution.pipelineConfigId?.let {
          log.info("Queueing {} {} {}", execution.application, execution.name, execution.id)
          pendingExecutionService.enqueue(it, message)
        }
      } else {
        start(execution)
      }
    } else {
      terminate(execution)
    }
  }
}

如果不满足状态要求,会执行终止该 execution,即:

private fun terminate(execution: Execution) {
  if (execution.status == CANCELED || execution.isCanceled) {
    publisher.publishEvent(ExecutionComplete(this, execution.type, execution.id, execution.status))
    execution.pipelineConfigId?.let {
      queue.push(StartWaitingExecutions(it, purgeQueue = !execution.isKeepWaitingPipelines))
    }
  } else {
    log.warn("Execution (type: ${execution.type}, id: {}, status: ${execution.status}, application: {})" +
      " cannot be started unless state is NOT_STARTED. Ignoring StartExecution message.",
      value("executionId", execution.id),
      value("application", execution.application))
  }

这里最主要的问题是不会再从 orca.pipeline.queue.${pipeline_config_id} 中取,也就是说,如果队列中还有排队的 execution,将得不到执行。所以可以从日志中看到,01FSC3YKGFYX68X2DDXT3W9MD6 执行完成后,又从队列中取到了一模一样的消息,再次启动该流水线时发现状态不符合要求,也不再执行等待队列中的其他 execution。

8LiDF1

修复思路

在尝试启动排队的 execution 时,对该 execution 的状态做判断,不满足要求,直接 push(StartWaitExecution),此时,后续的执行又会从 StartWaitExecutionHandler 的入口开始,类似于一个 for 循环;满足要求的 execution,直接 push(StartExecution),尝试执行下一个 execution。

修复后的代码如下:

@Component
class StartWaitingExecutionsHandler(
  override val queue: Queue,
  override val repository: ExecutionRepository,
  private val pendingExecutionService: PendingExecutionService
) : OrcaMessageHandler<StartWaitingExecutions> {

  private val log: Logger get() = LoggerFactory.getLogger(javaClass)

  override val messageType = StartWaitingExecutions::class.java

  override fun handle(message: StartWaitingExecutions) {
    if (message.purgeQueue) {
      // when purging the queue, run the latest message and discard the rest
      pendingExecutionService.popNewest(message.pipelineConfigId)
        .also { _ ->
          pendingExecutionService.purge(message.pipelineConfigId) { purgedMessage ->
            when (purgedMessage) {
              is StartExecution -> {
                log.info("Dropping queued pipeline {} {}", purgedMessage.application, purgedMessage.executionId)
                queue.push(CancelExecution(purgedMessage))
              }
              is RestartStage -> {
                log.info("Cancelling restart of {} {}", purgedMessage.application, purgedMessage.executionId)
                // don't need to do anything else
              }
            }
          }
        }
    } else {
      // when not purging the queue, run the messages in the order they came in
      pendingExecutionService.popOldest(message.pipelineConfigId)
    }
      ?.let {
        when (it) {
          is StartExecution -> {
            try {
              val startedExecution: Execution = repository.retrieve(it.executionType, it.executionId)
              if (startedExecution.status != ExecutionStatus.NOT_STARTED || startedExecution.isCanceled) {
                log.info(
                  "Dropping queued pipeline {} {}, because its status was {}, which can not be stated",
                  startedExecution.application, startedExecution.id, startedExecution.status
                )
                queue.push(message)
                return
              }
            } catch (enfe: ExecutionNotFoundException) {
              log.debug("queued pipeline {} {} not found, and can be started later", it.application, it.executionId)
            }
          }
        }
        // spoiler, it always is!
        if (it is ExecutionLevel) {
          log.info("Starting queued pipeline {} {} {}", it.application, it.executionId)
        }
        queue.push(it)
      }
  }
}

Read more

容器镜像(4):镜像的常用工具箱

容器镜像(4):镜像的常用工具箱

前几篇在讲多架构镜像时已经用过 skopeo 和 crane 做镜像复制,这篇系统整理这两个工具的完整能力,同时介绍几个日常操作镜像时同样好用的工具。 一、skopeo:不依赖 Daemon 的镜像瑞士军刀 skopeo 的核心价值是绕过 Docker daemon,直接与 Registry API 交互。上一篇用它做镜像复制和离线传输,但它的能力远不止于此。 1.1 安装 # Ubuntu / Debian sudo apt install -y skopeo skopeo --version # skopeo version 1.15.1 1.2 inspect:免拉取检查镜像元数据 docker inspect 需要先把镜像拉到本地,skopeo inspect 直接向 Registry

容器镜像(3):多架构镜像构建

容器镜像(3):多架构镜像构建

一、什么是多架构镜像 1.1 OCI Image Index 上一篇介绍了单平台镜像的结构:一个 Manifest 指向 Config 和若干 Layer blob。多架构镜像在此之上多了一层——OCI Image Index(也叫 Manifest List),是一个轻量的索引文件,把多个单平台 Manifest 组织在一起: $ docker manifest inspect golang:1.22-alpine { "schemaVersion": 2, "mediaType": "application/vnd.oci.image.index.v1+json", "manifests&

容器镜像(2):containerd 视角下的镜像

容器镜像(2):containerd 视角下的镜像

一、为什么需要了解 containerd 如果你只用 docker run 跑容器,从来不关心底层,那可以不了解 containerd。但如果你在用 Kubernetes,或者想真正理解"容器运行时"是什么,containerd 是绕不开的。 事实上,当你执行 docker run 的时候,containerd 早就在后台悄悄工作了——Docker 从 1.11 版本开始,就把核心运行时剥离出来交给 containerd 负责。 1.1 Docker 的架构演变 早期的 Docker(1.10 及之前)是一个"大一统"的单体程序:一个 dockerd