Volcano 与 Kubernetes GPU 调度学习笔记
本笔记系统整理 Volcano 调度器、Kubernetes 调度框架、GPU Device Plugin、HAMi 等云原生 AI 调度领域的核心知识,适合用于学习、复习和工程实践参考。
目录
- 第一部分:Volcano 入门
- 第二部分:Volcano 整体架构
- 第三部分:Volcano 核心实现原理
- 第四部分:并发问题深度分析
- 第五部分:Kubernetes 调度框架对比
- 第六部分:GPU 设备分配与重复分配问题
- 第七部分:Device Plugin 原理与实现
- 第八部分:手搓调度器 Demo
- 第九部分:HAMi 异构算力虚拟化
- 附录:速查表
第一部分:Volcano 入门
1. Volcano 是什么
Volcano 是 CNCF 旗下的云原生批处理调度系统,最初由华为开源,主要用于解决 Kubernetes 默认调度器在 AI 训练、大数据、HPC 等高性能计算场景下的不足。
定位:Kubernetes 之上的批处理调度增强层,与 kube-scheduler 互补而非替代。
2. 安装与快速使用
2.1 安装
最简单方式,使用官方 YAML 一键部署:
kubectl apply -f https://raw.githubusercontent.com/volcano-sh/volcano/master/installer/volcano-development.yaml
也支持 Helm 安装。安装完成后会在 volcano-system 命名空间下看到三个核心组件:
volcano-scheduler—— 调度器volcano-controllers—— Controller Managervolcano-admission—— 准入控制
验证安装:
kubectl get pods -n volcano-system
2.2 提交一个 Volcano Job
Volcano 的核心使用方式是通过它的 Job CRD 提交批处理任务。下面是一个典型的分布式训练任务:
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
name: tensorflow-dist
spec:
minAvailable: 3 # 至少 3 个 Pod 就绪才启动(gang scheduling)
schedulerName: volcano
queue: default # 提交到哪个队列
policies:
- event: PodEvicted
action: RestartJob # Pod 被驱逐时整个 Job 重启
tasks:
- replicas: 1
name: ps
template:
spec:
containers:
- name: tensorflow
image: tensorflow/tensorflow:latest
resources:
requests:
cpu: "2"
memory: "4Gi"
- replicas: 2
name: worker
template:
spec:
containers:
- name: tensorflow
image: tensorflow/tensorflow:latest
resources:
requests:
nvidia.com/gpu: 1
关键字段说明:
minAvailable: 控制成组调度的下限tasks: 定义不同角色的 Podpolicies: 定义生命周期事件的处理策略
2.3 创建队列管理资源
队列用于多租户隔离和资源配额:
apiVersion: scheduling.volcano.sh/v1beta1
kind: Queue
metadata:
name: team-a
spec:
weight: 4 # 资源分配权重
capability: # 队列资源上限
cpu: "100"
memory: "200Gi"
nvidia.com/gpu: "8"
reclaimable: true # 是否允许其他队列回收闲置资源
提交 Job 时通过 spec.queue: team-a 指定即可。
3. 核心特性一览
| 特性 | 用途 |
|---|---|
| Gang Scheduling | 一组 Pod 要么全调度,要么全不调度,避免分布式任务死锁 |
| Queue 队列 | 多租户资源隔离、配额管理、权重分配 |
| DRF 公平调度 | 多维资源(CPU/GPU/内存)下的公平性保证 |
| 优先级与抢占 | 高优先级任务可抢占低优先级任务的资源 |
| 资源回收 | 队列超用资源时可被其他队列回收 |
| 拓扑感知调度 | NUMA、GPU 拓扑感知,优化训练性能 |
| 多框架支持 | TensorFlow、PyTorch、MPI、Spark、Flink 等 |
| Job 生命周期 | 失败重启、依赖控制、插件化扩展(如 SSH、SVC) |
第二部分:Volcano 整体架构
4. Volcano 解决的核心问题
Kubernetes 默认调度器是为在线服务设计的:Pod 粒度、逐个调度、无状态。但批处理场景有几个根本不同:
痛点 1:任务整体性
分布式训练的 100 个 Pod 必须同时启动,少一个就跑不起来。默认调度器逐个调度,可能调度了 99 个就因为资源不足卡死,而这 99 个还占着资源。
痛点 2:多租户公平性
共享 GPU 集群中,A 团队不能把资源全占了让 B 团队饿死,需要按团队/队列做配额和公平分配。
痛点 3:任务间关系
批任务有优先级、有抢占、有依赖,需要更复杂的策略。
Volcano 就是在 Kubernetes 之上补齐这些能力的批处理调度器。
5. 整体架构与数据流
┌─────────────────────────────────────────────────┐
│ 用户提交 Job │
└────────────────────┬────────────────────────────┘
↓
┌─────────────────────────────────────────────────┐
│ Admission Webhook(校验、默认值注入) │
└────────────────────┬────────────────────────────┘
↓
┌─────────────────────────────────────────────────┐
│ Controller Manager │
│ Job → 拆分成 Pods + 创建 PodGroup │
└────────────────────┬────────────────────────────┘
↓
┌─────────────────────────────────────────────────┐
│ Scheduler(核心) │
│ Session → Actions → Plugins → 调度决策 │
└────────────────────┬────────────────────────────┘
↓
kubelet 拉起 Pod
核心数据流:Job → PodGroup → Pods
Job是用户视角的任务- Controller 把 Job 转换成原生 Pods,并创建一个
PodGroup资源把这组 Pod 关联起来 - Scheduler 调度时以
PodGroup为单位做决策
6. 三层抽象模型
理解 Volcano 调度器,记住这三层就够了:
| 抽象 | 角色 | 类比 |
|---|---|---|
| Action | 调度的"动作流程",决定先做什么后做什么 | 工作流的步骤 |
| Plugin | 调度的"决策依据",提供具体的策略函数 | 工作流中的判断规则 |
| Session | 一次调度周期的"上下文",存储集群快照,串起 Action 和 Plugin | 工作流的执行实例 |
核心理解:
- Action 是 做什么(enqueue / allocate / preempt / reclaim / backfill)
- Plugin 是 怎么判断(gang / priority / drf / binpack 等)
- Session 是 在什么环境下(集群快照)
第三部分:Volcano 核心实现原理
7. Session 机制
Session 是调度的基本单元。每隔约 1 秒(可配置),Volcano scheduler 开启一个 Session:
1. Open Session
├─ 从 Cache 拉取集群快照(Nodes、Pods、PodGroups、Queues)
├─ 加载启用的 Plugins,注册回调函数
└─ 构造 Session 上下文
2. Execute Actions(按配置顺序)
enqueue → allocate → preempt → reclaim → backfill
每个 Action 内部调用 Plugin 的回调来做决策
3. Close Session
└─ 提交 Bind 结果,清理状态
为什么要 Session 化?
如果直接读 etcd 实时状态,决策依据会不断变化,难以保证一致性。Session 用快照保证一次调度周期内看到的世界是稳定的。决策完成后才统一提交,避免中间态污染。
Snapshot 实现:
func (sc *SchedulerCache) Snapshot() *api.ClusterInfo {
snapshot := &api.ClusterInfo{
Nodes: make(map[string]*api.NodeInfo),
Jobs: make(map[api.JobID]*api.JobInfo),
Queues: make(map[api.QueueID]*api.QueueInfo),
}
sc.Mutex.Lock()
defer sc.Mutex.Unlock()
// 深拷贝当前 Cache 状态到 snapshot
for _, value := range sc.Nodes {
snapshot.Nodes[value.Name] = value.Clone()
}
return snapshot
}
OpenSession 时调用 Snapshot() 做深拷贝,整个调度过程操作的是这份快照副本,不会受外部 Cache 变化影响。
8. Gang Scheduling 实现
这是 Volcano 最有代表性的能力。
问题:分布式训练 4 个 worker,默认调度器先调度了 3 个、第 4 个资源不够卡住,前 3 个空跑占资源。
实现思路:
- Controller 把 Job 拆成 Pods 时,同时创建一个 PodGroup,并把每个 Pod 标记
annotations[scheduling.k8s.io/group-name]指向这个 PodGroup - 调度器在 allocate 阶段做"预占"而不是直接 bind:先把节点上的资源标记为某个 Pod 占用,但不真正下发到 kubelet
- 当 PodGroup 中预占成功的 Pod 数 ≥
minAvailable时,gang plugin 的JobReadyFn返回 true,统一执行 bind - 如果 Session 结束时仍未达到 minAvailable,所有预占回滚,资源释放给其他任务
关键代码逻辑:
// gang plugin 注册的 JobReady 回调
ssn.AddJobReadyFn(func(obj interface{}) bool {
job := obj.(*api.JobInfo)
occupied := job.ReadyTaskNum() // 已预占成功的 Pod 数
return occupied >= job.MinAvailable
})
// allocate action 中的判断
if !ssn.JobReady(job) {
// 还没成组,继续预占但不 bind
continue
}
// 成组了,统一 bind
ssn.Allocate(...)
9. Queue 与 DRF 公平调度
9.1 DRF 算法
DRF(Dominant Resource Fairness) 是多维资源公平算法的经典实现。
在只有 CPU 一种资源时,公平很简单 —— 按使用量排序。但有 CPU、GPU、内存多种资源时怎么定义"用得多"?
DRF 的答案:看每个用户最稀缺的那种资源占比(叫"主导份额")。
示例:集群有 100 CPU、10 GPU
- A 用了 20 CPU + 1 GPU → CPU 占 20%,GPU 占 10%,主导份额是 20%
- B 用了 10 CPU + 4 GPU → CPU 占 10%,GPU 占 40%,主导份额是 40%
下次调度时优先给主导份额低的(A),这样多种资源下也能保持公平。
Volcano 的 drf plugin 实现:
// drf plugin 注册的 JobOrder 回调(决定 Job 调度顺序)
ssn.AddJobOrderFn(func(l, r interface{}) int {
lv := l.(*api.JobInfo)
rv := r.(*api.JobInfo)
// share = 主导份额 = max(used[i] / total[i])
if lv.Share < rv.Share {
return -1 // share 小的优先
}
return 1
})
9.2 Queue 队列
Queue 在 DRF 之上加了一层:每个 Queue 有 weight,资源按 weight 比例分配,配合 capability 设置上限。这样既有公平性又有租户边界。
10. 抢占与回收
| 概念 | 范围 | 触发条件 |
|---|---|---|
| 抢占(Preempt) | 同队列内 | 高优先级任务无法调度时,挑选低优先级 victim 驱逐 |
| 回收(Reclaim) | 跨队列 | Queue A 资源使用低于 deserved、Queue B 高于 deserved 时,A 提交新任务可回收 B 超用部分 |
两者本质都是"释放资源给更应该获得的任务",区别在于范围。实现上都是 Action 阶段调用 plugin 的 Preemptable / Reclaimable 回调判断哪些 Pod 可被驱逐:
ssn.AddPreemptableFn("priority", func(preemptor, preemptees) []victim {
// 只允许优先级低于 preemptor 的 Pod 成为 victim
var victims []*api.TaskInfo
for _, p := range preemptees {
if p.Priority < preemptor.Priority {
victims = append(victims, p)
}
}
return victims
})
11. Action 与 Plugin 协作流程
一次完整的调度流程:
Session 开启,加载 plugins: [priority, gang, drf, predicates, nodeorder]
│
├─ Action: enqueue
│ └─ 调用各 plugin 的 JobEnqueueable,决定哪些 Job 进入待调度队列
│
├─ Action: allocate
│ ├─ 调用 JobOrderFn (drf):按主导份额排序 Job
│ ├─ 调用 TaskOrderFn (priority):按优先级排序 Pod
│ ├─ 调用 PredicateFn (predicates):过滤不满足约束的节点
│ ├─ 调用 NodeOrderFn (nodeorder, binpack):给可用节点打分排序
│ ├─ 选最优节点做预占
│ └─ 调用 JobReadyFn (gang):成组检查,决定是否 bind
│
├─ Action: preempt
│ └─ 调用 PreemptableFn 找 victim,驱逐低优 Pod
│
├─ Action: reclaim
│ └─ 调用 ReclaimableFn,跨队列回收超用资源
│
└─ Session 关闭,提交决策
插件化的好处:Action 是固定的流程骨架,Plugin 可按需启用。配置在 volcano-scheduler-configmap 里:
actions: "enqueue, allocate, preempt, reclaim, backfill"
tiers:
- plugins:
- name: priority
- name: gang
- name: conformance
- plugins:
- name: drf
- name: predicates
- name: nodeorder
- name: binpack
12. 拓扑感知调度
针对 AI 训练,跨 NUMA 或跨 PCIe 域的通信会显著降低性能。Volcano 通过 NUMA-aware plugin 和 GPU 拓扑感知做优化:
调度时不仅看节点是否有 N 张 GPU,还看这 N 张 GPU 是否在同一个 NVLink group / 同一个 PCIe switch 下,优先选择拓扑紧凑的节点。这通过 vc-agent 在节点上采集拓扑信息、上报给调度器实现。
第四部分:并发问题深度分析
13. Volcano 调度的并发模型
核心结论:单个 scheduler 实例内不存在并发问题,但多实例场景和与外部系统交互时存在挑战。
Volcano scheduler 的核心调度循环本质上是单线程串行的:
for {
ssn := OpenSession(cache) // 拿快照
for _, action := range actions {
action.Execute(ssn) // 串行执行
}
CloseSession(ssn) // 提交结果
time.Sleep(period) // 默认 1 秒
}
在一个 Session 内,多个 PodGroup 的调度决策是顺序处理的,不是并发处理。具体来说:
allocateaction 内部会遍历所有待调度的 Job/PodGroup,按JobOrderFn(比如 DRF)排序后逐个尝试分配- 每分配一个 Pod,Session 内的资源视图会立即更新,下一个 PodGroup 看到的就是更新后的状态
- 整个过程在单个 goroutine 中完成
14. 五种并发场景分析
| 场景 | 是否有并发问题 | 解法 |
|---|---|---|
| 单 Session 内多 PodGroup | 无 | 串行处理 + 快照 |
| Cache 写 vs Session 读 | 有但可控 | 深拷贝快照 + Mutex |
| Session 决策 vs K8s 实际 | 有时间差 | Assume 机制 + 失败重试 |
| 多 Scheduler 副本 | 有 | Leader Election |
| 与 kube-scheduler 共存 | 有 | API Server 乐观锁兜底 |
14.1 Cache 与 Session 的并发:读写分离
┌──────────────────────────────────────┐
│ Informer (watch K8s API) │
│ ├─ Pod Add/Update/Delete │ 写
│ ├─ Node Add/Update/Delete │ ─────► Cache (加锁)
│ └─ PodGroup Add/Update/Delete │
└──────────────────────────────────────┘
▲
│ Snapshot (读)
│
┌──────────────────────────────────────┐ │
│ Scheduler 主循环 │ ─────┘
│ Snapshot → Session → Decisions │
└──────────────────────────────────────┘
Informer 协程持续把 K8s 事件写入 Cache,Scheduler 主循环周期性从 Cache 读快照。两者通过 Cache 内部的 sync.Mutex 互斥。
14.2 Session 与 K8s 实际状态的不一致风险
Session 内的决策是基于快照的,但实际 bind 操作要写入 K8s API,两者之间存在时间差:
T0: OpenSession,快照显示 Node-A 有 8 GPU 空闲
T1: Session 内决定把 PodGroup-X 的 4 个 Pod 调度到 Node-A
T2: 此时另一个进程(比如手动 kubectl run)也往 Node-A 塞了 Pod
T3: Session 结束,bind 请求发到 kubelet
T4: kubelet 发现资源不够,调度失败
Volcano 的应对:
- 乐观假设 + 失败重试:bind 失败后,Pod 会回到待调度状态,下个 Session 重新决策
- Assume 机制:Session 内做出决策后,立即在 Cache 中"假设"这个 Pod 已经分配
- Gang 的预占回滚:Session 内未达到 minAvailable 的预占会在 Session 结束时回滚
func (sc *SchedulerCache) AddBindTask(taskInfo *api.TaskInfo) error {
sc.Mutex.Lock()
defer sc.Mutex.Unlock()
// 在 Cache 中预占资源
node := sc.Nodes[taskInfo.NodeName]
node.AddTask(taskInfo)
// 加入 bind 队列异步执行
sc.BindFlowChannel <- taskInfo
return nil
}
14.3 多 Scheduler 实例
Volcano 通过 Leader Election(K8s Lease 资源选主)保证只有 Leader 实例执行调度循环,其他副本待机。
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: lock,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
scheduler.Run(ctx.Done()) // 只有 Leader 才跑
},
},
})
代价:多副本不能水平扩展调度吞吐,这也是大规模集群下 Volcano 性能的一个限制点。
核心设计哲学:"Session 内绝对一致,Session 间允许偏差,偏差通过重试收敛" —— 典型的最终一致性思路。
第五部分:Kubernetes 调度框架对比
15. kube-scheduler 四阶段
K8s 1.19+ 默认调度器的标准模型:Filter、Score、Reserve、Bind(更细粒度还有 PreFilter、PostFilter、Permit、PreBind 等扩展点)。
Pod 进入调度队列
↓
┌─────────────────┐
│ Filter │ 过滤:哪些节点满足 Pod 要求?
│ (Predicate) │ 例如资源够不够、亲和性、污点容忍
└────────┬────────┘
↓
┌─────────────────┐
│ Score │ 打分:在可用节点中,哪个最合适?
│ (Priority) │ 例如负载均衡、镜像本地性、拓扑分布
└────────┬────────┘
↓
┌─────────────────┐
│ Reserve │ 预占:在 Cache 中标记节点资源已被占用
│ │ 避免下个 Pod 重复使用同一份资源
└────────┬────────┘
↓
┌─────────────────┐
│ Bind │ 绑定:写 Pod.spec.nodeName 到 etcd
│ │ 通知 kubelet 启动
└─────────────────┘
这是单 Pod 视角的流程:每次处理一个 Pod,走完四阶段就提交。
16. Volcano 与四阶段的对应关系
| kube-scheduler 阶段 | Volcano 对应实现 |
|---|---|
| Filter | PredicateFn (在 allocate Action 内调用) |
| Score | NodeOrderFn / BatchNodeOrderFn |
| Reserve | AddBindTask (Cache 预占) |
| Bind | Bind Action / 异步 bind 队列 |
| (无对应) | JobReadyFn (gang 检查,Volcano 独有) |
Volcano 一次 allocate 的内部流程:
for 每个待调度的 Job (按 JobOrderFn 排序,例如 DRF):
for 每个 Task (Pod,按 TaskOrderFn 排序):
┌─────────────────────────────────────┐
│ Step 1: PredicateFn ← 对应 Filter │
│ 遍历所有节点,过滤不满足约束的 │
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ Step 2: NodeOrderFn ← 对应 Score │
│ 给候选节点打分,选最高分的 │
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ Step 3: Allocate ← 对应 Reserve │
│ 在 Cache 中预占资源(不提交 K8s) │
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ Step 4: JobReadyFn ← Volcano 独有 │
│ 检查 Job 是否成组就绪 (Gang) │
│ 未就绪:继续预占下一个 Task │
│ 已就绪:进入 Bind │
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ Step 5: Bind ← 对应 Bind │
│ 异步写 Pod.spec.nodeName 到 etcd │
└─────────────────────────────────────┘
17. 结构性差异
17.1 调度循环组织方式
kube-scheduler:Pod-by-Pod 循环
while true:
pod = queue.Pop()
Filter(pod) → Score(pod) → Reserve(pod) → Bind(pod)
// 一个 Pod 走完所有阶段,再处理下一个
每个 Pod 是独立调度单元,阶段是 Pod 内部串行的。
Volcano:Session + Action 循环
while true:
session = OpenSession(snapshot)
enqueue_action(session) // 处理所有 Job 的入队
allocate_action(session) // 处理所有 Pod 的 Filter+Score+Reserve
preempt_action(session) // 处理所有抢占
reclaim_action(session) // 处理所有跨队列回收
backfill_action(session) // 处理所有回填
CloseSession(session) // 统一提交 bind
Action 是批量处理一类操作,阶段是横向跨 Pod 的,不是纵向单 Pod 的。
17.2 这种组织方式带来两个能力
- 全局视角决策 —— 比如 DRF 公平算法,需要看到所有 Job 的资源占用才能排序
- Job 级原子操作 —— Gang scheduling 需要"要么全调度要么不调度",这只能在 Job 维度做
17.3 完整对照速查表
| 维度 | kube-scheduler | Volcano |
|---|---|---|
| 调度单元 | Pod | Job/PodGroup |
| 调度循环 | Pod-by-Pod 顺序 | Session 内批量 Action |
| Filter | Filter 插件接口 | PredicateFn 回调 |
| Score | Score 插件接口 | NodeOrderFn / BatchNodeOrderFn |
| Reserve | 立即在 Cache 预占 | Allocate 预占(Job 级别可延迟提交) |
| Bind | 单 Pod 异步 bind | 批量异步 bind |
| 全局排序 | 不支持(Pod 间独立) | JobOrderFn、QueueOrderFn |
| 成组就绪 | 不支持 | JobReadyFn (Gang) |
| 抢占 | PostFilter 阶段 | 独立的 preempt action |
| 跨队列回收 | 不支持 | 独立的 reclaim action |
第六部分:GPU 设备分配与重复分配问题
18. GPU ID 到底是谁分配的
核心认知:GPU ID 实际是 kubelet 分配的,不是调度器,也不是 device-plugin 主动选的。
完整链路:
调度器选节点(只看数量)→ kubelet 选具体设备 ID → device-plugin 按 ID 构造容器视图
18.1 调度器看到的 GPU
调度器对 nvidia.com/gpu 的处理跟对 cpu、memory 完全一样 —— 就是个数字。
# Node Status 中的 GPU 信息
status:
allocatable:
nvidia.com/gpu: "8" # 总数 8
capacity:
nvidia.com/gpu: "8"
调度器的工作只有:
- 看节点 allocatable 还剩多少 GPU
- 累加节点上已有 Pod 的
requests.nvidia.com/gpu - 判断"剩余 ≥ Pod 申请数"是否成立
- 选定节点后,写
pod.spec.nodeName
调度器完全不参与"选哪几张具体的卡"。
18.2 完整的分配链路
Pod 被调度到 Node
↓
kubelet 收到 Pod 创建事件
↓
┌─────────────────────────────────────────────────┐
│ kubelet 内部组件: DevicePluginManager │
│ │
│ 1. 解析 Pod.spec.containers[].resources.limits │
│ 发现需要 nvidia.com/gpu: 2 │
│ │
│ 2. 查询自己维护的设备池: │
│ allDevices["nvidia.com/gpu"] = { │
│ "GPU-uuid-0": Healthy, // 空闲 │
│ "GPU-uuid-1": Healthy, // 已分配给Pod-X │
│ "GPU-uuid-2": Healthy, // 空闲 │
│ ... │
│ } │
│ allocatedDevices["GPU-uuid-1"] = "Pod-X" │
│ │
│ 3. ★ 关键步骤:从空闲设备中挑选 N 个 │
│ devicesToAllocate = ["GPU-uuid-0", "GPU-uuid-2"] │
│ │
│ 4. 标记这些设备为已分配 │
│ podDevices.insert(podUID, devicesToAllocate)│
└────────────────────┬────────────────────────────┘
↓ gRPC 调用 Allocate
┌─────────────────────────────────────────────────┐
│ nvidia-device-plugin │
│ │
│ 收到: AllocateRequest{ │
│ DevicesIDs: ["GPU-uuid-0", "GPU-uuid-2"] │
│ } │
│ ↑ 注意:ID 是 kubelet 选好传过来的,不是 plugin 选的 │
│ │
│ Plugin 的工作只是构造 AllocateResponse: │
│ - 设置环境变量 NVIDIA_VISIBLE_DEVICES │
│ - 挂载 /dev/nvidia0, /dev/nvidia2 │
│ - 配置驱动库 │
└────────────────────┬────────────────────────────┘
↓ 返回 Response
kubelet 用响应启动容器
关键认知翻转:很多人以为是 device-plugin 选的卡,实际上 plugin 是被动的 —— kubelet 把"要哪几张"算好了通过 DevicesIDs 字段告诉 plugin,plugin 只负责告诉 kubelet"这几张卡怎么暴露给容器"。
19. kubelet 的设备分配实现
具体看 kubelet 源码(pkg/kubelet/cm/devicemanager/manager.go)中的核心逻辑:
// 简化后的 devicesToAllocate 逻辑
func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, required int) (sets.String, error) {
m.mutex.Lock()
defer m.mutex.Unlock()
// 1. 看这个 Pod 之前已经分配过的(重启场景复用)
devices := m.podDevices.containerDevices(podUID, contName, resource)
if devices.Len() == required {
return nil, nil // 已分配
}
// 2. 计算还需要多少
needed := required - devices.Len()
// 3. 从健康设备池中减去已分配的,得到可用设备
available := m.healthyDevices[resource].Difference(m.allocatedDevices[resource])
if available.Len() < needed {
return nil, fmt.Errorf("not enough devices")
}
// 4. ★ 从可用设备中选 needed 个
allocated := available.UnsortedList()[:needed]
// 5. ★ 立即标记为已分配
for _, device := range allocated {
m.allocatedDevices[resource].Insert(device)
m.podDevices.insert(podUID, contName, resource, device, ...)
}
return sets.NewString(allocated...), nil
}
理论上无懈可击的设计:单 kubelet 进程,加锁,状态一致。
20. 重复分配问题的真正发生点
既然 kubelet 内部有锁,问题只可能出在状态丢失或不一致上。
20.1 场景一:kubelet 重启导致状态不一致
kubelet 通过 checkpoint 文件持久化分配状态:/var/lib/kubelet/device-plugins/kubelet_internal_checkpoint
T0: kubelet 运行中,podDevices: { Pod-A: [GPU-0, GPU-1] }
T1: kubelet 进程崩溃
T2: kubelet 重启,从 checkpoint 恢复
T3: ★ 关键空窗期:device-plugin 还没重新注册
T4: 此时如果有新 Pod 调度过来,kubelet 看到 healthyDevices 是空的,
新 Pod 阻塞等待
T5: device-plugin 重新注册,上报 GPU 列表
T6: kubelet 用 checkpoint 中的记录 + plugin 上报的设备重建状态
T6 这一步如果有 bug 就可能出现状态不一致。
20.2 场景二:Pod 删除时序问题(最常见)
T0: Pod-A 占用 GPU-0
T1: Pod-A 被删除,进入 Terminating
T2: kubelet 发起容器停止,但容器还在运行(terminationGracePeriodSeconds)
T3: ★ kubelet 内部状态:podDevices 中 Pod-A 还在,GPU-0 仍标记 allocated
T4: 容器最终停止
T5: kubelet 异步清理 podDevices,移除 Pod-A 的记录
T6: ★ allocatedDevices 中 GPU-0 被标记为可用
如果在 T2~T6 之间 kubelet 释放 GPU ID 早于容器实际释放 GPU,新 Pod 就会被分配同一张卡而老进程还没退出。
这是最容易出问题的场景——账面上没有重复分配(kubelet 觉得 GPU-0 已释放可分配),但物理上重复使用(老进程还在跑)。
20.3 场景三:device-plugin 重启导致状态丢失
并发创建场景下节点压力大,plugin 容易 OOM。重启时如果:
- plugin 重启时间过长,kubelet 可能已把节点 GPU 资源标记为 0
- 新 plugin 上报的设备 ID 和老 plugin 不一致
- 早期版本(< 0.12)的 nvidia-device-plugin 在重启后状态恢复有 bug
20.4 场景四:plugin 自身实现错误
// 错误的 plugin 实现
func (p *Plugin) Allocate(ctx, req *AllocateRequest) (*AllocateResponse, error) {
// BUG: 不看 req.DevicesIDs,自己随便选
response := &ContainerAllocateResponse{
Envs: map[string]string{
"NVIDIA_VISIBLE_DEVICES": "0", // 永远返回 0 号卡!
},
}
}
这时 kubelet 账上记的是 GPU-uuid-0,实际容器拿到的却是 GPU-0(物理 0 号卡),账面和实际错位。
20.5 场景五:GPU 共享方案的混乱
如果用了 vGPU、MPS、Time-Slicing 等共享方案,把一张物理卡当多份分配,"重复分配"反而是预期行为。
21. 排查方法
21.1 第一步:确认是调度层超分还是 device-plugin 冲突
# 出问题时立即抓现场
NODE=<问题节点>
kubectl get pods -A --field-selector spec.nodeName=$NODE -o json \
| jq -r '.items[] | "\(.metadata.namespace)/\(.metadata.name): \(.spec.containers[].resources.requests."nvidia.com/gpu" // "0")"'
# 累加 GPU 请求
kubectl get pods -A --field-selector spec.nodeName=$NODE -o json \
| jq '[.items[].spec.containers[].resources.requests."nvidia.com/gpu" // "0" | tonumber] | add'
- 总和等于 8 但物理冲突 → device-plugin 问题(最可能)
- 总和 > 8 → 调度器层面问题
21.2 第二步:检查 device-plugin 状态
# 看 plugin 日志
kubectl logs -n kube-system <nvidia-device-plugin-pod>
# 看 checkpoint
cat /var/lib/kubelet/device-plugins/kubelet_internal_checkpoint | jq
# 看每个容器实际拿到的卡
for pod in $(kubectl get pods -A --field-selector spec.nodeName=$NODE -o name); do
echo "=== $pod ==="
kubectl exec $pod -- printenv NVIDIA_VISIBLE_DEVICES 2>/dev/null
done
21.3 第三步:确认物理冲突
# 查看哪些进程在用同一张卡
nvidia-smi --query-compute-apps=pid,process_name,used_memory --format=csv
ps -ef | grep <pid> # 确认进程归属哪个容器
crictl ps -a | grep <container-id> # 看容器状态
21.4 解决方案
- 升级 NVIDIA device-plugin 到最新稳定版
- 使用 HAMi 等方案,把 GPU 决策上提到调度器层
- 避免 Pod 频繁重建,设置合理的
terminationGracePeriodSeconds - 容器启动前的健康检查,主动失败而不是继续运行
第七部分:Device Plugin 原理与实现
22. Device Plugin 概述
Kubernetes 本身只认识 CPU 和 Memory。Device Plugin 框架通过 gRPC 标准接口,让第三方设备厂商把自己的设备注册成 K8s 的 Extended Resource,让 Pod 通过 requests.example.com/gpu: 1 这样的方式申请。
┌─────────────────────────────────────────────────┐
│ 用户提交 Pod │
│ resources.requests: │
│ example.com/mydevice: 2 │
└────────────────────┬────────────────────────────┘
↓
┌─────────────────────────────────────────────────┐
│ kube-scheduler │
│ 根据 Node.Status.Allocatable 找有资源的节点 │
└────────────────────┬────────────────────────────┘
↓
┌─────────────────────────────────────────────────┐
│ kubelet │
│ ↓ gRPC 调用 │
└────────────────────┬────────────────────────────┘
↓
┌─────────────────────────────────────────────────┐
│ Device Plugin (DaemonSet) │
│ - ListAndWatch: 上报设备列表 │
│ - Allocate: 分配设备给 Pod │
└─────────────────────────────────────────────────┘
23. 核心机制
23.1 注册流程
1. Plugin 在 /var/lib/kubelet/device-plugins/ 下创建自己的 socket
例如: /var/lib/kubelet/device-plugins/mydevice.sock
2. Plugin 在自己的 socket 上启动 gRPC server,实现 DevicePlugin 接口
3. Plugin 通过 kubelet 的注册 socket(kubelet.sock)发起注册:
Register(RegisterRequest{
Version: "v1beta1",
Endpoint: "mydevice.sock",
ResourceName: "example.com/mydevice",
})
4. kubelet 收到注册后,连接 Plugin 的 socket,调用 ListAndWatch
为什么用 Unix Socket? Plugin 是以 DaemonSet 形式运行在节点上,和 kubelet 在同一台机器,UDS 既高效又安全(不暴露网络端口)。
23.2 核心 gRPC 接口
type DevicePluginServer interface {
GetDevicePluginOptions(ctx, Empty) (*DevicePluginOptions, error)
ListAndWatch(*Empty, DevicePlugin_ListAndWatchServer) error
Allocate(ctx, *AllocateRequest) (*AllocateResponse, error)
PreStartContainer(ctx, *PreStartContainerRequest) (*PreStartContainerResponse, error)
GetPreferredAllocation(ctx, *PreferredAllocationRequest) (*PreferredAllocationResponse, error)
}
23.3 ListAndWatch:设备发现与健康上报
这是个流式 RPC —— Plugin 在这个连接上持续推送设备列表:
func (m *MyDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
// 首次推送当前设备列表
s.Send(&pluginapi.ListAndWatchResponse{Devices: m.devices})
// 后续监听设备变化
for {
select {
case <-m.healthCheck:
s.Send(&pluginapi.ListAndWatchResponse{Devices: m.devices})
case <-m.stop:
return nil
}
}
}
23.4 Allocate:分配设备的关键时刻
func (m *MyDevicePlugin) Allocate(ctx context.Context, req *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
responses := pluginapi.AllocateResponse{}
for _, containerReq := range req.ContainerRequests {
// containerReq.DevicesIDs 是 kubelet 选好的设备 ID 列表
// Plugin 的工作是告诉 kubelet 怎么把这些设备暴露给容器
response := pluginapi.ContainerAllocateResponse{
Envs: map[string]string{
"MYDEVICE_VISIBLE_IDS": strings.Join(containerReq.DevicesIDs, ","),
},
Devices: []*pluginapi.DeviceSpec{
{
HostPath: "/dev/mydevice0",
ContainerPath: "/dev/mydevice0",
Permissions: "rw",
},
},
}
responses.ContainerResponses = append(responses.ContainerResponses, &response)
}
return &responses, nil
}
24. 手搓 Device Plugin Demo
实现一个虚拟设备的 Device Plugin:模拟节点上有 4 个名叫 example.com/foo 的设备。
24.1 核心代码
// plugin.go
package main
import (
"context"
"fmt"
"log"
"net"
"os"
"path"
"sync"
"time"
"google.golang.org/grpc"
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)
const (
resourceName = "example.com/foo"
serverSock = pluginapi.DevicePluginPath + "foo.sock"
deviceCount = 4
)
type FooDevicePlugin struct {
devices []*pluginapi.Device
server *grpc.Server
stop chan struct{}
health chan *pluginapi.Device
// 关键:跟踪设备分配状态,并发安全
mu sync.Mutex
allocated map[string]string // deviceID -> podUID
}
func NewFooDevicePlugin() *FooDevicePlugin {
devs := make([]*pluginapi.Device, deviceCount)
for i := 0; i < deviceCount; i++ {
devs[i] = &pluginapi.Device{
ID: fmt.Sprintf("foo-%d", i),
Health: pluginapi.Healthy,
}
}
return &FooDevicePlugin{
devices: devs,
stop: make(chan struct{}),
health: make(chan *pluginapi.Device),
allocated: make(map[string]string),
}
}
// Start 启动 gRPC 服务并注册到 kubelet
func (p *FooDevicePlugin) Start() error {
os.Remove(serverSock)
sock, err := net.Listen("unix", serverSock)
if err != nil {
return err
}
p.server = grpc.NewServer()
pluginapi.RegisterDevicePluginServer(p.server, p)
go func() {
if err := p.server.Serve(sock); err != nil {
log.Printf("gRPC serve error: %v", err)
}
}()
time.Sleep(1 * time.Second)
return p.register()
}
func (p *FooDevicePlugin) register() error {
conn, err := grpc.Dial(
pluginapi.KubeletSocket,
grpc.WithInsecure(),
grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("unix", addr, timeout)
}),
)
if err != nil {
return err
}
defer conn.Close()
client := pluginapi.NewRegistrationClient(conn)
req := &pluginapi.RegisterRequest{
Version: pluginapi.Version,
Endpoint: path.Base(serverSock),
ResourceName: resourceName,
}
_, err = client.Register(context.Background(), req)
return err
}
func (p *FooDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
if err := s.Send(&pluginapi.ListAndWatchResponse{Devices: p.devices}); err != nil {
return err
}
for {
select {
case <-p.stop:
return nil
case d := <-p.health:
for i, dev := range p.devices {
if dev.ID == d.ID {
p.devices[i].Health = d.Health
}
}
s.Send(&pluginapi.ListAndWatchResponse{Devices: p.devices})
}
}
}
func (p *FooDevicePlugin) Allocate(ctx context.Context, req *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
// 关键:加锁防止并发分配冲突
p.mu.Lock()
defer p.mu.Unlock()
responses := pluginapi.AllocateResponse{}
for _, containerReq := range req.ContainerRequests {
for _, id := range containerReq.DevicesIDs {
if existingPod, ok := p.allocated[id]; ok {
log.Printf("WARNING: device %s already allocated to %s", id, existingPod)
}
p.allocated[id] = "current-pod"
}
response := pluginapi.ContainerAllocateResponse{
Envs: map[string]string{
"FOO_VISIBLE_DEVICES": strings.Join(containerReq.DevicesIDs, ","),
},
}
responses.ContainerResponses = append(responses.ContainerResponses, &response)
}
return &responses, nil
}
func (p *FooDevicePlugin) GetDevicePluginOptions(ctx context.Context, e *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) {
return &pluginapi.DevicePluginOptions{}, nil
}
func (p *FooDevicePlugin) PreStartContainer(ctx context.Context, req *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) {
return &pluginapi.PreStartContainerResponse{}, nil
}
func (p *FooDevicePlugin) GetPreferredAllocation(ctx context.Context, req *pluginapi.PreferredAllocationRequest) (*pluginapi.PreferredAllocationResponse, error) {
return &pluginapi.PreferredAllocationResponse{}, nil
}
24.2 部署 DaemonSet
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: foo-device-plugin
namespace: kube-system
spec:
selector:
matchLabels:
name: foo-device-plugin
template:
metadata:
labels:
name: foo-device-plugin
spec:
tolerations:
- operator: Exists
priorityClassName: system-node-critical
containers:
- name: foo-device-plugin
image: your-registry/foo-device-plugin:v0.1
securityContext:
privileged: true
volumeMounts:
- name: device-plugin
mountPath: /var/lib/kubelet/device-plugins
volumes:
- name: device-plugin
hostPath:
path: /var/lib/kubelet/device-plugins
24.3 关键设计要点
- Allocate 必须并发安全 ——
mu.Lock()是底线 - 状态持久化 —— 生产级实现要把分配状态写到 checkpoint 文件
- 健康检查 —— 持续检测设备健康状态
- ListAndWatch 必须保持长连接 —— 连接断了 kubelet 会认为 Plugin 不可用
- 与调度器的协作边界 —— Plugin 只在 Allocate 时才知道是哪个 Pod,这是 device-plugin 框架的设计缺陷
第八部分:手搓调度器 Demo
25. 调度器的本质
抛开所有框架,K8s 调度器的工作可以简化成一个循环:
while true:
pod = 找一个 pending 状态的 Pod
node = 决定把它放到哪个节点
binding = 写 Pod.spec.nodeName = node
POST /api/v1/namespaces/{ns}/pods/{name}/binding
只要能创建 Binding 对象,你就是一个调度器。
26. 渐进式实现演进
| 阶段 | 代码量 | 解决了什么 | 还差什么 |
|---|---|---|---|
| Demo 1 | 50 行 | 能 bind Pod | 一切 |
| Demo 2 | 200 行 | Informer + Filter + Score | 超分、并发问题 |
| Demo 3 | +50 行 | Cache 预占防超分 | 设备 ID 分配 |
| Demo 4 | +200 行 | GPU ID 调度器分配 | Gang、抢占、HA |
26.1 Demo 1:50 行的最小调度器
package main
import (
"context"
"fmt"
"log"
"math/rand"
"time"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
const schedulerName = "demo-scheduler"
func main() {
config, _ := rest.InClusterConfig()
client, _ := kubernetes.NewForConfig(config)
for {
pods, _ := client.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{
FieldSelector: "spec.nodeName=,status.phase=Pending",
})
for _, pod := range pods.Items {
if pod.Spec.SchedulerName != schedulerName {
continue
}
nodes, _ := client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
if len(nodes.Items) == 0 {
continue
}
target := nodes.Items[rand.Intn(len(nodes.Items))]
binding := &corev1.Binding{
ObjectMeta: metav1.ObjectMeta{Name: pod.Name, Namespace: pod.Namespace},
Target: corev1.ObjectReference{
Kind: "Node",
Name: target.Name,
},
}
err := client.CoreV1().Pods(pod.Namespace).Bind(context.TODO(), binding, metav1.CreateOptions{})
if err != nil {
log.Printf("Bind failed: %v", err)
continue
}
fmt.Printf("Scheduled %s/%s to %s\n", pod.Namespace, pod.Name, target.Name)
}
time.Sleep(2 * time.Second)
}
}
26.2 Demo 2:用 Informer 改造
生产级调度器都用 Informer 监听资源变化,核心改进:
type Scheduler struct {
client kubernetes.Interface
podLister cache.Indexer
nodeLister cache.Indexer
queue workqueue.RateLimitingInterface
}
// Pod 事件触发入队
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*corev1.Pod)
if s.needSchedule(pod) {
key, _ := cache.MetaNamespaceKeyFunc(obj)
s.queue.Add(key)
}
},
})
func (s *Scheduler) scheduleOne(key string) error {
pod := getPod(key)
// Filter
candidates := s.filter(pod)
if len(candidates) == 0 {
return fmt.Errorf("no fit node for %s", pod.Name)
}
// Score
best := s.score(pod, candidates)
// Bind
return s.bind(pod, best)
}
// Filter: 过滤不满足资源要求的节点
func (s *Scheduler) fitsResources(pod *corev1.Pod, node *corev1.Node) bool {
var cpuReq, memReq int64
for _, c := range pod.Spec.Containers {
cpuReq += c.Resources.Requests.Cpu().MilliValue()
memReq += c.Resources.Requests.Memory().Value()
}
var cpuUsed, memUsed int64
pods := s.podsOnNode(node.Name)
for _, p := range pods {
for _, c := range p.Spec.Containers {
cpuUsed += c.Resources.Requests.Cpu().MilliValue()
memUsed += c.Resources.Requests.Memory().Value()
}
}
cpuAlloc := node.Status.Allocatable.Cpu().MilliValue()
memAlloc := node.Status.Allocatable.Memory().Value()
return cpuUsed+cpuReq <= cpuAlloc && memUsed+memReq <= memAlloc
}
这版的提升:Informer 事件驱动、WorkQueue 限速重试、真实 Filter/Score、解耦。
还存在的问题:Cache 与决策不一致,刚 bind 完的 Pod 可能还没出现在缓存里,下个 Pod 算资源时会"看不到"它,导致超分。
26.3 Demo 3:加入 Cache 预占解决超分
type Cache struct {
mu sync.RWMutex
nodeAlloc map[string]*ResourceUsage
assumedPods map[string]string
}
// Assume: 决策完成立即更新本地视图
func (c *Cache) Assume(pod *corev1.Pod, nodeName string) {
c.mu.Lock()
defer c.mu.Unlock()
key := podKey(pod)
if _, exists := c.assumedPods[key]; exists {
return
}
c.assumedPods[key] = nodeName
if c.nodeAlloc[nodeName] == nil {
c.nodeAlloc[nodeName] = &ResourceUsage{}
}
for _, container := range pod.Spec.Containers {
c.nodeAlloc[nodeName].CPU += container.Resources.Requests.Cpu().MilliValue()
c.nodeAlloc[nodeName].Memory += container.Resources.Requests.Memory().Value()
}
}
// Forget: bind 失败时回滚
func (c *Cache) Forget(pod *corev1.Pod, nodeName string) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.assumedPods, podKey(pod))
if usage := c.nodeAlloc[nodeName]; usage != nil {
for _, container := range pod.Spec.Containers {
usage.CPU -= container.Resources.Requests.Cpu().MilliValue()
usage.Memory -= container.Resources.Requests.Memory().Value()
}
}
}
调度流程:
func (s *Scheduler) scheduleOne(key string) error {
pod := getPod(key)
candidates := s.filter(pod)
best := s.score(pod, candidates)
s.cache.Assume(pod, best.Name) // 立即预占
if err := s.bind(pod, best); err != nil {
s.cache.Forget(pod, best.Name) // 失败回滚
return err
}
return nil
}
27. 支持 GPU ID 分配的调度器
回到核心目标 —— 让调度器直接决定 GPU ID,而不是只决定数量。
27.1 整体设计
┌─────────────────────────────────────────────────┐
│ Demo Scheduler │
│ │
│ Filter: 节点剩余 GPU 数 ≥ 请求数? │
│ Score: 选剩 GPU 多的节点 │
│ Allocate: ★ 从节点空闲 GPU ID 中选具体 ID │
│ Bind: ★ 把选好的 ID 写到 Pod annotation │
└────────────────────┬────────────────────────────┘
↓
┌─────────────────────────────────────────────────┐
│ Modified Device Plugin │
│ Allocate(): 读 Pod annotation 中调度器指定的 ID │
│ 不再自己选 │
└─────────────────────────────────────────────────┘
27.2 节点 GPU 状态管理
type NodeGPUState struct {
Total []string // 所有 GPU ID
Allocated map[string]string // GPU ID -> Pod Key
}
type GPUCache struct {
mu sync.RWMutex
nodes map[string]*NodeGPUState
}
// 分配(核心方法,加锁防并发)
func (c *GPUCache) Allocate(nodeName string, podKey string, count int) ([]string, error) {
c.mu.Lock()
defer c.mu.Unlock()
state := c.nodes[nodeName]
if state == nil {
return nil, fmt.Errorf("node %s has no GPU info", nodeName)
}
var free []string
for _, id := range state.Total {
if _, used := state.Allocated[id]; !used {
free = append(free, id)
}
}
if len(free) < count {
return nil, fmt.Errorf("not enough GPU on %s: need %d, have %d", nodeName, count, len(free))
}
selected := free[:count]
for _, id := range selected {
state.Allocated[id] = podKey
}
return selected, nil
}
func (c *GPUCache) Release(nodeName string, podKey string) {
c.mu.Lock()
defer c.mu.Unlock()
state := c.nodes[nodeName]
if state == nil {
return
}
for id, owner := range state.Allocated {
if owner == podKey {
delete(state.Allocated, id)
}
}
}
27.3 调度流程改造
func (s *Scheduler) scheduleOne(key string) error {
pod := getPod(key)
gpuRequest := parseGPURequest(pod)
candidates := s.filter(pod, gpuRequest)
if len(candidates) == 0 {
return fmt.Errorf("no fit node")
}
best := s.score(pod, candidates)
// ★ 关键:调度器直接选具体的 GPU ID
var allocatedGPUs []string
if gpuRequest > 0 {
ids, err := s.gpuCache.Allocate(best.Name, podKey(pod), gpuRequest)
if err != nil {
return err
}
allocatedGPUs = ids
}
return s.bindWithGPU(pod, best, allocatedGPUs)
}
func (s *Scheduler) bindWithGPU(pod *corev1.Pod, node *corev1.Node, gpuIDs []string) error {
// 1. 先 patch annotation(让 device-plugin 能读到)
if len(gpuIDs) > 0 {
patch := fmt.Sprintf(`{"metadata":{"annotations":{"demo-scheduler/allocated-gpus":"%s"}}}`,
strings.Join(gpuIDs, ","))
_, err := s.client.CoreV1().Pods(pod.Namespace).Patch(
context.TODO(), pod.Name, types.StrategicMergePatchType,
[]byte(patch), metav1.PatchOptions{})
if err != nil {
s.gpuCache.Release(node.Name, podKey(pod))
return err
}
}
// 2. 再 bind
binding := &corev1.Binding{
ObjectMeta: metav1.ObjectMeta{Name: pod.Name, Namespace: pod.Namespace},
Target: corev1.ObjectReference{Kind: "Node", Name: node.Name},
}
return s.client.CoreV1().Pods(pod.Namespace).Bind(context.TODO(), binding, metav1.CreateOptions{})
}
27.4 配套的 Device Plugin 改造
func (p *Plugin) Allocate(ctx context.Context, req *AllocateRequest) (*AllocateResponse, error) {
responses := AllocateResponse{}
for _, containerReq := range req.ContainerRequests {
// 尝试从 Pod annotation 读调度器指定的 ID
gpuIDs := p.lookupAllocatedGPUsFromPodAnnotation(containerReq)
if len(gpuIDs) == 0 {
gpuIDs = containerReq.DevicesIDs
}
response := &ContainerAllocateResponse{
Envs: map[string]string{
"NVIDIA_VISIBLE_DEVICES": strings.Join(gpuIDs, ","),
},
}
responses.ContainerResponses = append(responses.ContainerResponses, response)
}
return &responses, nil
}
27.5 演进方向
| 方向 | 说明 |
|---|---|
| Leader Election | 多副本时用 K8s Lease 选主,防止并发决策 |
| 抢占 | 找不到节点时驱逐低优先级 Pod 腾资源 |
| Gang Scheduling | 支持 PodGroup,所有 Pod 同时就绪才 bind |
| 队列与公平调度 | 按 Queue 做配额、按 DRF 排序 Job |
| 拓扑感知 | 选 GPU 时优先同一 NVLink group / NUMA |
| 性能优化 | Filter 阶段并发跑,Score 也是 |
| Scheduler Framework 集成 | 基于 k8s.io/kubernetes/pkg/scheduler/framework 写插件 |
核心认知:
- K8s 调度器本质就是一个控制循环 —— 找 pending Pod,选节点,写 binding
- 复杂的工程问题(性能、一致性、扩展性)才是大头
- 想解决 GPU 重复分配,根本路径是把决策上提到调度器层
第九部分:HAMi 异构算力虚拟化
28. HAMi 简介
HAMi(Heterogeneous AI Computing Virtualization Middleware,原名 k8s-vGPU-scheduler)是一个 CNCF Sandbox 项目,专注于在 Kubernetes 上做异构 AI 算力的虚拟化和精细调度。
HAMi 解决的核心问题:
- GPU 整卡分配粒度太粗 —— 利用率普遍只有 20-40%
- 调度决策与设备分配脱节 —— 重复分配问题
- 异构设备管理混乱 —— 多家芯片厂商各一套
- 缺乏使用约束 —— 申请 1 卡的 Pod 实际能看到整张卡
29. HAMi 核心能力
29.1 GPU 细粒度切分
一张物理 GPU 可以被切成多份给多个 Pod 用,按显存和算力两个维度独立切分:
apiVersion: v1
kind: Pod
metadata:
name: gpu-pod
spec:
containers:
- name: trainer
image: pytorch:latest
resources:
limits:
nvidia.com/gpu: 1 # 用 1 张卡(可以是切片)
nvidia.com/gpumem: 3000 # 限制 3000 MB 显存
nvidia.com/gpucores: 30 # 限制 30% 算力
gpumem: 显存限制(MB)gpucores: 算力百分比- 一张 24GB 的 A10 可以同时跑 8 个
gpumem: 3000的 Pod
这不是逻辑切分,而是真正的硬隔离(软件层硬隔离,后面会讲实现机制)。
29.2 拓扑感知调度
节点 GPU 拓扑:
GPU-0 ─ NVLink ─ GPU-1 ┐
GPU-2 ─ NVLink ─ GPU-3 ├─ Group A (PCIe Switch 1)
┘
GPU-4 ─ NVLink ─ GPU-5 ┐
GPU-6 ─ NVLink ─ GPU-7 ├─ Group B (PCIe Switch 2)
┘
需要 4 张卡的训练任务,HAMi 会尝试给同一个 Group 内的 4 张(避免跨 PCIe 通信)。
29.3 多种异构设备统一管理
支持的设备:NVIDIA GPU、华为昇腾(910B/910/310)、寒武纪 MLU、海光 DCU、Intel GPU、天数智芯、沐曦、摩尔线程等。
所有设备走统一的资源接口和调度逻辑。
29.4 节点选择策略
- binpack:尽量装满一个节点再用下一个,适合提高单节点利用率
- spread:均匀分布到多个节点,适合避免单点故障
- 拓扑亲和:同一 Job 的 Pod 尽量调度到拓扑近的节点
30. HAMi 架构与实现
┌────────────────────────────────────────────────────┐
│ 用户提交 Pod │
│ resources.limits: │
│ nvidia.com/gpu: 1 │
│ nvidia.com/gpumem: 3000 │
└──────────────────────┬─────────────────────────────┘
↓
┌────────────────────────────────────────────────────┐
│ HAMi Scheduler Extender / Webhook │
│ ┌──────────────────────────────────────────────┐ │
│ │ 1. Webhook 拦截 Pod,识别 HAMi 资源请求 │ │
│ │ 2. 修改 schedulerName 走 HAMi 调度 │ │
│ │ 3. Extender 与 kube-scheduler 协作做决策 │ │
│ │ 4. ★ 选具体 GPU ID,写入 Pod annotation │ │
│ └──────────────────────────────────────────────┘ │
└──────────────────────┬─────────────────────────────┘
↓
┌────────────────────────────────────────────────────┐
│ HAMi Device Plugin (DaemonSet) │
│ ┌──────────────────────────────────────────────┐ │
│ │ 1. 上报"虚拟化"后的资源(一卡多份) │ │
│ │ 2. Allocate 时读取 Pod annotation 中调度 │ │
│ │ 器指定的 GPU ID + 显存/算力配额 │ │
│ │ 3. 注入 HAMi-Core 库到容器 │ │
│ └──────────────────────────────────────────────┘ │
└──────────────────────┬─────────────────────────────┘
↓
┌────────────────────────────────────────────────────┐
│ Container 运行时 │
│ ┌──────────────────────────────────────────────┐ │
│ │ HAMi-Core (libvgpu.so) 通过 LD_PRELOAD │ │
│ │ 拦截 CUDA API: │ │
│ │ - cuMemAlloc → 检查显存配额 │ │
│ │ - cuLaunchKernel → 限制算力时间片 │ │
│ └──────────────────────────────────────────────┘ │
└────────────────────────────────────────────────────┘
四个核心组件:
- HAMi Scheduler / Webhook —— 调度决策层
- HAMi Device Plugin —— 节点 agent
- HAMi-Core (libvgpu.so) —— 容器内的运行时拦截层(关键创新)
- HAMi-WebUI —— 可视化监控
30.1 HAMi 怎么解决 GPU ID 重复分配
传统流程(出问题的):
调度器选节点(只看数量)→ kubelet 选 ID → device-plugin 暴露设备
↑
状态分散,并发出问题
HAMi 流程:
HAMi 调度器选节点 + 选具体 GPU ID(全局一致视图)
↓
写入 Pod annotation: hami.io/vgpu-devices-allocated="GPU-uuid-0,GPU-uuid-2"
↓
kubelet 调用 device-plugin
↓
HAMi device-plugin 读 annotation,按指定 ID 暴露给容器
★ 不再做选择决策,只做执行
HAMi 调度器的 Filter 阶段:
func (s *Scheduler) Filter(pod *Pod, node *Node) bool {
request := parseGPURequest(pod)
nodeGPUs := s.deviceCache.GetNodeGPUs(node.Name)
// ★ 用装箱算法找出能满足请求的具体 GPU 组合
selected := s.selectGPUs(nodeGPUs, request)
if selected == nil {
return false
}
s.cache.Reserve(pod, node.Name, selected)
return true
}
func (s *Scheduler) Bind(pod *Pod, nodeName string) error {
selected := s.cache.GetReserved(pod)
annotation := strings.Join(selected.IDs, ",")
patchAnnotation(pod, "hami.io/vgpu-devices-allocated", annotation)
patchAnnotation(pod, "hami.io/vgpu-memory", fmt.Sprintf("%d", request.Memory))
patchAnnotation(pod, "hami.io/vgpu-cores", fmt.Sprintf("%d", request.Cores))
return doBinding(pod, nodeName)
}
device-plugin 端:
func (p *Plugin) Allocate(ctx, req *AllocateRequest) (*AllocateResponse, error) {
pod := p.findPendingPod()
allocatedIDs := pod.Annotations["hami.io/vgpu-devices-allocated"]
memLimit := pod.Annotations["hami.io/vgpu-memory"]
coreLimit := pod.Annotations["hami.io/vgpu-cores"]
response := &ContainerAllocateResponse{
Envs: map[string]string{
"NVIDIA_VISIBLE_DEVICES": allocatedIDs,
"CUDA_DEVICE_MEMORY_LIMIT": memLimit,
"CUDA_DEVICE_SM_LIMIT": coreLimit,
},
Mounts: []*Mount{
// 关键:挂载 HAMi-Core 库
{ContainerPath: "/usr/local/vgpu/libvgpu.so",
HostPath: "/usr/local/vgpu/libvgpu.so"},
},
}
return response, nil
}
核心改造:决策权完全收归调度器,device-plugin 只是执行者。
31. HAMi-Core 隔离机制
光有 ID 分配还不够,多个 Pod 共享一张物理卡时,怎么保证 A 不能用超 B 的显存、不能抢占 B 的算力?
HAMi 的杀手锏是 HAMi-Core —— 一个通过 LD_PRELOAD 注入容器、拦截 CUDA API 调用的动态库(libvgpu.so)。
应用程序 (PyTorch / TensorFlow)
↓
调用 CUDA API: cuMemAlloc(size=10GB)
↓
┌──────────────────────────────────────────┐
│ HAMi-Core (libvgpu.so) 拦截器 │
│ │
│ if 已用显存 + size > 配额 (3GB): │
│ 返回 OOM 错误 │
│ else: │
│ 调用真实 CUDA API │
│ 记账已用显存 │
└──────────────────────────────────────────┘
↓
真实 CUDA Driver
↓
GPU 硬件
显存隔离:拦截 cuMemAlloc 等内存分配 API,超过配额返回 OOM。
算力隔离:拦截 cuLaunchKernel,通过时间片机制控制 kernel 执行频率。如果配额是 30%,那 100ms 内只允许这个进程的 kernel 占用 30ms。
这是软件层的隔离,不是硬件级(NVIDIA 的 MIG 才是硬件级),但优势是:
- 任何 GPU 都能用(不像 MIG 只支持 A100/H100)
- 切分粒度任意(不像 MIG 只能切固定档位)
- 不需要重启 GPU 配置
代价是有一定性能开销(通常 < 5%),且对应用 100% 透明。
32. 方案对比与选型
32.1 GPU 调度方案对比
| 方案 | 切分粒度 | 隔离强度 | 兼容性 | 调度智能 |
|---|---|---|---|---|
| nvidia-device-plugin | 整卡 | 强 | 全 NVIDIA | 弱(数量) |
| NVIDIA MIG | 固定档位 | 硬件级 | A100/H100 only | 弱 |
| NVIDIA Time-Slicing | 整卡共享,时间轮转 | 几乎无 | 全 NVIDIA | 弱 |
| NVIDIA MPS | 算力共享 | 弱(无显存隔离) | 全 NVIDIA | 弱 |
| vCUDA(腾讯) | 显存+算力 | 软件级 | NVIDIA | 弱 |
| HAMi | 显存+算力任意切 | 软件级 | NVIDIA + 多家国产 | 强 |
32.2 HAMi 的局限
- 软件隔离,不是绝对安全 ——
LD_PRELOAD可以被绕过,适合协作式共享,不适合对抗式多租户 - 性能开销 —— API 拦截有微小开销
- 兼容性问题 —— 某些用 CUDA Driver API 而非 Runtime API 的程序可能行为异常
- 调度复杂度上升 —— 切分后调度决策维度变多
32.3 适用场景
适合:
- AI 推理平台(多模型共享 GPU)
- 算法团队开发环境
- 中小模型训练
- 异构集群(多种 AI 芯片混用)
- GPU 利用率优化是 KPI
不适合:
- 大模型训练(本来就要满卡甚至多卡)
- 强隔离要求的多租户公有云
- 对延迟极致敏感的低延迟推理
32.4 Volcano + HAMi 协同
Volcano 和 HAMi 不是替代关系而是互补:
- Volcano:解决批处理调度(Job、Gang、Queue)
- HAMi:解决 GPU 资源虚拟化和精细分配
生产环境完全可以 Volcano + HAMi 一起用,前者管"任务怎么调度",后者管"GPU 怎么分"。
附录:速查表
A. Volcano Action 与 Plugin 对照
| Action | 作用 | 常用 Plugin |
|---|---|---|
| enqueue | 决定 Job 是否能进入待调度队列 | proportion, gang |
| allocate | 资源分配主流程 | priority, gang, drf, predicates, nodeorder, binpack |
| preempt | 同队列内抢占 | priority, conformance |
| reclaim | 跨队列资源回收 | proportion |
| backfill | 小任务回填空隙 | (无特殊) |
B. Volcano Plugin 注册的回调函数
| 回调 | 作用 |
|---|---|
| JobOrderFn | 决定 Job 的调度顺序(全局) |
| TaskOrderFn | 决定同 Job 内 Task 的顺序 |
| QueueOrderFn | 决定 Queue 的调度顺序 |
| PredicateFn | 节点过滤(Filter) |
| NodeOrderFn | 节点打分(Score) |
| BatchNodeOrderFn | 批量节点打分(性能优化) |
| JobReadyFn | Job 是否成组就绪(Gang) |
| JobEnqueueable | Job 能否入队 |
| PreemptableFn | 找抢占的 victim |
| ReclaimableFn | 找回收的 victim |
C. 排查 GPU 重复分配问题清单
# 1. 检查节点 Pod 总 GPU 请求
NODE=<问题节点>
kubectl get pods -A --field-selector spec.nodeName=$NODE -o json \
| jq '[.items[].spec.containers[].resources.requests."nvidia.com/gpu" // "0" | tonumber] | add'
# 2. 对比节点容量
kubectl get node $NODE -o jsonpath='{.status.allocatable.nvidia\.com/gpu}'
# 3. 查看 device-plugin 日志
kubectl logs -n kube-system <nvidia-device-plugin-pod>
kubectl logs -n kube-system <nvidia-device-plugin-pod> --previous # 重启历史
# 4. 查看 checkpoint
cat /var/lib/kubelet/device-plugins/kubelet_internal_checkpoint | jq
# 5. 查看每个容器实际拿到的 GPU
for pod in $(kubectl get pods -A --field-selector spec.nodeName=$NODE -o name); do
echo "=== $pod ==="
kubectl exec $pod -- printenv NVIDIA_VISIBLE_DEVICES 2>/dev/null
done
# 6. 查看 GPU 上的进程
nvidia-smi --query-compute-apps=pid,process_name,used_memory,gpu_uuid --format=csv
# 7. 查看 kubelet 设备管理事件
journalctl -u kubelet | grep "device plugin"
D. 关键设计哲学
- Session 内绝对一致,Session 间允许偏差,偏差通过重试收敛 —— Volcano 的并发模型
- 预占但延迟提交 —— Gang Scheduling 的核心机制
- 决策上提到调度器层 —— 解决 GPU 重复分配的根本路径
- Action 是骨架,Plugin 是策略 —— Volcano 的扩展性来源
- GPU ID 实际是 kubelet 分配的 —— 重复分配问题的根本认知
E. 生态项目关系图
┌────────────────────────────────────────────────────┐
│ Kubernetes 调度生态 │
└────────────────────────────────────────────────────┘
│
┌──────────────┼──────────────┐
↓ ↓ ↓
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ kube-scheduler│ │ Volcano │ │ Kueue │
│ (默认) │ │ (批处理增强) │ │ (Job 排队) │
└──────────────┘ └──────┬───────┘ └──────────────┘
│
│ 协同使用
↓
┌──────────────┐
│ HAMi │
│ (GPU 虚拟化) │
└──────┬───────┘
│
↓
┌──────────────┐
│ Device Plugin │
│ 框架 │
└──────┬───────┘
│
↓
┌──────────────┐
│ GPU 驱动 │
└──────────────┘
参考资源
- Volcano 官方文档: https://volcano.sh
- Volcano GitHub: https://github.com/volcano-sh/volcano
- HAMi GitHub: https://github.com/Project-HAMi/HAMi
- Kubernetes Device Plugin: https://kubernetes.io/docs/concepts/extend-kubernetes/compute-storage-net/device-plugins/
- Scheduling Framework: https://kubernetes.io/docs/concepts/scheduling-eviction/scheduling-framework/
- DRF 论文: https://people.eecs.berkeley.edu/~alig/papers/drf.pdf
学习建议:先理解第一、二部分,建立全局认知第三部分逐节深入实现细节第四、五部分对比理解,加深对调度模型的认知第六、七部分结合实际问题排查,实战导向第八部分动手敲代码,体验调度器实现第九部分作为生态扩展,了解前沿方案