Volcano 与 Kubernetes GPU 调度学习笔记

本笔记系统整理 Volcano 调度器、Kubernetes 调度框架、GPU Device Plugin、HAMi 等云原生 AI 调度领域的核心知识,适合用于学习、复习和工程实践参考。

目录


第一部分:Volcano 入门

1. Volcano 是什么

Volcano 是 CNCF 旗下的云原生批处理调度系统,最初由华为开源,主要用于解决 Kubernetes 默认调度器在 AI 训练、大数据、HPC 等高性能计算场景下的不足。

定位:Kubernetes 之上的批处理调度增强层,与 kube-scheduler 互补而非替代。

2. 安装与快速使用

2.1 安装

最简单方式,使用官方 YAML 一键部署:

kubectl apply -f https://raw.githubusercontent.com/volcano-sh/volcano/master/installer/volcano-development.yaml

也支持 Helm 安装。安装完成后会在 volcano-system 命名空间下看到三个核心组件:

  • volcano-scheduler —— 调度器
  • volcano-controllers —— Controller Manager
  • volcano-admission —— 准入控制

验证安装:

kubectl get pods -n volcano-system

2.2 提交一个 Volcano Job

Volcano 的核心使用方式是通过它的 Job CRD 提交批处理任务。下面是一个典型的分布式训练任务:

apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
  name: tensorflow-dist
spec:
  minAvailable: 3              # 至少 3 个 Pod 就绪才启动(gang scheduling)
  schedulerName: volcano
  queue: default               # 提交到哪个队列
  policies:
    - event: PodEvicted
      action: RestartJob       # Pod 被驱逐时整个 Job 重启
  tasks:
    - replicas: 1
      name: ps
      template:
        spec:
          containers:
            - name: tensorflow
              image: tensorflow/tensorflow:latest
              resources:
                requests:
                  cpu: "2"
                  memory: "4Gi"
    - replicas: 2
      name: worker
      template:
        spec:
          containers:
            - name: tensorflow
              image: tensorflow/tensorflow:latest
              resources:
                requests:
                  nvidia.com/gpu: 1

关键字段说明:

  • minAvailable: 控制成组调度的下限
  • tasks: 定义不同角色的 Pod
  • policies: 定义生命周期事件的处理策略

2.3 创建队列管理资源

队列用于多租户隔离和资源配额:

apiVersion: scheduling.volcano.sh/v1beta1
kind: Queue
metadata:
  name: team-a
spec:
  weight: 4                    # 资源分配权重
  capability:                  # 队列资源上限
    cpu: "100"
    memory: "200Gi"
    nvidia.com/gpu: "8"
  reclaimable: true            # 是否允许其他队列回收闲置资源

提交 Job 时通过 spec.queue: team-a 指定即可。

3. 核心特性一览

特性 用途
Gang Scheduling 一组 Pod 要么全调度,要么全不调度,避免分布式任务死锁
Queue 队列 多租户资源隔离、配额管理、权重分配
DRF 公平调度 多维资源(CPU/GPU/内存)下的公平性保证
优先级与抢占 高优先级任务可抢占低优先级任务的资源
资源回收 队列超用资源时可被其他队列回收
拓扑感知调度 NUMA、GPU 拓扑感知,优化训练性能
多框架支持 TensorFlow、PyTorch、MPI、Spark、Flink 等
Job 生命周期 失败重启、依赖控制、插件化扩展(如 SSH、SVC)

第二部分:Volcano 整体架构

4. Volcano 解决的核心问题

Kubernetes 默认调度器是为在线服务设计的:Pod 粒度、逐个调度、无状态。但批处理场景有几个根本不同:

痛点 1:任务整体性

分布式训练的 100 个 Pod 必须同时启动,少一个就跑不起来。默认调度器逐个调度,可能调度了 99 个就因为资源不足卡死,而这 99 个还占着资源。

痛点 2:多租户公平性

共享 GPU 集群中,A 团队不能把资源全占了让 B 团队饿死,需要按团队/队列做配额和公平分配。

痛点 3:任务间关系

批任务有优先级、有抢占、有依赖,需要更复杂的策略。

Volcano 就是在 Kubernetes 之上补齐这些能力的批处理调度器。

5. 整体架构与数据流

┌─────────────────────────────────────────────────┐
│                  用户提交 Job                    │
└────────────────────┬────────────────────────────┘
                     ↓
┌─────────────────────────────────────────────────┐
│  Admission Webhook(校验、默认值注入)            │
└────────────────────┬────────────────────────────┘
                     ↓
┌─────────────────────────────────────────────────┐
│  Controller Manager                              │
│  Job → 拆分成 Pods + 创建 PodGroup               │
└────────────────────┬────────────────────────────┘
                     ↓
┌─────────────────────────────────────────────────┐
│  Scheduler(核心)                                │
│  Session → Actions → Plugins → 调度决策          │
└────────────────────┬────────────────────────────┘
                     ↓
                kubelet 拉起 Pod

核心数据流:Job → PodGroup → Pods

  • Job 是用户视角的任务
  • Controller 把 Job 转换成原生 Pods,并创建一个 PodGroup 资源把这组 Pod 关联起来
  • Scheduler 调度时以 PodGroup 为单位做决策

6. 三层抽象模型

理解 Volcano 调度器,记住这三层就够了:

抽象 角色 类比
Action 调度的"动作流程",决定先做什么后做什么 工作流的步骤
Plugin 调度的"决策依据",提供具体的策略函数 工作流中的判断规则
Session 一次调度周期的"上下文",存储集群快照,串起 Action 和 Plugin 工作流的执行实例

核心理解:

  • Action 是 做什么(enqueue / allocate / preempt / reclaim / backfill)
  • Plugin 是 怎么判断(gang / priority / drf / binpack 等)
  • Session 是 在什么环境下(集群快照)

第三部分:Volcano 核心实现原理

7. Session 机制

Session 是调度的基本单元。每隔约 1 秒(可配置),Volcano scheduler 开启一个 Session:

1. Open Session
   ├─ 从 Cache 拉取集群快照(Nodes、Pods、PodGroups、Queues)
   ├─ 加载启用的 Plugins,注册回调函数
   └─ 构造 Session 上下文

2. Execute Actions(按配置顺序)
   enqueue → allocate → preempt → reclaim → backfill
   每个 Action 内部调用 Plugin 的回调来做决策

3. Close Session
   └─ 提交 Bind 结果,清理状态

为什么要 Session 化?

如果直接读 etcd 实时状态,决策依据会不断变化,难以保证一致性。Session 用快照保证一次调度周期内看到的世界是稳定的。决策完成后才统一提交,避免中间态污染。

Snapshot 实现:

func (sc *SchedulerCache) Snapshot() *api.ClusterInfo {
    snapshot := &api.ClusterInfo{
        Nodes:  make(map[string]*api.NodeInfo),
        Jobs:   make(map[api.JobID]*api.JobInfo),
        Queues: make(map[api.QueueID]*api.QueueInfo),
    }
    sc.Mutex.Lock()
    defer sc.Mutex.Unlock()
    
    // 深拷贝当前 Cache 状态到 snapshot
    for _, value := range sc.Nodes {
        snapshot.Nodes[value.Name] = value.Clone()
    }
    return snapshot
}

OpenSession 时调用 Snapshot() 做深拷贝,整个调度过程操作的是这份快照副本,不会受外部 Cache 变化影响。

8. Gang Scheduling 实现

这是 Volcano 最有代表性的能力。

问题:分布式训练 4 个 worker,默认调度器先调度了 3 个、第 4 个资源不够卡住,前 3 个空跑占资源。

实现思路:

  1. Controller 把 Job 拆成 Pods 时,同时创建一个 PodGroup,并把每个 Pod 标记 annotations[scheduling.k8s.io/group-name] 指向这个 PodGroup
  2. 调度器在 allocate 阶段做"预占"而不是直接 bind:先把节点上的资源标记为某个 Pod 占用,但不真正下发到 kubelet
  3. 当 PodGroup 中预占成功的 Pod 数 ≥ minAvailable 时,gang plugin 的 JobReadyFn 返回 true,统一执行 bind
  4. 如果 Session 结束时仍未达到 minAvailable,所有预占回滚,资源释放给其他任务

关键代码逻辑:

// gang plugin 注册的 JobReady 回调
ssn.AddJobReadyFn(func(obj interface{}) bool {
    job := obj.(*api.JobInfo)
    occupied := job.ReadyTaskNum()  // 已预占成功的 Pod 数
    return occupied >= job.MinAvailable
})

// allocate action 中的判断
if !ssn.JobReady(job) {
    // 还没成组,继续预占但不 bind
    continue
}
// 成组了,统一 bind
ssn.Allocate(...)

9. Queue 与 DRF 公平调度

9.1 DRF 算法

DRF(Dominant Resource Fairness) 是多维资源公平算法的经典实现。

在只有 CPU 一种资源时,公平很简单 —— 按使用量排序。但有 CPU、GPU、内存多种资源时怎么定义"用得多"?

DRF 的答案:看每个用户最稀缺的那种资源占比(叫"主导份额")。

示例:集群有 100 CPU、10 GPU

  • A 用了 20 CPU + 1 GPU → CPU 占 20%,GPU 占 10%,主导份额是 20%
  • B 用了 10 CPU + 4 GPU → CPU 占 10%,GPU 占 40%,主导份额是 40%

下次调度时优先给主导份额低的(A),这样多种资源下也能保持公平。

Volcano 的 drf plugin 实现:

// drf plugin 注册的 JobOrder 回调(决定 Job 调度顺序)
ssn.AddJobOrderFn(func(l, r interface{}) int {
    lv := l.(*api.JobInfo)
    rv := r.(*api.JobInfo)
    // share = 主导份额 = max(used[i] / total[i])
    if lv.Share < rv.Share {
        return -1  // share 小的优先
    }
    return 1
})

9.2 Queue 队列

Queue 在 DRF 之上加了一层:每个 Queue 有 weight,资源按 weight 比例分配,配合 capability 设置上限。这样既有公平性又有租户边界。

10. 抢占与回收

概念 范围 触发条件
抢占(Preempt) 同队列内 高优先级任务无法调度时,挑选低优先级 victim 驱逐
回收(Reclaim) 跨队列 Queue A 资源使用低于 deserved、Queue B 高于 deserved 时,A 提交新任务可回收 B 超用部分

两者本质都是"释放资源给更应该获得的任务",区别在于范围。实现上都是 Action 阶段调用 plugin 的 Preemptable / Reclaimable 回调判断哪些 Pod 可被驱逐:

ssn.AddPreemptableFn("priority", func(preemptor, preemptees) []victim {
    // 只允许优先级低于 preemptor 的 Pod 成为 victim
    var victims []*api.TaskInfo
    for _, p := range preemptees {
        if p.Priority < preemptor.Priority {
            victims = append(victims, p)
        }
    }
    return victims
})

11. Action 与 Plugin 协作流程

一次完整的调度流程:

Session 开启,加载 plugins: [priority, gang, drf, predicates, nodeorder]
│
├─ Action: enqueue
│  └─ 调用各 plugin 的 JobEnqueueable,决定哪些 Job 进入待调度队列
│
├─ Action: allocate
│  ├─ 调用 JobOrderFn (drf):按主导份额排序 Job
│  ├─ 调用 TaskOrderFn (priority):按优先级排序 Pod
│  ├─ 调用 PredicateFn (predicates):过滤不满足约束的节点
│  ├─ 调用 NodeOrderFn (nodeorder, binpack):给可用节点打分排序
│  ├─ 选最优节点做预占
│  └─ 调用 JobReadyFn (gang):成组检查,决定是否 bind
│
├─ Action: preempt
│  └─ 调用 PreemptableFn 找 victim,驱逐低优 Pod
│
├─ Action: reclaim
│  └─ 调用 ReclaimableFn,跨队列回收超用资源
│
└─ Session 关闭,提交决策

插件化的好处:Action 是固定的流程骨架,Plugin 可按需启用。配置在 volcano-scheduler-configmap 里:

actions: "enqueue, allocate, preempt, reclaim, backfill"
tiers:
  - plugins:
      - name: priority
      - name: gang
      - name: conformance
  - plugins:
      - name: drf
      - name: predicates
      - name: nodeorder
      - name: binpack

12. 拓扑感知调度

针对 AI 训练,跨 NUMA 或跨 PCIe 域的通信会显著降低性能。Volcano 通过 NUMA-aware plugin 和 GPU 拓扑感知做优化:

调度时不仅看节点是否有 N 张 GPU,还看这 N 张 GPU 是否在同一个 NVLink group / 同一个 PCIe switch 下,优先选择拓扑紧凑的节点。这通过 vc-agent 在节点上采集拓扑信息、上报给调度器实现。


第四部分:并发问题深度分析

13. Volcano 调度的并发模型

核心结论:单个 scheduler 实例内不存在并发问题,但多实例场景和与外部系统交互时存在挑战。

Volcano scheduler 的核心调度循环本质上是单线程串行的:

for {
    ssn := OpenSession(cache)      // 拿快照
    for _, action := range actions {
        action.Execute(ssn)         // 串行执行
    }
    CloseSession(ssn)               // 提交结果
    time.Sleep(period)              // 默认 1 秒
}

在一个 Session 内,多个 PodGroup 的调度决策是顺序处理的,不是并发处理。具体来说:

  • allocate action 内部会遍历所有待调度的 Job/PodGroup,按 JobOrderFn(比如 DRF)排序后逐个尝试分配
  • 每分配一个 Pod,Session 内的资源视图会立即更新,下一个 PodGroup 看到的就是更新后的状态
  • 整个过程在单个 goroutine 中完成

14. 五种并发场景分析

场景 是否有并发问题 解法
单 Session 内多 PodGroup 串行处理 + 快照
Cache 写 vs Session 读 有但可控 深拷贝快照 + Mutex
Session 决策 vs K8s 实际 有时间差 Assume 机制 + 失败重试
多 Scheduler 副本 Leader Election
与 kube-scheduler 共存 API Server 乐观锁兜底

14.1 Cache 与 Session 的并发:读写分离

┌──────────────────────────────────────┐
│  Informer (watch K8s API)            │
│  ├─ Pod Add/Update/Delete            │   写
│  ├─ Node Add/Update/Delete           │  ─────►  Cache (加锁)
│  └─ PodGroup Add/Update/Delete       │
└──────────────────────────────────────┘
                                              ▲
                                              │ Snapshot (读)
                                              │
┌──────────────────────────────────────┐      │
│  Scheduler 主循环                     │ ─────┘
│  Snapshot → Session → Decisions      │
└──────────────────────────────────────┘

Informer 协程持续把 K8s 事件写入 Cache,Scheduler 主循环周期性从 Cache 读快照。两者通过 Cache 内部的 sync.Mutex 互斥。

14.2 Session 与 K8s 实际状态的不一致风险

Session 内的决策是基于快照的,但实际 bind 操作要写入 K8s API,两者之间存在时间差:

T0: OpenSession,快照显示 Node-A 有 8 GPU 空闲
T1: Session 内决定把 PodGroup-X 的 4 个 Pod 调度到 Node-A
T2: 此时另一个进程(比如手动 kubectl run)也往 Node-A 塞了 Pod
T3: Session 结束,bind 请求发到 kubelet
T4: kubelet 发现资源不够,调度失败

Volcano 的应对:

  1. 乐观假设 + 失败重试:bind 失败后,Pod 会回到待调度状态,下个 Session 重新决策
  2. Assume 机制:Session 内做出决策后,立即在 Cache 中"假设"这个 Pod 已经分配
  3. Gang 的预占回滚:Session 内未达到 minAvailable 的预占会在 Session 结束时回滚
func (sc *SchedulerCache) AddBindTask(taskInfo *api.TaskInfo) error {
    sc.Mutex.Lock()
    defer sc.Mutex.Unlock()
    
    // 在 Cache 中预占资源
    node := sc.Nodes[taskInfo.NodeName]
    node.AddTask(taskInfo)
    // 加入 bind 队列异步执行
    sc.BindFlowChannel <- taskInfo
    return nil
}

14.3 多 Scheduler 实例

Volcano 通过 Leader Election(K8s Lease 资源选主)保证只有 Leader 实例执行调度循环,其他副本待机。

leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
    Lock: lock,
    Callbacks: leaderelection.LeaderCallbacks{
        OnStartedLeading: func(ctx context.Context) {
            scheduler.Run(ctx.Done())  // 只有 Leader 才跑
        },
    },
})

代价:多副本不能水平扩展调度吞吐,这也是大规模集群下 Volcano 性能的一个限制点。

核心设计哲学:"Session 内绝对一致,Session 间允许偏差,偏差通过重试收敛" —— 典型的最终一致性思路。


第五部分:Kubernetes 调度框架对比

15. kube-scheduler 四阶段

K8s 1.19+ 默认调度器的标准模型:Filter、Score、Reserve、Bind(更细粒度还有 PreFilter、PostFilter、Permit、PreBind 等扩展点)。

Pod 进入调度队列
       ↓
┌─────────────────┐
│  Filter         │  过滤:哪些节点满足 Pod 要求?
│  (Predicate)    │  例如资源够不够、亲和性、污点容忍
└────────┬────────┘
         ↓
┌─────────────────┐
│  Score          │  打分:在可用节点中,哪个最合适?
│  (Priority)     │  例如负载均衡、镜像本地性、拓扑分布
└────────┬────────┘
         ↓
┌─────────────────┐
│  Reserve        │  预占:在 Cache 中标记节点资源已被占用
│                 │  避免下个 Pod 重复使用同一份资源
└────────┬────────┘
         ↓
┌─────────────────┐
│  Bind           │  绑定:写 Pod.spec.nodeName 到 etcd
│                 │  通知 kubelet 启动
└─────────────────┘

这是单 Pod 视角的流程:每次处理一个 Pod,走完四阶段就提交。

16. Volcano 与四阶段的对应关系

kube-scheduler 阶段 Volcano 对应实现
Filter PredicateFn (在 allocate Action 内调用)
Score NodeOrderFn / BatchNodeOrderFn
Reserve AddBindTask (Cache 预占)
Bind Bind Action / 异步 bind 队列
(无对应) JobReadyFn (gang 检查,Volcano 独有)

Volcano 一次 allocate 的内部流程:

for 每个待调度的 Job (按 JobOrderFn 排序,例如 DRF):
    for 每个 Task (Pod,按 TaskOrderFn 排序):
        ┌─────────────────────────────────────┐
        │ Step 1: PredicateFn  ← 对应 Filter  │
        │ 遍历所有节点,过滤不满足约束的       │
        └─────────────────────────────────────┘
                    ↓
        ┌─────────────────────────────────────┐
        │ Step 2: NodeOrderFn  ← 对应 Score   │
        │ 给候选节点打分,选最高分的           │
        └─────────────────────────────────────┘
                    ↓
        ┌─────────────────────────────────────┐
        │ Step 3: Allocate  ← 对应 Reserve    │
        │ 在 Cache 中预占资源(不提交 K8s)     │
        └─────────────────────────────────────┘
                    ↓
        ┌─────────────────────────────────────┐
        │ Step 4: JobReadyFn  ← Volcano 独有  │
        │ 检查 Job 是否成组就绪 (Gang)         │
        │   未就绪:继续预占下一个 Task        │
        │   已就绪:进入 Bind                  │
        └─────────────────────────────────────┘
                    ↓
        ┌─────────────────────────────────────┐
        │ Step 5: Bind  ← 对应 Bind           │
        │ 异步写 Pod.spec.nodeName 到 etcd     │
        └─────────────────────────────────────┘

17. 结构性差异

17.1 调度循环组织方式

kube-scheduler:Pod-by-Pod 循环

while true:
    pod = queue.Pop()
    Filter(pod) → Score(pod) → Reserve(pod) → Bind(pod)
    // 一个 Pod 走完所有阶段,再处理下一个

每个 Pod 是独立调度单元,阶段是 Pod 内部串行的。

Volcano:Session + Action 循环

while true:
    session = OpenSession(snapshot)
    
    enqueue_action(session)    // 处理所有 Job 的入队
    allocate_action(session)   // 处理所有 Pod 的 Filter+Score+Reserve
    preempt_action(session)    // 处理所有抢占
    reclaim_action(session)    // 处理所有跨队列回收
    backfill_action(session)   // 处理所有回填
    
    CloseSession(session)      // 统一提交 bind

Action 是批量处理一类操作,阶段是横向跨 Pod 的,不是纵向单 Pod 的。

17.2 这种组织方式带来两个能力

  1. 全局视角决策 —— 比如 DRF 公平算法,需要看到所有 Job 的资源占用才能排序
  2. Job 级原子操作 —— Gang scheduling 需要"要么全调度要么不调度",这只能在 Job 维度做

17.3 完整对照速查表

维度 kube-scheduler Volcano
调度单元 Pod Job/PodGroup
调度循环 Pod-by-Pod 顺序 Session 内批量 Action
Filter Filter 插件接口 PredicateFn 回调
Score Score 插件接口 NodeOrderFn / BatchNodeOrderFn
Reserve 立即在 Cache 预占 Allocate 预占(Job 级别可延迟提交)
Bind 单 Pod 异步 bind 批量异步 bind
全局排序 不支持(Pod 间独立) JobOrderFn、QueueOrderFn
成组就绪 不支持 JobReadyFn (Gang)
抢占 PostFilter 阶段 独立的 preempt action
跨队列回收 不支持 独立的 reclaim action

第六部分:GPU 设备分配与重复分配问题

18. GPU ID 到底是谁分配的

核心认知:GPU ID 实际是 kubelet 分配的,不是调度器,也不是 device-plugin 主动选的。

完整链路:

调度器选节点(只看数量)→ kubelet 选具体设备 ID → device-plugin 按 ID 构造容器视图

18.1 调度器看到的 GPU

调度器对 nvidia.com/gpu 的处理跟对 cpumemory 完全一样 —— 就是个数字

# Node Status 中的 GPU 信息
status:
  allocatable:
    nvidia.com/gpu: "8"     # 总数 8
  capacity:
    nvidia.com/gpu: "8"

调度器的工作只有:

  1. 看节点 allocatable 还剩多少 GPU
  2. 累加节点上已有 Pod 的 requests.nvidia.com/gpu
  3. 判断"剩余 ≥ Pod 申请数"是否成立
  4. 选定节点后,写 pod.spec.nodeName

调度器完全不参与"选哪几张具体的卡"

18.2 完整的分配链路

Pod 被调度到 Node
       ↓
kubelet 收到 Pod 创建事件
       ↓
┌─────────────────────────────────────────────────┐
│  kubelet 内部组件: DevicePluginManager           │
│                                                  │
│  1. 解析 Pod.spec.containers[].resources.limits  │
│     发现需要 nvidia.com/gpu: 2                   │
│                                                  │
│  2. 查询自己维护的设备池:                        │
│     allDevices["nvidia.com/gpu"] = {            │
│       "GPU-uuid-0": Healthy,  // 空闲           │
│       "GPU-uuid-1": Healthy,  // 已分配给Pod-X  │
│       "GPU-uuid-2": Healthy,  // 空闲           │
│       ...                                        │
│     }                                            │
│     allocatedDevices["GPU-uuid-1"] = "Pod-X"    │
│                                                  │
│  3. ★ 关键步骤:从空闲设备中挑选 N 个            │
│     devicesToAllocate = ["GPU-uuid-0", "GPU-uuid-2"] │
│                                                  │
│  4. 标记这些设备为已分配                         │
│     podDevices.insert(podUID, devicesToAllocate)│
└────────────────────┬────────────────────────────┘
                     ↓ gRPC 调用 Allocate
┌─────────────────────────────────────────────────┐
│  nvidia-device-plugin                            │
│                                                  │
│  收到: AllocateRequest{                          │
│    DevicesIDs: ["GPU-uuid-0", "GPU-uuid-2"]      │
│  }                                               │
│  ↑ 注意:ID 是 kubelet 选好传过来的,不是 plugin 选的 │
│                                                  │
│  Plugin 的工作只是构造 AllocateResponse:         │
│    - 设置环境变量 NVIDIA_VISIBLE_DEVICES         │
│    - 挂载 /dev/nvidia0, /dev/nvidia2             │
│    - 配置驱动库                                  │
└────────────────────┬────────────────────────────┘
                     ↓ 返回 Response
            kubelet 用响应启动容器

关键认知翻转:很多人以为是 device-plugin 选的卡,实际上 plugin 是被动的 —— kubelet 把"要哪几张"算好了通过 DevicesIDs 字段告诉 plugin,plugin 只负责告诉 kubelet"这几张卡怎么暴露给容器"。

19. kubelet 的设备分配实现

具体看 kubelet 源码(pkg/kubelet/cm/devicemanager/manager.go)中的核心逻辑:

// 简化后的 devicesToAllocate 逻辑
func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, required int) (sets.String, error) {
    m.mutex.Lock()
    defer m.mutex.Unlock()
    
    // 1. 看这个 Pod 之前已经分配过的(重启场景复用)
    devices := m.podDevices.containerDevices(podUID, contName, resource)
    if devices.Len() == required {
        return nil, nil  // 已分配
    }
    
    // 2. 计算还需要多少
    needed := required - devices.Len()
    
    // 3. 从健康设备池中减去已分配的,得到可用设备
    available := m.healthyDevices[resource].Difference(m.allocatedDevices[resource])
    
    if available.Len() < needed {
        return nil, fmt.Errorf("not enough devices")
    }
    
    // 4. ★ 从可用设备中选 needed 个
    allocated := available.UnsortedList()[:needed]
    
    // 5. ★ 立即标记为已分配
    for _, device := range allocated {
        m.allocatedDevices[resource].Insert(device)
        m.podDevices.insert(podUID, contName, resource, device, ...)
    }
    
    return sets.NewString(allocated...), nil
}

理论上无懈可击的设计:单 kubelet 进程,加锁,状态一致。

20. 重复分配问题的真正发生点

既然 kubelet 内部有锁,问题只可能出在状态丢失或不一致上。

20.1 场景一:kubelet 重启导致状态不一致

kubelet 通过 checkpoint 文件持久化分配状态:/var/lib/kubelet/device-plugins/kubelet_internal_checkpoint

T0: kubelet 运行中,podDevices: { Pod-A: [GPU-0, GPU-1] }
T1: kubelet 进程崩溃
T2: kubelet 重启,从 checkpoint 恢复
T3: ★ 关键空窗期:device-plugin 还没重新注册
T4: 此时如果有新 Pod 调度过来,kubelet 看到 healthyDevices 是空的,
    新 Pod 阻塞等待
T5: device-plugin 重新注册,上报 GPU 列表
T6: kubelet 用 checkpoint 中的记录 + plugin 上报的设备重建状态

T6 这一步如果有 bug 就可能出现状态不一致。

20.2 场景二:Pod 删除时序问题(最常见)

T0: Pod-A 占用 GPU-0
T1: Pod-A 被删除,进入 Terminating
T2: kubelet 发起容器停止,但容器还在运行(terminationGracePeriodSeconds)
T3: ★ kubelet 内部状态:podDevices 中 Pod-A 还在,GPU-0 仍标记 allocated
T4: 容器最终停止
T5: kubelet 异步清理 podDevices,移除 Pod-A 的记录
T6: ★ allocatedDevices 中 GPU-0 被标记为可用

如果在 T2~T6 之间 kubelet 释放 GPU ID 早于容器实际释放 GPU,新 Pod 就会被分配同一张卡而老进程还没退出。

这是最容易出问题的场景——账面上没有重复分配(kubelet 觉得 GPU-0 已释放可分配),但物理上重复使用(老进程还在跑)。

20.3 场景三:device-plugin 重启导致状态丢失

并发创建场景下节点压力大,plugin 容易 OOM。重启时如果:

  • plugin 重启时间过长,kubelet 可能已把节点 GPU 资源标记为 0
  • 新 plugin 上报的设备 ID 和老 plugin 不一致
  • 早期版本(< 0.12)的 nvidia-device-plugin 在重启后状态恢复有 bug

20.4 场景四:plugin 自身实现错误

// 错误的 plugin 实现
func (p *Plugin) Allocate(ctx, req *AllocateRequest) (*AllocateResponse, error) {
    // BUG: 不看 req.DevicesIDs,自己随便选
    response := &ContainerAllocateResponse{
        Envs: map[string]string{
            "NVIDIA_VISIBLE_DEVICES": "0",  // 永远返回 0 号卡!
        },
    }
}

这时 kubelet 账上记的是 GPU-uuid-0,实际容器拿到的却是 GPU-0(物理 0 号卡),账面和实际错位。

20.5 场景五:GPU 共享方案的混乱

如果用了 vGPU、MPS、Time-Slicing 等共享方案,把一张物理卡当多份分配,"重复分配"反而是预期行为。

21. 排查方法

21.1 第一步:确认是调度层超分还是 device-plugin 冲突

# 出问题时立即抓现场
NODE=<问题节点>
kubectl get pods -A --field-selector spec.nodeName=$NODE -o json \
  | jq -r '.items[] | "\(.metadata.namespace)/\(.metadata.name): \(.spec.containers[].resources.requests."nvidia.com/gpu" // "0")"'

# 累加 GPU 请求
kubectl get pods -A --field-selector spec.nodeName=$NODE -o json \
  | jq '[.items[].spec.containers[].resources.requests."nvidia.com/gpu" // "0" | tonumber] | add'
  • 总和等于 8 但物理冲突 → device-plugin 问题(最可能)
  • 总和 > 8 → 调度器层面问题

21.2 第二步:检查 device-plugin 状态

# 看 plugin 日志
kubectl logs -n kube-system <nvidia-device-plugin-pod>

# 看 checkpoint
cat /var/lib/kubelet/device-plugins/kubelet_internal_checkpoint | jq

# 看每个容器实际拿到的卡
for pod in $(kubectl get pods -A --field-selector spec.nodeName=$NODE -o name); do
  echo "=== $pod ==="
  kubectl exec $pod -- printenv NVIDIA_VISIBLE_DEVICES 2>/dev/null
done

21.3 第三步:确认物理冲突

# 查看哪些进程在用同一张卡
nvidia-smi --query-compute-apps=pid,process_name,used_memory --format=csv
ps -ef | grep <pid>  # 确认进程归属哪个容器
crictl ps -a | grep <container-id>  # 看容器状态

21.4 解决方案

  1. 升级 NVIDIA device-plugin 到最新稳定版
  2. 使用 HAMi 等方案,把 GPU 决策上提到调度器层
  3. 避免 Pod 频繁重建,设置合理的 terminationGracePeriodSeconds
  4. 容器启动前的健康检查,主动失败而不是继续运行

第七部分:Device Plugin 原理与实现

22. Device Plugin 概述

Kubernetes 本身只认识 CPU 和 Memory。Device Plugin 框架通过 gRPC 标准接口,让第三方设备厂商把自己的设备注册成 K8s 的 Extended Resource,让 Pod 通过 requests.example.com/gpu: 1 这样的方式申请。

┌─────────────────────────────────────────────────┐
│  用户提交 Pod                                    │
│  resources.requests:                             │
│    example.com/mydevice: 2                       │
└────────────────────┬────────────────────────────┘
                     ↓
┌─────────────────────────────────────────────────┐
│  kube-scheduler                                  │
│  根据 Node.Status.Allocatable 找有资源的节点    │
└────────────────────┬────────────────────────────┘
                     ↓
┌─────────────────────────────────────────────────┐
│  kubelet                                         │
│  ↓ gRPC 调用                                     │
└────────────────────┬────────────────────────────┘
                     ↓
┌─────────────────────────────────────────────────┐
│  Device Plugin (DaemonSet)                       │
│  - ListAndWatch: 上报设备列表                    │
│  - Allocate: 分配设备给 Pod                      │
└─────────────────────────────────────────────────┘

23. 核心机制

23.1 注册流程

1. Plugin 在 /var/lib/kubelet/device-plugins/ 下创建自己的 socket
   例如: /var/lib/kubelet/device-plugins/mydevice.sock

2. Plugin 在自己的 socket 上启动 gRPC server,实现 DevicePlugin 接口

3. Plugin 通过 kubelet 的注册 socket(kubelet.sock)发起注册:
   Register(RegisterRequest{
       Version:      "v1beta1",
       Endpoint:     "mydevice.sock",
       ResourceName: "example.com/mydevice",
   })

4. kubelet 收到注册后,连接 Plugin 的 socket,调用 ListAndWatch

为什么用 Unix Socket? Plugin 是以 DaemonSet 形式运行在节点上,和 kubelet 在同一台机器,UDS 既高效又安全(不暴露网络端口)。

23.2 核心 gRPC 接口

type DevicePluginServer interface {
    GetDevicePluginOptions(ctx, Empty) (*DevicePluginOptions, error)
    ListAndWatch(*Empty, DevicePlugin_ListAndWatchServer) error
    Allocate(ctx, *AllocateRequest) (*AllocateResponse, error)
    PreStartContainer(ctx, *PreStartContainerRequest) (*PreStartContainerResponse, error)
    GetPreferredAllocation(ctx, *PreferredAllocationRequest) (*PreferredAllocationResponse, error)
}

23.3 ListAndWatch:设备发现与健康上报

这是个流式 RPC —— Plugin 在这个连接上持续推送设备列表:

func (m *MyDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
    // 首次推送当前设备列表
    s.Send(&pluginapi.ListAndWatchResponse{Devices: m.devices})
    
    // 后续监听设备变化
    for {
        select {
        case <-m.healthCheck:
            s.Send(&pluginapi.ListAndWatchResponse{Devices: m.devices})
        case <-m.stop:
            return nil
        }
    }
}

23.4 Allocate:分配设备的关键时刻

func (m *MyDevicePlugin) Allocate(ctx context.Context, req *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
    responses := pluginapi.AllocateResponse{}
    
    for _, containerReq := range req.ContainerRequests {
        // containerReq.DevicesIDs 是 kubelet 选好的设备 ID 列表
        // Plugin 的工作是告诉 kubelet 怎么把这些设备暴露给容器
        
        response := pluginapi.ContainerAllocateResponse{
            Envs: map[string]string{
                "MYDEVICE_VISIBLE_IDS": strings.Join(containerReq.DevicesIDs, ","),
            },
            Devices: []*pluginapi.DeviceSpec{
                {
                    HostPath:      "/dev/mydevice0",
                    ContainerPath: "/dev/mydevice0",
                    Permissions:   "rw",
                },
            },
        }
        responses.ContainerResponses = append(responses.ContainerResponses, &response)
    }
    return &responses, nil
}

24. 手搓 Device Plugin Demo

实现一个虚拟设备的 Device Plugin:模拟节点上有 4 个名叫 example.com/foo 的设备。

24.1 核心代码

// plugin.go
package main

import (
    "context"
    "fmt"
    "log"
    "net"
    "os"
    "path"
    "sync"
    "time"

    "google.golang.org/grpc"
    pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)

const (
    resourceName = "example.com/foo"
    serverSock   = pluginapi.DevicePluginPath + "foo.sock"
    deviceCount  = 4
)

type FooDevicePlugin struct {
    devices []*pluginapi.Device
    server  *grpc.Server
    stop    chan struct{}
    health  chan *pluginapi.Device
    
    // 关键:跟踪设备分配状态,并发安全
    mu        sync.Mutex
    allocated map[string]string // deviceID -> podUID
}

func NewFooDevicePlugin() *FooDevicePlugin {
    devs := make([]*pluginapi.Device, deviceCount)
    for i := 0; i < deviceCount; i++ {
        devs[i] = &pluginapi.Device{
            ID:     fmt.Sprintf("foo-%d", i),
            Health: pluginapi.Healthy,
        }
    }
    return &FooDevicePlugin{
        devices:   devs,
        stop:      make(chan struct{}),
        health:    make(chan *pluginapi.Device),
        allocated: make(map[string]string),
    }
}

// Start 启动 gRPC 服务并注册到 kubelet
func (p *FooDevicePlugin) Start() error {
    os.Remove(serverSock)
    
    sock, err := net.Listen("unix", serverSock)
    if err != nil {
        return err
    }
    
    p.server = grpc.NewServer()
    pluginapi.RegisterDevicePluginServer(p.server, p)
    
    go func() {
        if err := p.server.Serve(sock); err != nil {
            log.Printf("gRPC serve error: %v", err)
        }
    }()
    
    time.Sleep(1 * time.Second)
    return p.register()
}

func (p *FooDevicePlugin) register() error {
    conn, err := grpc.Dial(
        pluginapi.KubeletSocket,
        grpc.WithInsecure(),
        grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
            return net.DialTimeout("unix", addr, timeout)
        }),
    )
    if err != nil {
        return err
    }
    defer conn.Close()
    
    client := pluginapi.NewRegistrationClient(conn)
    req := &pluginapi.RegisterRequest{
        Version:      pluginapi.Version,
        Endpoint:     path.Base(serverSock),
        ResourceName: resourceName,
    }
    
    _, err = client.Register(context.Background(), req)
    return err
}

func (p *FooDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
    if err := s.Send(&pluginapi.ListAndWatchResponse{Devices: p.devices}); err != nil {
        return err
    }
    
    for {
        select {
        case <-p.stop:
            return nil
        case d := <-p.health:
            for i, dev := range p.devices {
                if dev.ID == d.ID {
                    p.devices[i].Health = d.Health
                }
            }
            s.Send(&pluginapi.ListAndWatchResponse{Devices: p.devices})
        }
    }
}

func (p *FooDevicePlugin) Allocate(ctx context.Context, req *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
    // 关键:加锁防止并发分配冲突
    p.mu.Lock()
    defer p.mu.Unlock()
    
    responses := pluginapi.AllocateResponse{}
    
    for _, containerReq := range req.ContainerRequests {
        for _, id := range containerReq.DevicesIDs {
            if existingPod, ok := p.allocated[id]; ok {
                log.Printf("WARNING: device %s already allocated to %s", id, existingPod)
            }
            p.allocated[id] = "current-pod"
        }
        
        response := pluginapi.ContainerAllocateResponse{
            Envs: map[string]string{
                "FOO_VISIBLE_DEVICES": strings.Join(containerReq.DevicesIDs, ","),
            },
        }
        responses.ContainerResponses = append(responses.ContainerResponses, &response)
    }
    
    return &responses, nil
}

func (p *FooDevicePlugin) GetDevicePluginOptions(ctx context.Context, e *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) {
    return &pluginapi.DevicePluginOptions{}, nil
}

func (p *FooDevicePlugin) PreStartContainer(ctx context.Context, req *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) {
    return &pluginapi.PreStartContainerResponse{}, nil
}

func (p *FooDevicePlugin) GetPreferredAllocation(ctx context.Context, req *pluginapi.PreferredAllocationRequest) (*pluginapi.PreferredAllocationResponse, error) {
    return &pluginapi.PreferredAllocationResponse{}, nil
}

24.2 部署 DaemonSet

apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: foo-device-plugin
  namespace: kube-system
spec:
  selector:
    matchLabels:
      name: foo-device-plugin
  template:
    metadata:
      labels:
        name: foo-device-plugin
    spec:
      tolerations:
        - operator: Exists
      priorityClassName: system-node-critical
      containers:
        - name: foo-device-plugin
          image: your-registry/foo-device-plugin:v0.1
          securityContext:
            privileged: true
          volumeMounts:
            - name: device-plugin
              mountPath: /var/lib/kubelet/device-plugins
      volumes:
        - name: device-plugin
          hostPath:
            path: /var/lib/kubelet/device-plugins

24.3 关键设计要点

  1. Allocate 必须并发安全 —— mu.Lock() 是底线
  2. 状态持久化 —— 生产级实现要把分配状态写到 checkpoint 文件
  3. 健康检查 —— 持续检测设备健康状态
  4. ListAndWatch 必须保持长连接 —— 连接断了 kubelet 会认为 Plugin 不可用
  5. 与调度器的协作边界 —— Plugin 只在 Allocate 时才知道是哪个 Pod,这是 device-plugin 框架的设计缺陷

第八部分:手搓调度器 Demo

25. 调度器的本质

抛开所有框架,K8s 调度器的工作可以简化成一个循环:

while true:
    pod = 找一个 pending 状态的 Pod
    node = 决定把它放到哪个节点
    binding = 写 Pod.spec.nodeName = node
    POST /api/v1/namespaces/{ns}/pods/{name}/binding

只要能创建 Binding 对象,你就是一个调度器。

26. 渐进式实现演进

阶段 代码量 解决了什么 还差什么
Demo 1 50 行 能 bind Pod 一切
Demo 2 200 行 Informer + Filter + Score 超分、并发问题
Demo 3 +50 行 Cache 预占防超分 设备 ID 分配
Demo 4 +200 行 GPU ID 调度器分配 Gang、抢占、HA

26.1 Demo 1:50 行的最小调度器

package main

import (
    "context"
    "fmt"
    "log"
    "math/rand"
    "time"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
)

const schedulerName = "demo-scheduler"

func main() {
    config, _ := rest.InClusterConfig()
    client, _ := kubernetes.NewForConfig(config)

    for {
        pods, _ := client.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{
            FieldSelector: "spec.nodeName=,status.phase=Pending",
        })

        for _, pod := range pods.Items {
            if pod.Spec.SchedulerName != schedulerName {
                continue
            }

            nodes, _ := client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
            if len(nodes.Items) == 0 {
                continue
            }

            target := nodes.Items[rand.Intn(len(nodes.Items))]

            binding := &corev1.Binding{
                ObjectMeta: metav1.ObjectMeta{Name: pod.Name, Namespace: pod.Namespace},
                Target: corev1.ObjectReference{
                    Kind: "Node",
                    Name: target.Name,
                },
            }

            err := client.CoreV1().Pods(pod.Namespace).Bind(context.TODO(), binding, metav1.CreateOptions{})
            if err != nil {
                log.Printf("Bind failed: %v", err)
                continue
            }
            fmt.Printf("Scheduled %s/%s to %s\n", pod.Namespace, pod.Name, target.Name)
        }

        time.Sleep(2 * time.Second)
    }
}

26.2 Demo 2:用 Informer 改造

生产级调度器都用 Informer 监听资源变化,核心改进:

type Scheduler struct {
    client      kubernetes.Interface
    podLister   cache.Indexer
    nodeLister  cache.Indexer
    queue       workqueue.RateLimitingInterface
}

// Pod 事件触发入队
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc: func(obj interface{}) {
        pod := obj.(*corev1.Pod)
        if s.needSchedule(pod) {
            key, _ := cache.MetaNamespaceKeyFunc(obj)
            s.queue.Add(key)
        }
    },
})

func (s *Scheduler) scheduleOne(key string) error {
    pod := getPod(key)

    // Filter
    candidates := s.filter(pod)
    if len(candidates) == 0 {
        return fmt.Errorf("no fit node for %s", pod.Name)
    }

    // Score
    best := s.score(pod, candidates)

    // Bind
    return s.bind(pod, best)
}

// Filter: 过滤不满足资源要求的节点
func (s *Scheduler) fitsResources(pod *corev1.Pod, node *corev1.Node) bool {
    var cpuReq, memReq int64
    for _, c := range pod.Spec.Containers {
        cpuReq += c.Resources.Requests.Cpu().MilliValue()
        memReq += c.Resources.Requests.Memory().Value()
    }

    var cpuUsed, memUsed int64
    pods := s.podsOnNode(node.Name)
    for _, p := range pods {
        for _, c := range p.Spec.Containers {
            cpuUsed += c.Resources.Requests.Cpu().MilliValue()
            memUsed += c.Resources.Requests.Memory().Value()
        }
    }

    cpuAlloc := node.Status.Allocatable.Cpu().MilliValue()
    memAlloc := node.Status.Allocatable.Memory().Value()

    return cpuUsed+cpuReq <= cpuAlloc && memUsed+memReq <= memAlloc
}

这版的提升:Informer 事件驱动、WorkQueue 限速重试、真实 Filter/Score、解耦。

还存在的问题:Cache 与决策不一致,刚 bind 完的 Pod 可能还没出现在缓存里,下个 Pod 算资源时会"看不到"它,导致超分。

26.3 Demo 3:加入 Cache 预占解决超分

type Cache struct {
    mu          sync.RWMutex
    nodeAlloc   map[string]*ResourceUsage
    assumedPods map[string]string
}

// Assume: 决策完成立即更新本地视图
func (c *Cache) Assume(pod *corev1.Pod, nodeName string) {
    c.mu.Lock()
    defer c.mu.Unlock()

    key := podKey(pod)
    if _, exists := c.assumedPods[key]; exists {
        return
    }
    c.assumedPods[key] = nodeName

    if c.nodeAlloc[nodeName] == nil {
        c.nodeAlloc[nodeName] = &ResourceUsage{}
    }
    for _, container := range pod.Spec.Containers {
        c.nodeAlloc[nodeName].CPU += container.Resources.Requests.Cpu().MilliValue()
        c.nodeAlloc[nodeName].Memory += container.Resources.Requests.Memory().Value()
    }
}

// Forget: bind 失败时回滚
func (c *Cache) Forget(pod *corev1.Pod, nodeName string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    delete(c.assumedPods, podKey(pod))
    if usage := c.nodeAlloc[nodeName]; usage != nil {
        for _, container := range pod.Spec.Containers {
            usage.CPU -= container.Resources.Requests.Cpu().MilliValue()
            usage.Memory -= container.Resources.Requests.Memory().Value()
        }
    }
}

调度流程:

func (s *Scheduler) scheduleOne(key string) error {
    pod := getPod(key)

    candidates := s.filter(pod)
    best := s.score(pod, candidates)

    s.cache.Assume(pod, best.Name)           // 立即预占
    
    if err := s.bind(pod, best); err != nil {
        s.cache.Forget(pod, best.Name)       // 失败回滚
        return err
    }
    
    return nil
}

27. 支持 GPU ID 分配的调度器

回到核心目标 —— 让调度器直接决定 GPU ID,而不是只决定数量。

27.1 整体设计

┌─────────────────────────────────────────────────┐
│  Demo Scheduler                                  │
│                                                  │
│  Filter: 节点剩余 GPU 数 ≥ 请求数?                │
│  Score: 选剩 GPU 多的节点                         │
│  Allocate: ★ 从节点空闲 GPU ID 中选具体 ID       │
│  Bind: ★ 把选好的 ID 写到 Pod annotation         │
└────────────────────┬────────────────────────────┘
                     ↓
┌─────────────────────────────────────────────────┐
│  Modified Device Plugin                          │
│  Allocate(): 读 Pod annotation 中调度器指定的 ID │
│              不再自己选                          │
└─────────────────────────────────────────────────┘

27.2 节点 GPU 状态管理

type NodeGPUState struct {
    Total     []string             // 所有 GPU ID
    Allocated map[string]string    // GPU ID -> Pod Key
}

type GPUCache struct {
    mu    sync.RWMutex
    nodes map[string]*NodeGPUState
}

// 分配(核心方法,加锁防并发)
func (c *GPUCache) Allocate(nodeName string, podKey string, count int) ([]string, error) {
    c.mu.Lock()
    defer c.mu.Unlock()

    state := c.nodes[nodeName]
    if state == nil {
        return nil, fmt.Errorf("node %s has no GPU info", nodeName)
    }

    var free []string
    for _, id := range state.Total {
        if _, used := state.Allocated[id]; !used {
            free = append(free, id)
        }
    }

    if len(free) < count {
        return nil, fmt.Errorf("not enough GPU on %s: need %d, have %d", nodeName, count, len(free))
    }

    selected := free[:count]
    for _, id := range selected {
        state.Allocated[id] = podKey
    }
    return selected, nil
}

func (c *GPUCache) Release(nodeName string, podKey string) {
    c.mu.Lock()
    defer c.mu.Unlock()

    state := c.nodes[nodeName]
    if state == nil {
        return
    }
    for id, owner := range state.Allocated {
        if owner == podKey {
            delete(state.Allocated, id)
        }
    }
}

27.3 调度流程改造

func (s *Scheduler) scheduleOne(key string) error {
    pod := getPod(key)

    gpuRequest := parseGPURequest(pod)

    candidates := s.filter(pod, gpuRequest)
    if len(candidates) == 0 {
        return fmt.Errorf("no fit node")
    }

    best := s.score(pod, candidates)

    // ★ 关键:调度器直接选具体的 GPU ID
    var allocatedGPUs []string
    if gpuRequest > 0 {
        ids, err := s.gpuCache.Allocate(best.Name, podKey(pod), gpuRequest)
        if err != nil {
            return err
        }
        allocatedGPUs = ids
    }

    return s.bindWithGPU(pod, best, allocatedGPUs)
}

func (s *Scheduler) bindWithGPU(pod *corev1.Pod, node *corev1.Node, gpuIDs []string) error {
    // 1. 先 patch annotation(让 device-plugin 能读到)
    if len(gpuIDs) > 0 {
        patch := fmt.Sprintf(`{"metadata":{"annotations":{"demo-scheduler/allocated-gpus":"%s"}}}`,
            strings.Join(gpuIDs, ","))
        _, err := s.client.CoreV1().Pods(pod.Namespace).Patch(
            context.TODO(), pod.Name, types.StrategicMergePatchType,
            []byte(patch), metav1.PatchOptions{})
        if err != nil {
            s.gpuCache.Release(node.Name, podKey(pod))
            return err
        }
    }

    // 2. 再 bind
    binding := &corev1.Binding{
        ObjectMeta: metav1.ObjectMeta{Name: pod.Name, Namespace: pod.Namespace},
        Target: corev1.ObjectReference{Kind: "Node", Name: node.Name},
    }
    return s.client.CoreV1().Pods(pod.Namespace).Bind(context.TODO(), binding, metav1.CreateOptions{})
}

27.4 配套的 Device Plugin 改造

func (p *Plugin) Allocate(ctx context.Context, req *AllocateRequest) (*AllocateResponse, error) {
    responses := AllocateResponse{}

    for _, containerReq := range req.ContainerRequests {
        // 尝试从 Pod annotation 读调度器指定的 ID
        gpuIDs := p.lookupAllocatedGPUsFromPodAnnotation(containerReq)
        if len(gpuIDs) == 0 {
            gpuIDs = containerReq.DevicesIDs
        }

        response := &ContainerAllocateResponse{
            Envs: map[string]string{
                "NVIDIA_VISIBLE_DEVICES": strings.Join(gpuIDs, ","),
            },
        }
        responses.ContainerResponses = append(responses.ContainerResponses, response)
    }
    return &responses, nil
}

27.5 演进方向

方向 说明
Leader Election 多副本时用 K8s Lease 选主,防止并发决策
抢占 找不到节点时驱逐低优先级 Pod 腾资源
Gang Scheduling 支持 PodGroup,所有 Pod 同时就绪才 bind
队列与公平调度 按 Queue 做配额、按 DRF 排序 Job
拓扑感知 选 GPU 时优先同一 NVLink group / NUMA
性能优化 Filter 阶段并发跑,Score 也是
Scheduler Framework 集成 基于 k8s.io/kubernetes/pkg/scheduler/framework 写插件

核心认知:

  1. K8s 调度器本质就是一个控制循环 —— 找 pending Pod,选节点,写 binding
  2. 复杂的工程问题(性能、一致性、扩展性)才是大头
  3. 想解决 GPU 重复分配,根本路径是把决策上提到调度器层

第九部分:HAMi 异构算力虚拟化

28. HAMi 简介

HAMi(Heterogeneous AI Computing Virtualization Middleware,原名 k8s-vGPU-scheduler)是一个 CNCF Sandbox 项目,专注于在 Kubernetes 上做异构 AI 算力的虚拟化和精细调度

HAMi 解决的核心问题:

  1. GPU 整卡分配粒度太粗 —— 利用率普遍只有 20-40%
  2. 调度决策与设备分配脱节 —— 重复分配问题
  3. 异构设备管理混乱 —— 多家芯片厂商各一套
  4. 缺乏使用约束 —— 申请 1 卡的 Pod 实际能看到整张卡

29. HAMi 核心能力

29.1 GPU 细粒度切分

一张物理 GPU 可以被切成多份给多个 Pod 用,按显存算力两个维度独立切分:

apiVersion: v1
kind: Pod
metadata:
  name: gpu-pod
spec:
  containers:
  - name: trainer
    image: pytorch:latest
    resources:
      limits:
        nvidia.com/gpu: 1           # 用 1 张卡(可以是切片)
        nvidia.com/gpumem: 3000     # 限制 3000 MB 显存
        nvidia.com/gpucores: 30     # 限制 30% 算力
  • gpumem: 显存限制(MB)
  • gpucores: 算力百分比
  • 一张 24GB 的 A10 可以同时跑 8 个 gpumem: 3000 的 Pod

这不是逻辑切分,而是真正的硬隔离(软件层硬隔离,后面会讲实现机制)。

29.2 拓扑感知调度

节点 GPU 拓扑:
GPU-0 ─ NVLink ─ GPU-1     ┐
GPU-2 ─ NVLink ─ GPU-3     ├─ Group A (PCIe Switch 1)
                            ┘
GPU-4 ─ NVLink ─ GPU-5     ┐
GPU-6 ─ NVLink ─ GPU-7     ├─ Group B (PCIe Switch 2)
                            ┘

需要 4 张卡的训练任务,HAMi 会尝试给同一个 Group 内的 4 张(避免跨 PCIe 通信)。

29.3 多种异构设备统一管理

支持的设备:NVIDIA GPU、华为昇腾(910B/910/310)、寒武纪 MLU、海光 DCU、Intel GPU、天数智芯、沐曦、摩尔线程等。

所有设备走统一的资源接口和调度逻辑

29.4 节点选择策略

  • binpack:尽量装满一个节点再用下一个,适合提高单节点利用率
  • spread:均匀分布到多个节点,适合避免单点故障
  • 拓扑亲和:同一 Job 的 Pod 尽量调度到拓扑近的节点

30. HAMi 架构与实现

┌────────────────────────────────────────────────────┐
│                  用户提交 Pod                       │
│  resources.limits:                                 │
│    nvidia.com/gpu: 1                               │
│    nvidia.com/gpumem: 3000                         │
└──────────────────────┬─────────────────────────────┘
                       ↓
┌────────────────────────────────────────────────────┐
│  HAMi Scheduler Extender / Webhook                 │
│  ┌──────────────────────────────────────────────┐  │
│  │ 1. Webhook 拦截 Pod,识别 HAMi 资源请求       │  │
│  │ 2. 修改 schedulerName 走 HAMi 调度          │  │
│  │ 3. Extender 与 kube-scheduler 协作做决策    │  │
│  │ 4. ★ 选具体 GPU ID,写入 Pod annotation     │  │
│  └──────────────────────────────────────────────┘  │
└──────────────────────┬─────────────────────────────┘
                       ↓
┌────────────────────────────────────────────────────┐
│  HAMi Device Plugin (DaemonSet)                    │
│  ┌──────────────────────────────────────────────┐  │
│  │ 1. 上报"虚拟化"后的资源(一卡多份)             │  │
│  │ 2. Allocate 时读取 Pod annotation 中调度    │  │
│  │    器指定的 GPU ID + 显存/算力配额          │  │
│  │ 3. 注入 HAMi-Core 库到容器                  │  │
│  └──────────────────────────────────────────────┘  │
└──────────────────────┬─────────────────────────────┘
                       ↓
┌────────────────────────────────────────────────────┐
│  Container 运行时                                   │
│  ┌──────────────────────────────────────────────┐  │
│  │ HAMi-Core (libvgpu.so) 通过 LD_PRELOAD       │  │
│  │ 拦截 CUDA API:                               │  │
│  │ - cuMemAlloc → 检查显存配额                  │  │
│  │ - cuLaunchKernel → 限制算力时间片            │  │
│  └──────────────────────────────────────────────┘  │
└────────────────────────────────────────────────────┘

四个核心组件:

  1. HAMi Scheduler / Webhook —— 调度决策层
  2. HAMi Device Plugin —— 节点 agent
  3. HAMi-Core (libvgpu.so) —— 容器内的运行时拦截层(关键创新)
  4. HAMi-WebUI —— 可视化监控

30.1 HAMi 怎么解决 GPU ID 重复分配

传统流程(出问题的):
调度器选节点(只看数量)→ kubelet 选 ID → device-plugin 暴露设备
                          ↑
                          状态分散,并发出问题

HAMi 流程:
HAMi 调度器选节点 + 选具体 GPU ID(全局一致视图)
       ↓
   写入 Pod annotation: hami.io/vgpu-devices-allocated="GPU-uuid-0,GPU-uuid-2"
       ↓
   kubelet 调用 device-plugin
       ↓
   HAMi device-plugin 读 annotation,按指定 ID 暴露给容器
   ★ 不再做选择决策,只做执行

HAMi 调度器的 Filter 阶段:

func (s *Scheduler) Filter(pod *Pod, node *Node) bool {
    request := parseGPURequest(pod)
    nodeGPUs := s.deviceCache.GetNodeGPUs(node.Name)
    
    // ★ 用装箱算法找出能满足请求的具体 GPU 组合
    selected := s.selectGPUs(nodeGPUs, request)
    if selected == nil {
        return false
    }
    
    s.cache.Reserve(pod, node.Name, selected)
    return true
}

func (s *Scheduler) Bind(pod *Pod, nodeName string) error {
    selected := s.cache.GetReserved(pod)
    
    annotation := strings.Join(selected.IDs, ",")
    patchAnnotation(pod, "hami.io/vgpu-devices-allocated", annotation)
    
    patchAnnotation(pod, "hami.io/vgpu-memory", fmt.Sprintf("%d", request.Memory))
    patchAnnotation(pod, "hami.io/vgpu-cores", fmt.Sprintf("%d", request.Cores))
    
    return doBinding(pod, nodeName)
}

device-plugin 端:

func (p *Plugin) Allocate(ctx, req *AllocateRequest) (*AllocateResponse, error) {
    pod := p.findPendingPod()
    
    allocatedIDs := pod.Annotations["hami.io/vgpu-devices-allocated"]
    memLimit := pod.Annotations["hami.io/vgpu-memory"]
    coreLimit := pod.Annotations["hami.io/vgpu-cores"]
    
    response := &ContainerAllocateResponse{
        Envs: map[string]string{
            "NVIDIA_VISIBLE_DEVICES":   allocatedIDs,
            "CUDA_DEVICE_MEMORY_LIMIT": memLimit,
            "CUDA_DEVICE_SM_LIMIT":     coreLimit,
        },
        Mounts: []*Mount{
            // 关键:挂载 HAMi-Core 库
            {ContainerPath: "/usr/local/vgpu/libvgpu.so",
             HostPath: "/usr/local/vgpu/libvgpu.so"},
        },
    }
    return response, nil
}

核心改造:决策权完全收归调度器,device-plugin 只是执行者。

31. HAMi-Core 隔离机制

光有 ID 分配还不够,多个 Pod 共享一张物理卡时,怎么保证 A 不能用超 B 的显存、不能抢占 B 的算力?

HAMi 的杀手锏是 HAMi-Core —— 一个通过 LD_PRELOAD 注入容器、拦截 CUDA API 调用的动态库(libvgpu.so)。

应用程序 (PyTorch / TensorFlow)
       ↓
   调用 CUDA API: cuMemAlloc(size=10GB)
       ↓
┌──────────────────────────────────────────┐
│  HAMi-Core (libvgpu.so) 拦截器           │
│                                          │
│  if 已用显存 + size > 配额 (3GB):       │
│      返回 OOM 错误                       │
│  else:                                   │
│      调用真实 CUDA API                   │
│      记账已用显存                        │
└──────────────────────────────────────────┘
       ↓
   真实 CUDA Driver
       ↓
   GPU 硬件

显存隔离:拦截 cuMemAlloc 等内存分配 API,超过配额返回 OOM。

算力隔离:拦截 cuLaunchKernel,通过时间片机制控制 kernel 执行频率。如果配额是 30%,那 100ms 内只允许这个进程的 kernel 占用 30ms。

这是软件层的隔离,不是硬件级(NVIDIA 的 MIG 才是硬件级),但优势是:

  • 任何 GPU 都能用(不像 MIG 只支持 A100/H100)
  • 切分粒度任意(不像 MIG 只能切固定档位)
  • 不需要重启 GPU 配置

代价是有一定性能开销(通常 < 5%),且对应用 100% 透明。

32. 方案对比与选型

32.1 GPU 调度方案对比

方案 切分粒度 隔离强度 兼容性 调度智能
nvidia-device-plugin 整卡 全 NVIDIA 弱(数量)
NVIDIA MIG 固定档位 硬件级 A100/H100 only
NVIDIA Time-Slicing 整卡共享,时间轮转 几乎无 全 NVIDIA
NVIDIA MPS 算力共享 弱(无显存隔离) 全 NVIDIA
vCUDA(腾讯) 显存+算力 软件级 NVIDIA
HAMi 显存+算力任意切 软件级 NVIDIA + 多家国产

32.2 HAMi 的局限

  1. 软件隔离,不是绝对安全 —— LD_PRELOAD 可以被绕过,适合协作式共享,不适合对抗式多租户
  2. 性能开销 —— API 拦截有微小开销
  3. 兼容性问题 —— 某些用 CUDA Driver API 而非 Runtime API 的程序可能行为异常
  4. 调度复杂度上升 —— 切分后调度决策维度变多

32.3 适用场景

适合:

  • AI 推理平台(多模型共享 GPU)
  • 算法团队开发环境
  • 中小模型训练
  • 异构集群(多种 AI 芯片混用)
  • GPU 利用率优化是 KPI

不适合:

  • 大模型训练(本来就要满卡甚至多卡)
  • 强隔离要求的多租户公有云
  • 对延迟极致敏感的低延迟推理

32.4 Volcano + HAMi 协同

Volcano 和 HAMi 不是替代关系而是互补:

  • Volcano:解决批处理调度(Job、Gang、Queue)
  • HAMi:解决 GPU 资源虚拟化和精细分配

生产环境完全可以 Volcano + HAMi 一起用,前者管"任务怎么调度",后者管"GPU 怎么分"。


附录:速查表

A. Volcano Action 与 Plugin 对照

Action 作用 常用 Plugin
enqueue 决定 Job 是否能进入待调度队列 proportion, gang
allocate 资源分配主流程 priority, gang, drf, predicates, nodeorder, binpack
preempt 同队列内抢占 priority, conformance
reclaim 跨队列资源回收 proportion
backfill 小任务回填空隙 (无特殊)

B. Volcano Plugin 注册的回调函数

回调 作用
JobOrderFn 决定 Job 的调度顺序(全局)
TaskOrderFn 决定同 Job 内 Task 的顺序
QueueOrderFn 决定 Queue 的调度顺序
PredicateFn 节点过滤(Filter)
NodeOrderFn 节点打分(Score)
BatchNodeOrderFn 批量节点打分(性能优化)
JobReadyFn Job 是否成组就绪(Gang)
JobEnqueueable Job 能否入队
PreemptableFn 找抢占的 victim
ReclaimableFn 找回收的 victim

C. 排查 GPU 重复分配问题清单

# 1. 检查节点 Pod 总 GPU 请求
NODE=<问题节点>
kubectl get pods -A --field-selector spec.nodeName=$NODE -o json \
  | jq '[.items[].spec.containers[].resources.requests."nvidia.com/gpu" // "0" | tonumber] | add'

# 2. 对比节点容量
kubectl get node $NODE -o jsonpath='{.status.allocatable.nvidia\.com/gpu}'

# 3. 查看 device-plugin 日志
kubectl logs -n kube-system <nvidia-device-plugin-pod>
kubectl logs -n kube-system <nvidia-device-plugin-pod> --previous  # 重启历史

# 4. 查看 checkpoint
cat /var/lib/kubelet/device-plugins/kubelet_internal_checkpoint | jq

# 5. 查看每个容器实际拿到的 GPU
for pod in $(kubectl get pods -A --field-selector spec.nodeName=$NODE -o name); do
  echo "=== $pod ==="
  kubectl exec $pod -- printenv NVIDIA_VISIBLE_DEVICES 2>/dev/null
done

# 6. 查看 GPU 上的进程
nvidia-smi --query-compute-apps=pid,process_name,used_memory,gpu_uuid --format=csv

# 7. 查看 kubelet 设备管理事件
journalctl -u kubelet | grep "device plugin"

D. 关键设计哲学

  1. Session 内绝对一致,Session 间允许偏差,偏差通过重试收敛 —— Volcano 的并发模型
  2. 预占但延迟提交 —— Gang Scheduling 的核心机制
  3. 决策上提到调度器层 —— 解决 GPU 重复分配的根本路径
  4. Action 是骨架,Plugin 是策略 —— Volcano 的扩展性来源
  5. GPU ID 实际是 kubelet 分配的 —— 重复分配问题的根本认知

E. 生态项目关系图

┌────────────────────────────────────────────────────┐
│              Kubernetes 调度生态                    │
└────────────────────────────────────────────────────┘
                       │
        ┌──────────────┼──────────────┐
        ↓              ↓              ↓
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ kube-scheduler│ │   Volcano    │ │   Kueue      │
│ (默认)        │ │ (批处理增强) │ │ (Job 排队)   │
└──────────────┘ └──────┬───────┘ └──────────────┘
                        │
                        │ 协同使用
                        ↓
                 ┌──────────────┐
                 │    HAMi      │
                 │ (GPU 虚拟化) │
                 └──────┬───────┘
                        │
                        ↓
                 ┌──────────────┐
                 │ Device Plugin │
                 │   框架       │
                 └──────┬───────┘
                        │
                        ↓
                 ┌──────────────┐
                 │  GPU 驱动    │
                 └──────────────┘

参考资源

  • Volcano 官方文档: https://volcano.sh
  • Volcano GitHub: https://github.com/volcano-sh/volcano
  • HAMi GitHub: https://github.com/Project-HAMi/HAMi
  • Kubernetes Device Plugin: https://kubernetes.io/docs/concepts/extend-kubernetes/compute-storage-net/device-plugins/
  • Scheduling Framework: https://kubernetes.io/docs/concepts/scheduling-eviction/scheduling-framework/
  • DRF 论文: https://people.eecs.berkeley.edu/~alig/papers/drf.pdf

学习建议:先理解第一、二部分,建立全局认知第三部分逐节深入实现细节第四、五部分对比理解,加深对调度模型的认知第六、七部分结合实际问题排查,实战导向第八部分动手敲代码,体验调度器实现第九部分作为生态扩展,了解前沿方案

Read more

容器镜像(4):镜像的常用工具箱

容器镜像(4):镜像的常用工具箱

前几篇在讲多架构镜像时已经用过 skopeo 和 crane 做镜像复制,这篇系统整理这两个工具的完整能力,同时介绍几个日常操作镜像时同样好用的工具。 一、skopeo:不依赖 Daemon 的镜像瑞士军刀 skopeo 的核心价值是绕过 Docker daemon,直接与 Registry API 交互。上一篇用它做镜像复制和离线传输,但它的能力远不止于此。 1.1 安装 # Ubuntu / Debian sudo apt install -y skopeo skopeo --version # skopeo version 1.15.1 1.2 inspect:免拉取检查镜像元数据 docker inspect 需要先把镜像拉到本地,skopeo inspect 直接向 Registry

容器镜像(3):多架构镜像构建

容器镜像(3):多架构镜像构建

一、什么是多架构镜像 1.1 OCI Image Index 上一篇介绍了单平台镜像的结构:一个 Manifest 指向 Config 和若干 Layer blob。多架构镜像在此之上多了一层——OCI Image Index(也叫 Manifest List),是一个轻量的索引文件,把多个单平台 Manifest 组织在一起: $ docker manifest inspect golang:1.22-alpine { "schemaVersion": 2, "mediaType": "application/vnd.oci.image.index.v1+json", "manifests&

容器镜像(2):containerd 视角下的镜像

容器镜像(2):containerd 视角下的镜像

一、为什么需要了解 containerd 如果你只用 docker run 跑容器,从来不关心底层,那可以不了解 containerd。但如果你在用 Kubernetes,或者想真正理解"容器运行时"是什么,containerd 是绕不开的。 事实上,当你执行 docker run 的时候,containerd 早就在后台悄悄工作了——Docker 从 1.11 版本开始,就把核心运行时剥离出来交给 containerd 负责。 1.1 Docker 的架构演变 早期的 Docker(1.10 及之前)是一个"大一统"的单体程序:一个 dockerd