1. HAMi是什么
想象一下你是一位繁忙的AI研究员,手头有好几个实验要跑,但实验室里的GPU显卡资源有限。这时候,HAMi就像一位智慧的资源管理员,帮你合理分配和调度这些宝贵的GPU资源。它就像是给GPU装上了"分身术",让多个AI任务能够和谐共处在同一块显卡上。HAMi目前已经加入了云原生界的"黄埔军校"CNCF(云原生计算基金会),作为一个充满潜力的沙箱项目茁壮成长。
让我们通过几个生动的场景,来了解HAMi的神奇之处:
-
设备共享有妙招,就像一个灵活的"设备调度大师":
- 不管是GPU还是NPU,都能轻松管理,就像训练有素的管家
- 一个AI任务想要多个显卡?没问题!就像给你配备多位得力助手
-
显存管理很智能,犹如一位精明的"内存管家":
- 每个任务的显存使用都有明确的"配额",不会互相争抢
- 需要更多显存?可以随时调整,就像弹性伸缩的口袋
- 想用具体数值还是百分比来分配?都可以,就像点菜可以按份量也可以按比例
-
设备挑选很贴心,像个专业的"设备配对专家":
- 只想用RTX 4090?没问题,就像指定特定型号的跑车
- 还能通过UUID精确定位设备,就像每个显卡都有自己的身份证
-
上手特别容易,就像一个"零门槛"的好帮手:
- AI程序完全不用改代码,就像无缝衔接的魔法
- 安装卸载像玩积木一样简单,helm工具轻松搞定
-
开放合作大家庭,像一个温暖的"开源社区":
- 各行各业的大咖都在参与,从互联网到金融,从制造业到云服务
- 加入CNCF大家庭,让更多朋友一起来建设,就像一个开放的创新工坊
2. HAMi架构
HAMi这套架构就像一个高效运转的智能办公大楼,由四个关键人员协同工作:
-
前台接待员(HAMi MutatingWebhook)就像大楼的智能前台,当新的AI任务来访时:
- 会仔细检查访客的"需求清单"(资源申请)
- 如果发现这位访客只需要基础设施(CPU、内存)或HAMi特供服务,就会贴上"HAMi特别通行证"
- 相当于一位细心的门卫,确保每个来访者都能得到合适的接待方式
-
调度主管(HAMi scheduler-extender)犹如大楼的总调度室管理员:
- 负责给每位访客安排最合适的办公位置(节点)和工作设备(GPU/NPU)
- 就像有一个实时更新的"全楼设备状态大屏",随时掌握每个设备的使用情况
- 能够根据访客需求和设备状态,做出最优的分配决策
-
设备管家(Device-plugin)像是一位神通广大的设备连接专家:
- 看到调度主管的工位安排单后,立即着手准备相应的设备
- 负责将显卡等设备"插上电源",确保设备随时可用
- 就像是在访客和设备之间搭建一座便捷的桥梁
-
资源管控员(HAMi-Core)就像一位尽职的资源监管员:
- 时刻关注每个访客的资源使用情况,确保不会超出预定配额
- 设置明确的资源使用界限,防止互相干扰
- 当某个任务试图占用过多资源时,会及时进行管控
3. HAMi应用场景
3.1 设备共享
HAMi的设备共享就像是一个智能的资源分配系统,让一块显卡能够同时服务多个AI任务:
-
灵活的显存分配
- 可以精确指定每个任务使用多少显存
- 就像给每个租户分配固定大小的"储物柜"
-
算力精确控制
- 对计算单元(流处理器)进行严格限制
- 就像给每个租户分配固定数量的"工作人员"
-
核心使用率管理
- 支持按需分配设备核心使用率
- 就像给不同任务分配不同的"工作时间"
-
无缝适配已有程序
- 完全不需要修改现有的程序代码
- 就像租客入住时不需要重新装修,直接拎包入住
3.2 设备资源隔离
resources:
limits:
nvidia.com/gpu: 1 # 请求1个虚拟GPU
nvidia.com/gpumem: 3000 # 每个虚拟GPU包含3000MB设备内存
在容器内部看到的1张卡,显存3GB
4. Helm部署HAMi
4.1 前置条件
- Helm version v3+
- kubectl version v1.16+
- CUDA version v10.2+
- Nvidia Driver version v440+
4.2 配置nvidia runtime
安装nvidia-container-toolkit,配置nvidia-container-runtime
distribution=$(. /etc/os-release;echo $ID$VERSION_ID)
curl -s -L https://nvidia.github.io/libnvidia-container/gpgkey | sudo apt-key add -
curl -s -L https://nvidia.github.io/libnvidia-container/$distribution/libnvidia-container.list | sudo tee /etc/apt/sources.list.d/libnvidia-container.list
sudo apt-get update && sudo apt-get install -y nvidia-container-toolkit
# 如果container runtime使用的是docker
vim /etc/docker/daemon.json
{
"default-runtime": "nvidia",
"runtimes": {
"nvidia": {
"path": "/usr/bin/nvidia-container-runtime",
"runtimeArgs": []
}
}
}
sudo systemctl daemon-reload && systemctl restart docker
# 如果container runtime使用的是containerd
(base) root@vgpu:/root# nvidia-ctk runtime configure --runtime=containerd
INFO[0000] Wrote updated config to /etc/containerd/config.toml
INFO[0000] It is recommended that containerd daemon be restarted.
# 如果container runtime使用的是containerd
vim /etc/containerd/config.toml
version = 2
[plugins]
[plugins."io.containerd.grpc.v1.cri"]
[plugins."io.containerd.grpc.v1.cri".containerd]
default_runtime_name = "nvidia"
[plugins."io.containerd.grpc.v1.cri".containerd.runtimes]
[plugins."io.containerd.grpc.v1.cri".containerd.runtimes.nvidia]
privileged_without_host_devices = false
runtime_engine = ""
runtime_root = ""
runtime_type = "io.containerd.runc.v2"
[plugins."io.containerd.grpc.v1.cri".containerd.runtimes.nvidia.options]
BinaryName = "/usr/bin/nvidia-container-runtime"
sudo systemctl daemon-reload && systemctl restart containerd
为containerd配置代理(docker daemon同理)
vim /etc/systemd/system/containerd.service
[Service]
Environment="HTTP_PROXY=http://<代理地址>:<代理端口>"
Environment="HTTPS_PROXY=http://<代理地址>:<代理端口>"
Environment="NO_PROXY=localhost,127.0.0.1"
4.3 GPU节点标签
kubectl label nodes {nodeid} gpu=on
4.4 部署HAMi
image_version=$(kubectl version 2>/dev/null | grep GitVersion | head -1 | awk -F'"' '{print $6}')
helm repo add hami-charts https://project-hami.github.io/HAMi/
helm install hami hami-charts/hami --set scheduler.kubeScheduler.imageTag=${image_version} -n kube-system
4.5 验证HAMi
创建gpu pod
# 创建一个测试用的Pod,用于验证HAMi的基本功能
cat << 'EOF' | kubectl apply -f -
apiVersion: v1
kind: Pod
metadata:
name: gpu-pod
spec:
containers:
- name: ubuntu-container
image: ubuntu:18.04
command: ["bash", "-c", "sleep 86400"]
resources:
limits:
nvidia.com/gpu: 1 # requesting 1 vGPUs
nvidia.com/gpumem: 10240 # Each vGPU contains 3000m device memory (Optional,Integer)
EOF
进入到容器里查看,可以看到1张卡,显存1GB
(base) root@vgpu:~# kubectl exec -it gpu-pod -- nvidia-smi
[HAMI-core Msg(24:139957573396288:libvgpu.c:836)]: Initializing.....
Fri Dec 13 05:55:05 2024
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 550.135 Driver Version: 550.135 CUDA Version: 12.4 |
|-----------------------------------------+------------------------+----------------------+
| GPU Name Persistence-M | Bus-Id Disp.A | Volatile Uncorr. ECC |
| Fan Temp Perf Pwr:Usage/Cap | Memory-Usage | GPU-Util Compute M. |
| | | MIG M. |
|=========================================+========================+======================|
| 0 NVIDIA GeForce RTX 3090 Off | 00000000:00:05.0 Off | N/A |
| 0% 26C P8 7W / 370W | 0MiB / 10240MiB | 0% Default |
| | | N/A |
+-----------------------------------------+------------------------+----------------------+
+-----------------------------------------------------------------------------------------+
| Processes: |
| GPU GI CI PID Type Process name GPU Memory |
| ID ID Usage |
|=========================================================================================|
| No running processes found |
+-----------------------------------------------------------------------------------------+
[HAMI-core Msg(24:139957573396288:multiprocess_memory_limit.c:497)]: Calling exit handler 24
4.6 多pod共享GPU
# 创建一个包含3个Pod的Deployment,用于测试GPU共享功能
cat << 'EOF' | kubectl apply -f -
apiVersion: apps/v1
kind: Deployment
metadata:
name: hami-npod-1gpu
spec:
replicas: 3 # 创建三个相同的 Pod,可根据需要修改数量
selector:
matchLabels:
app: pytorch
template:
metadata:
labels:
app: pytorch
spec:
containers:
- name: pytorch-container
image: uhub.service.ucloud.cn/gpu-share/gpu_pytorch_test:latest
command: ["/bin/sh", "-c"]
args: ["cd /app/pytorch_code && python3 2.py"]
resources:
limits:
nvidia.com/gpu: 1
nvidia.com/gpumem: 3000
nvidia.com/gpucores: 25
EOF
# 查看创建的Pod状态
(base) root@vgpu:~# kubectl get pod
NAME READY STATUS RESTARTS AGE
hami-npod-1gpu-6f65f668b7-6k2jh 1/1 Running 0 5s
hami-npod-1gpu-6f65f668b7-f48gm 1/1 Running 0 5s
hami-npod-1gpu-6f65f668b7-kpnbg 1/1 Running 0 5s
# 查看主机上的GPU使用情况
(base) root@vgpu:~# nvidia-smi
Fri Dec 13 09:32:47 2024
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 550.135 Driver Version: 550.135 CUDA Version: 12.4 |
|-----------------------------------------+------------------------+----------------------+
| GPU Name Persistence-M | Bus-Id Disp.A | Volatile Uncorr. ECC |
| Fan Temp Perf Pwr:Usage/Cap | Memory-Usage | GPU-Util Compute M. |
| | | MIG M. |
|=========================================+========================+======================|
| 0 NVIDIA GeForce RTX 3090 Off | 00000000:00:05.0 Off | N/A |
| 34% 33C P2 103W / 370W | 1320MiB / 24576MiB | 2% Default |
| | | N/A |
+-----------------------------------------+------------------------+----------------------+
+-----------------------------------------------------------------------------------------+
| Processes: |
| GPU GI CI PID Type Process name GPU Memory |
| ID ID Usage |
|=========================================================================================|
| 0 N/A N/A 19635 C python3 478MiB |
| 0 N/A N/A 19713 C python3 478MiB |
| 0 N/A N/A 19900 C python3 352MiB |
+-----------------------------------------------------------------------------------------+
5. HAMi代码分析
以代码commit为例:96adaec66c4472383108de2d18698b47f5e6e553
5.1 hami-scheduler调试环境
因为gpu环境在远程服务器端,所以利用dlv+golang来搭建远程调试环境.大体思路是:
- 在远程gpu服务器上用dlv启动hami-scheduler
- 把hami-scheduler相关的配置指向本地本地dlv启动的hami-scheduler服务
- 在远程gpu服务器上用dlv启动hami-scheduler
root@vgpu:~# nerdctl -n k8s.io cp <hami-scheduler-container-id>:/tls/tls.crt /root/tls/
root@vgpu:~# nerdctl -n k8s.io cp <hami-scheduler-container-id>:/tls/tls.key /root/tls/
dlv exec bin/scheduler --headless -l 0.0.0.0:2345 --api-version=2 -- \
--http_bind=0.0.0.0:9443 \
--cert_file=/root/tls/tls.crt \
--key_file=/root/tls/tls.key \
--scheduler-name=hami-scheduler \
--node-scheduler-policy=binpack \
--gpu-scheduler-policy=spread \
--device-config-file=/root/device-config.yaml
启动参数通过kubectl -n kube-system edit deployments.apps hami-scheduler
可以查看,device-config.yaml
文件通过nerdctl -n k8s.io cp <hami-scheduler-container-id>:/root/device-config.yaml
获取。dlv debug
直接调试源代码并自动编译(适合开发环境),而dlv exec
调试已编译的二进制文件(适合生产环境,但需要编译时包含调试信息 -gcflags=“all=-N -l”),dlv exec
直接作用于二进制,启动较快。HAMi代码库中有提供Makefile
需要移除下-s -w
参数,make build
可以编译出包含调试信息的二进制文件。
- hami-webhook指向本地dlv启动的hami-scheduler服务
(base) root@vgpu:~# kubectl edit mutatingwebhookconfigurations.admissionregistration.k8s.io hami-webhook
# webhook限制强制使用https,复用证书,用127.0.0.1的地址
...
webhooks:
- admissionReviewVersions:
- v1beta1
clientConfig:
url: https://127.0.0.1:9443/webhook
...
- hami-scheduler extender指向本地dlv启动的hami-scheduler服务
(base) root@vgpu:~# kubectl -n kube-system edit cm hami-scheduler-newversion
...
apiVersion: v1
data:
config.yaml: |
apiVersion: kubescheduler.config.k8s.io/v1beta2
kind: KubeSchedulerConfiguration
leaderElection:
leaderElect: false
profiles:
- schedulerName: hami-scheduler
extenders:
- urlPrefix: "https://<dlv-local-ip>:9443"
filterVerb: filter
bindVerb: bind
nodeCacheCapable: true
weight: 1
httpTimeout: 30s
enableHTTPS: false
tlsConfig:
insecure: true
...
hami-scheduler deployment实际上包含两个container:一个是kube-scheduler原生调度器,一个是hami-scheduler自定义调度器;扩展调度器用的还是废弃的extender机制,并非scheduler framework。exterder机制虽有性能问题,但是使用起来比较简单。
- goland添加Go Remote Debugger指向:2345
5.2 hami-scheduler核心逻辑
hami-scheduler带的命令行参数
func init() {
// 禁用了命令行参数的自动排序,保持参数的定义顺序
rootCmd.Flags().SortFlags = false
rootCmd.PersistentFlags().SortFlags = false
rootCmd.Flags().StringVar(&config.HTTPBind, "http_bind", "127.0.0.1:8080", "http server bind address")
rootCmd.Flags().StringVar(&tlsCertFile, "cert_file", "", "tls cert file")
rootCmd.Flags().StringVar(&tlsKeyFile, "key_file", "", "tls key file")
rootCmd.Flags().StringVar(&config.SchedulerName, "scheduler-name", "", "the name to be added to pod.spec.schedulerName if not empty")
rootCmd.Flags().Int32Var(&config.DefaultMem, "default-mem", 0, "default gpu device memory to allocate")
rootCmd.Flags().Int32Var(&config.DefaultCores, "default-cores", 0, "default gpu core percentage to allocate")
rootCmd.Flags().Int32Var(&config.DefaultResourceNum, "default-gpu", 1, "default gpu to allocate")
rootCmd.Flags().StringVar(&config.NodeSchedulerPolicy, "node-scheduler-policy", util.NodeSchedulerPolicyBinpack.String(), "node scheduler policy")
rootCmd.Flags().StringVar(&config.GPUSchedulerPolicy, "gpu-scheduler-policy", util.GPUSchedulerPolicySpread.String(), "GPU scheduler policy")
rootCmd.Flags().StringVar(&config.MetricsBindAddress, "metrics-bind-address", ":9395", "The TCP address that the scheduler should bind to for serving prometheus metrics(e.g. 127.0.0.1:9395, :9395)")
// 区分了 Flags() 和 PersistentFlags():
// Flags():仅对当前命令有效的参数
// PersistentFlags():对当前命令及其子命令都有效的参数
rootCmd.Flags().StringToStringVar(&config.NodeLabelSelector, "node-label-selector", nil, "key=value pairs separated by commas")
rootCmd.PersistentFlags().AddGoFlagSet(device.GlobalFlagSet())
rootCmd.AddCommand(version.VersionCmd)
rootCmd.Flags().AddGoFlagSet(util.InitKlogFlags())
}
// 其它的全局命令行参数
func GlobalFlagSet() *flag.FlagSet {
fs := flag.NewFlagSet(os.Args[0], flag.ExitOnError)
ascend.ParseConfig(fs)
cambricon.ParseConfig(fs)
hygon.ParseConfig(fs)
iluvatar.ParseConfig(fs)
nvidia.ParseConfig(fs)
mthreads.ParseConfig(fs)
metax.ParseConfig(fs)
fs.BoolVar(&DebugMode, "debug", false, "debug mode")
fs.StringVar(&configFile, "device-config-file", "", "device config file")
klog.InitFlags(fs)
return fs
}
scheduler启动函数
func start() {
// 根据device-config初始化devices map
device.InitDevices()
// 初始化hami-scheduelr调度器
sher = scheduler.NewScheduler()
// 启动hami-scheduelr调度器
sher.Start()
// 停止调度器
defer sher.Stop()
// 这个函数的主要职责是:
// 1. 持续监控:
// 通过channel和定时器实现持续监控
// 响应节点变更事件和定时检查
// 2. 设备健康检查:
// 定期检查每个节点上的设备健康状态
// 处理不健康的设备(清理和移除)
// 3. 设备信息更新:
// 维护最新的节点设备信息
// 处理设备握手机制
// 更新调度器的设备缓存
// 4. 资源使用统计:
// 追踪并更新节点资源使用情况
// 这是调度器的核心组件之一,确保调度决策基于最新的节点和设备状态信息。
go sher.RegisterFromNodeAnnotations()
// 暴露metrics监控指标
go initMetrics(config.MetricsBindAddress)
// 启动一个http server,用于处理调度器请求
router := httprouter.New()
// 调度filter阶段请求
router.POST("/filter", routes.PredicateRoute(sher))
// 调度bind阶段请求
router.POST("/bind", routes.Bind(sher))
// webhook请求,实测没有捕获到请求
router.POST("/webhook", routes.WebHookRoute())
// 健康检查请求
router.GET("/healthz", routes.HealthzRoute())
klog.Info("listen on ", config.HTTPBind)
if len(tlsCertFile) == 0 || len(tlsKeyFile) == 0 {
if err := http.ListenAndServe(config.HTTPBind, router); err != nil {
klog.Fatal("Listen and Serve error, ", err)
}
} else {
if err := http.ListenAndServeTLS(config.HTTPBind, tlsCertFile, tlsKeyFile, router); err != nil {
klog.Fatal("Listen and Serve error, ", err)
}
}
}
为什么要初始化GPU设备devices map?通过这种初始化机制,HAMi可以统一管理不同类型的GPU,为上层调度提供统一的设备抽象。需要维护一个设备注册表,记录所有支持的GPU。每个设备都需要实现Devices
接口,提供以下核心功能:
- 资源分配和管理
- 设备健康检查
- 节点打分和筛选
- 设备锁定和释放
- 资源使用统计
// Devices接口定义了GPU设备的核心功能,从目前实现来看支持7种设备:nvidia、ascend、cambricon、hygon、iluvatar、mthreads、metax
type Devices interface {
CommonWord() string
MutateAdmission(ctr *corev1.Container, pod *corev1.Pod) (bool, error)
CheckHealth(devType string, n *corev1.Node) (bool, bool)
NodeCleanUp(nn string) error
GetNodeDevices(n corev1.Node) ([]*util.DeviceInfo, error)
CheckType(annos map[string]string, d util.DeviceUsage, n util.ContainerDeviceRequest) (bool, bool, bool)
// CheckUUID is check current device id whether in GPUUseUUID or GPUNoUseUUID set, return true is check success.
CheckUUID(annos map[string]string, d util.DeviceUsage) bool
LockNode(n *corev1.Node, p *corev1.Pod) error
ReleaseNodeLock(n *corev1.Node, p *corev1.Pod) error
GenerateResourceRequests(ctr *corev1.Container) util.ContainerDeviceRequest
PatchAnnotations(annoinput *map[string]string, pd util.PodDevices) map[string]string
CustomFilterRule(allocated *util.PodDevices, request util.ContainerDeviceRequest, toAllicate util.ContainerDevices, device *util.DeviceUsage) bool
ScoreNode(node *corev1.Node, podDevices util.PodSingleDevice, policy string) float32
AddResourceUsage(n *util.DeviceUsage, ctr *util.ContainerDevice) error
// This should not be associated with a specific device object
//ParseConfig(fs *flag.FlagSet)
}
func InitDevicesWithConfig(config *Config) {
// 初始化devices map
devices = make(map[string]Devices)
DevicesToHandle = []string{}
// 初始化nvidia设备
devices[nvidia.NvidiaGPUDevice] = nvidia.InitNvidiaDevice(config.NvidiaConfig)
// 初始化cambricon设备
devices[cambricon.CambriconMLUDevice] = cambricon.InitMLUDevice(config.CambriconConfig)
// 初始化hygon设备
devices[hygon.HygonDCUDevice] = hygon.InitDCUDevice(config.HygonConfig)
// 初始化iluvatar设备
devices[iluvatar.IluvatarGPUDevice] = iluvatar.InitIluvatarDevice(config.IluvatarConfig)
// 初始化mthreads设备
devices[mthreads.MthreadsGPUDevice] = mthreads.InitMthreadsDevice(config.MthreadsConfig)
// 初始化metax设备
devices[metax.MetaxGPUDevice] = metax.InitMetaxDevice(config.MetaxConfig)
// CommonWord函数,看起来像是设备别名转换
DevicesToHandle = append(DevicesToHandle, nvidia.NvidiaGPUCommonWord)
DevicesToHandle = append(DevicesToHandle, cambricon.CambriconMLUCommonWord)
DevicesToHandle = append(DevicesToHandle, hygon.HygonDCUCommonWord)
DevicesToHandle = append(DevicesToHandle, iluvatar.IluvatarGPUCommonWord)
DevicesToHandle = append(DevicesToHandle, mthreads.MthreadsGPUCommonWord)
DevicesToHandle = append(DevicesToHandle, metax.MetaxGPUCommonWord)
for _, dev := range ascend.InitDevices(config.VNPUs) {
devices[dev.CommonWord()] = dev
DevicesToHandle = append(DevicesToHandle, dev.CommonWord())
}
}
hami也会在集群中运行一个hami-scheduler的调度器,pod调度时,如何区分是集群内原有的调度器还是hami-scheduler?mutatingwebhook
hami-webhook会指定pod调度器为hami-scheduler
// 创建一个webhook,用于处理pod调度请求
func NewWebHook() (*admission.Webhook, error) {
logf.SetLogger(klog.NewKlogr())
schema := runtime.NewScheme()
if err := clientgoscheme.AddToScheme(schema); err != nil {
return nil, err
}
decoder := admission.NewDecoder(schema)
wh := &admission.Webhook{Handler: &webhook{decoder: decoder}}
return wh, nil
}
// webhook结构体实现了Handler interface
func (h *webhook) Handle(_ context.Context, req admission.Request) admission.Response {
pod := &corev1.Pod{}
// 解码请求,获取pod信息
err := h.decoder.Decode(req, pod)
if err != nil {
klog.Errorf("Failed to decode request: %v", err)
return admission.Errored(http.StatusBadRequest, err)
}
// 检查pod是否有容器
if len(pod.Spec.Containers) == 0 {
klog.Warningf(template+" - Denying admission as pod has no containers", req.Namespace, req.Name, req.UID)
return admission.Denied("pod has no containers")
}
klog.Infof(template, req.Namespace, req.Name, req.UID)
hasResource := false
for idx, ctr := range pod.Spec.Containers {
c := &pod.Spec.Containers[idx]
// 检查容器是否为特权容器
if ctr.SecurityContext != nil {
if ctr.SecurityContext.Privileged != nil && *ctr.SecurityContext.Privileged {
klog.Warningf(template+" - Denying admission as container %s is privileged", req.Namespace, req.Name, req.UID, c.Name)
continue
}
}
// 检查容器是否为vgpu容器
for _, val := range device.GetDevices() {
// 调用设备接口的MutateAdmission方法,检查容器是否为vgpu容器
found, err := val.MutateAdmission(c, pod)
if err != nil {
klog.Errorf("validating pod failed:%s", err.Error())
return admission.Errored(http.StatusInternalServerError, err)
}
hasResource = hasResource || found
}
}
if !hasResource {
klog.Infof(template+" - Allowing admission for pod: no resource found", req.Namespace, req.Name, req.UID)
//return admission.Allowed("no resource found")
} else if len(config.SchedulerName) > 0 {
// 如果配置了scheduler-name,则设置pod调度器为hami-scheduler
pod.Spec.SchedulerName = config.SchedulerName
if pod.Spec.NodeName != "" {
klog.Infof(template+" - Pod already has node assigned", req.Namespace, req.Name, req.UID)
return admission.Denied("pod has node assigned")
}
}
marshaledPod, err := json.Marshal(pod)
if err != nil {
klog.Errorf(template+" - Failed to marshal pod, error: %v", req.Namespace, req.Name, req.UID, err)
return admission.Errored(http.StatusInternalServerError, err)
}
return admission.PatchResponseFromRaw(req.Object.Raw, marshaledPod)
}
vgpu pod的annotation是什么时候添加的?在scheduler filter阶段添加
// filter核心函数
func (s *Scheduler) Filter(args extenderv1.ExtenderArgs) (*extenderv1.ExtenderFilterResult, error) {
klog.InfoS("begin schedule filter", "pod", args.Pod.Name, "uuid", args.Pod.UID, "namespaces", args.Pod.Namespace)
// 用于统计Pod中容器的设备资源请求
nums := k8sutil.Resourcereqs(args.Pod)
total := 0
for _, n := range nums {
for _, k := range n {
total += int(k.Nums)
}
}
if total == 0 {
// 如果Pod中没有请求任何资源,则记录调度失败事件
klog.V(1).Infof("pod %v not find resource", args.Pod.Name)
s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, fmt.Errorf("does not request any resource"))
return &extenderv1.ExtenderFilterResult{
NodeNames: args.NodeNames,
FailedNodes: nil,
Error: "",
}, nil
}
annos := args.Pod.Annotations
// 从podManager维护的pods map中删除pod
s.delPod(args.Pod)
// 调度器中的重要函数,用于获取节点资源使用情况
nodeUsage, failedNodes, err := s.getNodesUsage(args.NodeNames, args.Pod)
if err != nil {
s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err)
return nil, err
}
if len(failedNodes) != 0 {
klog.V(5).InfoS("getNodesUsage failed nodes", "nodes", failedNodes)
}
// 计算节点得分的函数
// 1. 获取节点调度策略
// 2. 遍历所有可用节点
// 3. 计算每个节点的基础得分
// 4. 检查容器的设备需求适配性
// 5. 根据策略调整最终得分
// 6. 记录符合要求的节点得分
// 计算逻辑说明:
// 分别计算三个维度的使用率:设备、核心、显存
// 每个维度的得分是使用量与总量的比值
// 最终得分是三个维度得分之和乘以权重
// 适配性检查,检查节点上的设备是否能满足容器的资源请求;如果适配成功,会在score.Devices中记录设备的分配方案
// fit, _ := fitInDevices(
// node, // 节点信息(包含该节点上的设备状态)
// n, // 容器的设备请求数量
// annos, // Pod 的注解信息
// task, // 待调度的 Pod
// &score.Devices // 输出参数:记录设备分配结果
// )
nodeScores, err := s.calcScore(nodeUsage, nums, annos, args.Pod, failedNodes)
if err != nil {
err := fmt.Errorf("calcScore failed %v for pod %v", err, args.Pod.Name)
s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err)
return nil, err
}
// 如果所有节点得分都不符合要求,则记录调度失败事件
if len((*nodeScores).NodeList) == 0 {
klog.V(4).Infof("All node scores do not meet for pod %v", args.Pod.Name)
s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, fmt.Errorf("no available node, all node scores do not meet"))
return &extenderv1.ExtenderFilterResult{
FailedNodes: failedNodes,
}, nil
}
klog.V(4).Infoln("nodeScores_len=", len((*nodeScores).NodeList))
// 对节点得分进行排序
sort.Sort(nodeScores)
// 获取得分最高的节点
m := (*nodeScores).NodeList[len((*nodeScores).NodeList)-1]
klog.Infof("schedule %v/%v to %v %v", args.Pod.Namespace, args.Pod.Name, m.NodeID, m.Devices)
annotations := make(map[string]string)
// 添加调度节点和调度时间到pod的annotation中
annotations[util.AssignedNodeAnnotations] = m.NodeID
// 添加调度时间到pod的annotation中
annotations[util.AssignedTimeAnnotations] = strconv.FormatInt(time.Now().Unix(), 10)
for _, val := range device.GetDevices() {
val.PatchAnnotations(&annotations, m.Devices)
}
//InRequestDevices := util.EncodePodDevices(util.InRequestDevices, m.devices)
//supportDevices := util.EncodePodDevices(util.SupportDevices, m.devices)
//maps.Copy(annotations, InRequestDevices)
//maps.Copy(annotations, supportDevices)
// 将pod添加到podManager维护的pods map中
s.addPod(args.Pod, m.NodeID, m.Devices)
// 更新pod的annotation
err = util.PatchPodAnnotations(args.Pod, annotations)
if err != nil {
s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err)
s.delPod(args.Pod)
return nil, err
}
// 记录调度成功事件
s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringSucceed, []string{m.NodeID}, nil)
// 返回调度结果
res := extenderv1.ExtenderFilterResult{NodeNames: &[]string{m.NodeID}}
return &res, nil
}
除了filter阶段,scheduler还有bind阶段,bind阶段是带有资源锁定的安全绑定过程,确保设备资源不会被重复分配,并在失败时能够正确清理资源。
hami-scheduler本身也是个controller,这个controller监听什么资源,然后reconcile处理什么逻辑?
func NewScheduler() *Scheduler {
klog.Info("New Scheduler")
// 初始化scheduler
s := &Scheduler{
stopCh: make(chan struct{}),
// 维护一个缓存map,保存节点GPU设备使用情况
cachedstatus: make(map[string]*NodeUsage),
nodeNotify: make(chan struct{}, 1),
}
// 初始化nodeManager,维护一个nodes map,key为node_id,value为节点GPU设备的信息
s.nodeManager.init()
// 初始化podManager,维护一个pods map,key为pod_id,value为pod GPU设备信息
s.podManager.init()
return s
}
// 启动controller
func (s *Scheduler) Start() {
// 初始化k8s clientSet
kubeClient, err := k8sutil.NewClient()
check(err)
s.kubeClient = kubeClient
// 初始化informerFactory,用于监听k8s资源变化
informerFactory := informers.NewSharedInformerFactoryWithOptions(s.kubeClient, time.Hour*1)
// 初始化podLister,用于获取pod信息
s.podLister = informerFactory.Core().V1().Pods().Lister()
// 初始化nodeLister,用于获取node信息
s.nodeLister = informerFactory.Core().V1().Nodes().Lister()
// 初始化podInformer,用于监听pod资源变化
informer := informerFactory.Core().V1().Pods().Informer()
// 添加pod资源事件处理函数
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
// onAddPod 处理 Pod 添加事件
// 1. 验证对象类型是否为 Pod
// 2. 检查 Pod 是否有指定节点的注解
// 3. 如果 Pod 处于终止状态则删除
// 4. 解析 Pod 的设备注解并添加到podManager维护的pods map中
AddFunc: s.onAddPod,
// onUpdatePod 处理 Pod 更新事件,走的还是onAddPod逻辑
UpdateFunc: s.onUpdatePod,
// onDelPod 处理 Pod 删除事件,从podManager维护的pods map中删除pod
DeleteFunc: s.onDelPod,
})
// 初始化nodeInformer,用于监听node资源变化
informerFactory.Core().V1().Nodes().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
// 这三个事件函数都是向nodeNotify channel发送一个空结构体信号
// 这个设计的关键点:
// 1. 通知机制:
// nodeNotify是一个 channel,用于发送通知信号
// struct{}{}是一个空结构体,在Go中常用作信号传递(因为它不占用内存)
// 2. 触发更新:
// 这些函数与RegisterFromNodeAnnotations()方法配合工作
// 当节点发生任何变化时,都会触发重新扫描和同步节点状态
// 3. 统一处理:
// 所有节点事件(添加/更新/删除)都触发相同的通知
// 实际的节点状态更新逻辑在接收通知的地方统一处理
AddFunc: s.onAddNode,
UpdateFunc: s.onUpdateNode,
DeleteFunc: s.onDelNode,
})
// 启动informerFactory,开始监听k8s资源变化
informerFactory.Start(s.stopCh)
// 等待informerFactory缓存同步完成
informerFactory.WaitForCacheSync(s.stopCh)
// 添加其它的事件处理函数
s.addAllEventHandlers()
}
5.3 hami-device-plugin调试环境
远程gpu服务器上dlv启动hami-device-plugin,本地goland添加Go Remote Debugger指向:2346
NODE_NAME=vgpu NVIDIA_MIG_MONITOR_DEVICES=all HOOK_PATH=/usr/local \
dlv exec bin/nvidia-device-plugin --headless -l 0.0.0.0:2346 --api-version=2 -- \
--config-file=/root/device-config.yaml \
--mig-strategy=none \
--disable-core-limit=false \
-v=false
5.4 hami-device-plugin核心逻辑
在了解hami nvidia-device-plugin前,需要了解下nvidia-runtime、GPU Operator、k8s device plugin等
- nvidia-runtime:对应Docker环境,安装nvidia-container-toolkit组件,docker配置使用nvidia-runtime,启动容器时增加
--gpu
参数;nvidia-container-toolkit根据NVIDIA_VISIBLE_DEVICES
环境变量将GPU、驱动等相关文件挂载到容器里。 - k8s device plugin:对应k8s环境,Nvidia提供的一种k8s GPU实现方案
- GPU Operator:对应k8s环境,Nvidia提供的一种在Kubernetes环境中简化使用GPU的部署方案
device-plugin原理:device plugin学习笔记
实现一个device-plugin大致分为两部分:插件注册、kubelet调用插件(grpc调用,device-plugin做为grpc server启动)
// DevicePluginServer是device-plugin的server接口,定义了device-plugin需要实现的方法
// DevicePluginServer is the server API for DevicePlugin service.
type DevicePluginServer interface {
// GetDevicePluginOptions returns options to be communicated with Device
// Manager
GetDevicePluginOptions(context.Context, *Empty) (*DevicePluginOptions, error)
// ListAndWatch returns a stream of List of Devices
// Whenever a Device state change or a Device disappears, ListAndWatch
// returns the new list
ListAndWatch(*Empty, DevicePlugin_ListAndWatchServer) error
// GetPreferredAllocation returns a preferred set of devices to allocate
// from a list of available ones. The resulting preferred allocation is not
// guaranteed to be the allocation ultimately performed by the
// devicemanager. It is only designed to help the devicemanager make a more
// informed allocation decision when possible.
GetPreferredAllocation(context.Context, *PreferredAllocationRequest) (*PreferredAllocationResponse, error)
// Allocate is called during container creation so that the Device
// Plugin can run device specific operations and instruct Kubelet
// of the steps to make the Device available in the container
Allocate(context.Context, *AllocateRequest) (*AllocateResponse, error)
// PreStartContainer is called, if indicated by Device Plugin during registeration phase,
// before each container start. Device plugin can run device specific operations
// such as resetting the device before making devices available to the container
PreStartContainer(context.Context, *PreStartContainerRequest) (*PreStartContainerResponse, error)
}
虽然接口定义了5个方法,但是HAMi nvidia-device-plugin只实现了Allocate、ListAndWatch,外加一个Register方法,所以重点关注这三个实现
- Register:插件注册到kubelet
- ListAndWatch:获取GPU设备信息,并上报给kubelet
- Allocate:分配GPU设备给容器
device-plugin启动函数
func start(c *cli.Context, flags []cli.Flag) error {
klog.Info("Starting FS watcher.")
// 获取节点名称
util.NodeName = os.Getenv(util.NodeNameEnvName)
// 创建一个文件系统监视器,用于监听kubelet设备插件路径的变化
watcher, err := newFSWatcher(kubeletdevicepluginv1beta1.DevicePluginPath)
if err != nil {
return fmt.Errorf("failed to create FS watcher: %v", err)
}
defer watcher.Close()
//device.InitDevices()
/*Loading config files*/
klog.Infof("Start working on node %s", util.NodeName)
klog.Info("Starting OS watcher.")
// 创建一个信号监听器,用于监听SIGHUP、SIGINT、SIGTERM、SIGQUIT信号
sigs := newOSWatcher(syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
var restarting bool
var restartTimeout <-chan time.Time
var plugins []plugin.Interface
restart:
// If we are restarting, stop plugins from previous run.
if restarting {
err := stopPlugins(plugins)
if err != nil {
return fmt.Errorf("error stopping plugins from previous run: %v", err)
}
}
klog.Info("Starting Plugins.")
// 启动插件
plugins, restartPlugins, err := startPlugins(c, flags, restarting)
if err != nil {
return fmt.Errorf("error starting plugins: %v", err)
}
if restartPlugins {
klog.Info("Failed to start one or more plugins. Retrying in 30s...")
// 设置重启超时时间
restartTimeout = time.After(30 * time.Second)
}
// 设置重启标志
restarting = true
// Start an infinite loop, waiting for several indicators to either log
// some messages, trigger a restart of the plugins, or exit the program.
for {
select {
// If the restart timeout has expired, then restart the plugins
case <-restartTimeout:
goto restart
// Detect a kubelet restart by watching for a newly created
// 'kubeletdevicepluginv1beta1.KubeletSocket' file. When this occurs, restart this loop,
// restarting all of the plugins in the process.
case event := <-watcher.Events:
// 监听kubelet设备插件路径的变化
if event.Name == kubeletdevicepluginv1beta1.KubeletSocket && event.Op&fsnotify.Create == fsnotify.Create {
klog.Infof("inotify: %s created, restarting.", kubeletdevicepluginv1beta1.KubeletSocket)
goto restart
}
// Watch for any other fs errors and log them.
case err := <-watcher.Errors:
klog.Errorf("inotify: %s", err)
// Watch for any signals from the OS. On SIGHUP, restart this loop,
// restarting all of the plugins in the process. On all other
// signals, exit the loop and exit the program.
case s := <-sigs:
switch s {
case syscall.SIGHUP:
klog.Info("Received SIGHUP, restarting.")
goto restart
default:
klog.Infof("Received signal \"%v\", shutting down.", s)
goto exit
}
}
}
exit:
// 停止插件
err = stopPlugins(plugins)
if err != nil {
return fmt.Errorf("error stopping plugins: %v", err)
}
return nil
}
启动函数除了启动插件外,还启动监视器来监听/var/lib/kubelet/device-plugins/kubelet.sock
的变化,每次重启kubelet,kubelet.sock都会被重新创建,kubelet是通过map来存储device-plugin的注册信息,所以每次kubelet重启,device-plugin都需要重新注册(也跟着重启)
HAMi作为GPU管理框架,针对插件的启动、停止,自身抽象了一套接口:
// Interface defines the API for the plugin package
type Interface interface {
Devices() rm.Devices
Start() error
Stop() error
}
插件Start实现包含:plugin.Serve、plugin.Register、plugin.WatchAndRegister
// 在插件注册前,plugin.Serve会先启动插件grpc server
// Serve starts the gRPC server of the device plugin.
func (plugin *NvidiaDevicePlugin) Serve() error {
// 删除已存在的socket文件
os.Remove(plugin.socket)
// 监听socket文件
sock, err := net.Listen("unix", plugin.socket)
if err != nil {
return err
}
// 注册设备插件的gRPC服务器
kubeletdevicepluginv1beta1.RegisterDevicePluginServer(plugin.server, plugin)
// 启动grpc server
go func() {
lastCrashTime := time.Now()
restartCount := 0
for {
klog.Infof("Starting GRPC server for '%s'", plugin.rm.Resource())
err := plugin.server.Serve(sock)
if err == nil {
break
}
klog.Infof("GRPC server for '%s' crashed with error: %v", plugin.rm.Resource(), err)
// restart if it has not been too often
// i.e. if server has crashed more than 5 times and it didn't last more than one hour each time
if restartCount > 5 {
// quit
klog.Fatalf("GRPC server for '%s' has repeatedly crashed recently. Quitting", plugin.rm.Resource())
}
timeSinceLastCrash := time.Since(lastCrashTime).Seconds()
lastCrashTime = time.Now()
if timeSinceLastCrash > 3600 {
// it has been one hour since the last crash.. reset the count
// to reflect on the frequency
restartCount = 1
} else {
restartCount++
}
}
}()
// 验证连接到device-plugin
conn, err := plugin.dial(plugin.socket, 5*time.Second)
if err != nil {
return err
}
// 关闭连接
conn.Close()
return nil
}
plugin.Register实现
// Register registers the device plugin for the given resourceName with Kubelet.
func (plugin *NvidiaDevicePlugin) Register() error {
// 连接到kubelet
conn, err := plugin.dial(kubeletdevicepluginv1beta1.KubeletSocket, 5*time.Second)
if err != nil {
return err
}
defer conn.Close()
// 创建一个注册客户端
client := kubeletdevicepluginv1beta1.NewRegistrationClient(conn)
// 创建一个注册请求
reqt := &kubeletdevicepluginv1beta1.RegisterRequest{
Version: kubeletdevicepluginv1beta1.Version,
Endpoint: path.Base(plugin.socket),
// 资源名称
ResourceName: string(plugin.rm.Resource()),
// 设备插件选项
Options: &kubeletdevicepluginv1beta1.DevicePluginOptions{
GetPreferredAllocationAvailable: false,
},
}
// 发送注册请求
_, err = client.Register(context.Background(), reqt)
if err != nil {
return err
}
return nil
}
plugin.WatchAndRegister实现
func (plugin *NvidiaDevicePlugin) WatchAndRegister() {
klog.Info("Starting WatchAndRegister")
errorSleepInterval := time.Second * 5
successSleepInterval := time.Second * 30
for {
// annotation格式形如:
// annotations:
// hami.io/node-nvidia-register: 'GPU-a8243209-6b70-5b3d-de52-1aaafc1495fc,10,24576,100,NVIDIA-NVIDIA GeForce RTX 3090,0,true,0,:'
// 通过nvml库获取GPU信息,并更新到Node的annotation上
err := plugin.RegistrInAnnotation()
if err != nil {
klog.Errorf("Failed to register annotation: %v", err)
klog.Infof("Retrying in %v seconds...", errorSleepInterval)
time.Sleep(errorSleepInterval)
} else {
klog.Infof("Successfully registered annotation. Next check in %v seconds...", successSleepInterval)
time.Sleep(successSleepInterval)
}
}
}
这里device-plugin是直接和k8s-apiserver交互,更新Node信息,hami.io/node-nvidia-register
的annotation信息,后面hami-scheduler的RegisterFromNodeAnnotations
函数会根据annotation信息进行调度
插件ListAndWatch实现(启动nvidi-device-plugin时进入)
// ListAndWatch lists devices and update that list according to the health status
func (plugin *NvidiaDevicePlugin) ListAndWatch(e *kubeletdevicepluginv1beta1.Empty, s kubeletdevicepluginv1beta1.DevicePlugin_ListAndWatchServer) error {
// 发送GPU设备信息给kubelet
s.Send(&kubeletdevicepluginv1beta1.ListAndWatchResponse{Devices: plugin.apiDevices()})
for {
select {
case <-plugin.stop:
return nil
case d := <-plugin.health:
// FIXME: there is no way to recover from the Unhealthy state.
d.Health = kubeletdevicepluginv1beta1.Unhealthy
klog.Infof("'%s' device marked unhealthy: %s", plugin.rm.Resource(), d.ID)
// 发送GPU设备信息给kubelet
s.Send(&kubeletdevicepluginv1beta1.ListAndWatchResponse{Devices: plugin.apiDevices()})
}
}
}
// plugin.apiDevices最终会进入到GetPluginDevices函数
// GetPluginDevices returns the plugin Devices from all devices in the Devices
func (ds Devices) GetPluginDevices(count uint) []*kubeletdevicepluginv1beta1.Device {
var res []*kubeletdevicepluginv1beta1.Device
// 如果GPU不是MIG模式,则需要根据count,复制多个device,每个device的ID不同
if !strings.Contains(ds.GetIDs()[0], "MIG") {
for _, dev := range ds {
// count取值来自--device-split-count参数,单卡同时运行多少个任务,一般推荐10以上
for i := uint(0); i < count; i++ {
id := fmt.Sprintf("%v-%v", dev.ID, i)
// 根据count,复制多个device,每个device的ID不同
res = append(res, &kubeletdevicepluginv1beta1.Device{
ID: id,
Health: dev.Health,
Topology: nil,
})
}
}
} else {
// 如果GPU是MIG模式,则直接返回所有GPU设备信息
for _, d := range ds {
res = append(res, &d.Device)
}
}
// 返回GPU设备信息
return res
}
这里ds Devices
参数是从哪里传入的?在pluginManager初始化的时候,最终b.buildGPUDeviceMap
传入的
# mig-strategies为none时
pluginManager.GetPlugins -> rm.NewNVMLResourceManagers -> NewDeviceMap -> b.build
-> b.buildDeviceMapFromConfigResources -> b.buildGPUDeviceMap
NVIDIA设备插件中用于构建GPU设备映射的函数
// buildGPUDeviceMap builds a map of resource names to GPU devices
func (b *deviceMapBuilder) buildGPUDeviceMap() (DeviceMap, error) {
// 创建一个空的设备映射
devices := make(DeviceMap)
// 遍历所有 GPU 设备
b.VisitDevices(func(i int, gpu device.Device) error {
// 获取 GPU 的产品名称
name, ret := gpu.GetName()
if ret != nvml.SUCCESS {
return fmt.Errorf("error getting product name for GPU: %v", ret)
}
// 检查 GPU 是否启用了 MIG (Multi-Instance GPU) 功能
migEnabled, err := gpu.IsMigEnabled()
if err != nil {
return fmt.Errorf("error checking if MIG is enabled on GPU: %v", err)
}
// 如果 GPU 启用了 MIG 且 MIG 策略不是 "None",则跳过这个 GPU
// 因为 MIG 启用的 GPU 将由 buildMigDeviceMap 函数处理
if migEnabled && *b.config.Flags.MigStrategy != spec.MigStrategyNone {
return nil
}
// 遍历配置中定义的 GPU 资源
for _, resource := range b.config.Resources.GPUs {
// 检查 GPU 名称是否匹配资源模式
if resource.Pattern.Matches(name) {
// 如果匹配,创建新的 GPU 设备条目
index, info := newGPUDevice(i, gpu)
return devices.setEntry(resource.Name, index, info)
}
}
// 如果 GPU 名称不匹配任何资源模式,返回错误
return fmt.Errorf("GPU name '%v' does not match any resource patterns", name)
})
return devices, nil
}
插件Allocate实现(pod挂载GPU设备时进入)
// Allocate which return list of devices.
func (plugin *NvidiaDevicePlugin) Allocate(ctx context.Context, reqs *kubeletdevicepluginv1beta1.AllocateRequest) (*kubeletdevicepluginv1beta1.AllocateResponse, error) {
klog.InfoS("Allocate", "request", reqs)
// 创建一个AllocateResponse对象
responses := kubeletdevicepluginv1beta1.AllocateResponse{}
// 获取当前节点名称
nodename := os.Getenv(util.NodeNameEnvName)
// 获取当前挂载的pod
current, err := util.GetPendingPod(ctx, nodename)
if err != nil {
//nodelock.ReleaseNodeLock(nodename, NodeLockNvidia, current)
return &kubeletdevicepluginv1beta1.AllocateResponse{}, err
}
klog.Infof("Allocate pod name is %s/%s, annotation is %+v", current.Namespace, current.Name, current.Annotations)
// 遍历pod的容器请求
for idx, req := range reqs.ContainerRequests {
// If the devices being allocated are replicas, then (conditionally)
// error out if more than one resource is being allocated.
// 如果请求的设备ID包含MIG,则需要检查是否启用了时间切片,并且请求的设备ID数量大于1
if strings.Contains(req.DevicesIDs[0], "MIG") {
if plugin.config.Sharing.TimeSlicing.FailRequestsGreaterThanOne && rm.AnnotatedIDs(req.DevicesIDs).AnyHasAnnotations() {
if len(req.DevicesIDs) > 1 {
device.PodAllocationFailed(nodename, current, NodeLockNvidia)
return nil, fmt.Errorf("request for '%v: %v' too large: maximum request size for shared resources is 1", plugin.rm.Resource(), len(req.DevicesIDs))
}
}
for _, id := range req.DevicesIDs {
if !plugin.rm.Devices().Contains(id) {
device.PodAllocationFailed(nodename, current, NodeLockNvidia)
return nil, fmt.Errorf("invalid allocation request for '%s': unknown device: %s", plugin.rm.Resource(), id)
}
}
response, err := plugin.getAllocateResponse(req.DevicesIDs)
if err != nil {
device.PodAllocationFailed(nodename, current, NodeLockNvidia)
return nil, fmt.Errorf("failed to get allocate response: %v", err)
}
responses.ContainerResponses = append(responses.ContainerResponses, response)
} else {
// 如果请求的设备ID不包含MIG,则需要获取下一个设备请求
currentCtr, devreq, err := GetNextDeviceRequest(nvidia.NvidiaGPUDevice, *current)
klog.Infoln("deviceAllocateFromAnnotation=", devreq)
if err != nil {
device.PodAllocationFailed(nodename, current, NodeLockNvidia)
return &kubeletdevicepluginv1beta1.AllocateResponse{}, err
}
// 检查设备数量是否匹配
if len(devreq) != len(reqs.ContainerRequests[idx].DevicesIDs) {
device.PodAllocationFailed(nodename, current, NodeLockNvidia)
return &kubeletdevicepluginv1beta1.AllocateResponse{}, errors.New("device number not matched")
}
// 获取容器设备字符串数组,通过设置NVIDIA_VISIBLE_DEVICES环境变量,将GPU设备ID传递给容器
response, err := plugin.getAllocateResponse(plugin.GetContainerDeviceStrArray(devreq))
if err != nil {
return nil, fmt.Errorf("failed to get allocate response: %v", err)
}
// 清除下一个设备类型从annotation
err = EraseNextDeviceTypeFromAnnotation(nvidia.NvidiaGPUDevice, *current)
if err != nil {
device.PodAllocationFailed(nodename, current, NodeLockNvidia)
return &kubeletdevicepluginv1beta1.AllocateResponse{}, err
}
// 如果操作模式不是MIG,则需要设置CUDA_DEVICE_MEMORY_LIMIT、CUDA_DEVICE_SM_LIMIT、CUDA_DEVICE_MEMORY_SHARED_CACHE、CUDA_OVERSUBSCRIBE、CUDA_DISABLE_CONTROL等环境变量
if plugin.operatingMode != "mig" {
for i, dev := range devreq {
limitKey := fmt.Sprintf("CUDA_DEVICE_MEMORY_LIMIT_%v", i)
response.Envs[limitKey] = fmt.Sprintf("%vm", dev.Usedmem)
}
response.Envs["CUDA_DEVICE_SM_LIMIT"] = fmt.Sprint(devreq[0].Usedcores)
response.Envs["CUDA_DEVICE_MEMORY_SHARED_CACHE"] = fmt.Sprintf("%s/vgpu/%v.cache", hostHookPath, uuid.New().String())
if plugin.schedulerConfig.DeviceMemoryScaling > 1 {
response.Envs["CUDA_OVERSUBSCRIBE"] = "true"
}
if plugin.schedulerConfig.DisableCoreLimit {
response.Envs[util.CoreLimitSwitch] = "disable"
}
// 删除容器缓存目录
cacheFileHostDirectory := fmt.Sprintf("%s/vgpu/containers/%s_%s", hostHookPath, current.UID, currentCtr.Name)
os.RemoveAll(cacheFileHostDirectory)
// 创建容器缓存目录
os.MkdirAll(cacheFileHostDirectory, 0777)
os.Chmod(cacheFileHostDirectory, 0777)
// 创建vgpulock目录
os.MkdirAll("/tmp/vgpulock", 0777)
// 设置vgpulock目录权限
os.Chmod("/tmp/vgpulock", 0777)
response.Mounts = append(response.Mounts,
// 挂载libvgpu.so
&kubeletdevicepluginv1beta1.Mount{ContainerPath: fmt.Sprintf("%s/vgpu/libvgpu.so", hostHookPath),
HostPath: hostHookPath + "/vgpu/libvgpu.so",
ReadOnly: true},
// 挂载容器缓存目录
&kubeletdevicepluginv1beta1.Mount{ContainerPath: fmt.Sprintf("%s/vgpu", hostHookPath),
HostPath: cacheFileHostDirectory,
ReadOnly: false},
// 挂载vgpulock目录
&kubeletdevicepluginv1beta1.Mount{ContainerPath: "/tmp/vgpulock",
HostPath: "/tmp/vgpulock",
ReadOnly: false},
)
// 检查CUDA_DISABLE_CONTROL环境变量是否存在
found := false
for _, val := range currentCtr.Env {
// 如果环境变量是CUDA_DISABLE_CONTROL,则跳过
if strings.Compare(val.Name, "CUDA_DISABLE_CONTROL") == 0 {
// if env existed but is set to false or can not be parsed, ignore
// 将环境变量值转换为布尔值
t, _ := strconv.ParseBool(val.Value)
// 如果环境变量值为false,则跳过
if !t {
continue
}
// only env existed and set to true, we mark it "found"
found = true
break
}
}
// 如果CUDA_DISABLE_CONTROL环境变量不存在,则需要预加载链接库
if !found {
// 预加载链接库
response.Mounts = append(response.Mounts, &kubeletdevicepluginv1beta1.Mount{ContainerPath: "/etc/ld.so.preload",
HostPath: hostHookPath + "/vgpu/ld.so.preload",
ReadOnly: true},
)
}
// 检查vgpu license是否存在
_, err = os.Stat(fmt.Sprintf("%s/vgpu/license", hostHookPath))
if err == nil {
// 挂载vgpu license
response.Mounts = append(response.Mounts, &kubeletdevicepluginv1beta1.Mount{
ContainerPath: "/tmp/license",
HostPath: fmt.Sprintf("%s/vgpu/license", hostHookPath),
ReadOnly: true,
})
// 挂载vgpuvalidator
response.Mounts = append(response.Mounts, &kubeletdevicepluginv1beta1.Mount{
ContainerPath: "/usr/bin/vgpuvalidator",
HostPath: fmt.Sprintf("%s/vgpu/vgpuvalidator", hostHookPath),
ReadOnly: true,
})
}
}
// 将响应添加到响应列表中
responses.ContainerResponses = append(responses.ContainerResponses, response)
}
}
klog.Infoln("Allocate Response", responses.ContainerResponses)
// 标记pod分配成功
device.PodAllocationTrySuccess(nodename, nvidia.NvidiaGPUDevice, NodeLockNvidia, current)
return &responses, nil
}
这些环境变量如何生效? cuda天生支持的?是HAMi-core能够解析自定义的CUDA_xxx
环境变量,而NVIDIA_VISIBLE_DEVICES
是nvidia-container-toolkit支持的
NVIDIA_VISIBLE_DEVICES -> GPU-a8243209-6b70-5b3d-de52-1aaafc1495fc
CUDA_DEVICE_MEMORY_LIMIT_0 -> 3000m
CUDA_DEVICE_SM_LIMIT -> 25
CUDA_DEVICE_MEMORY_SHARED_CACHE -> /usr/local/vgpu/55202346-8648-4a49-9037-51b28eeb74ba.cache
libvgpu.so替换完为什么就生效? 在Linux下预加载链接库有好几种方式,动态链接库加载优先级从高到低:
LD_PRELOAD > /etc/ld.so.preload > LD_LIBRARY_PATH > /etc/ld.so.conf.d/.conf > 默认系统路径(/lib, /usr/lib)
通过/etc/ld.so.preload定义,确保libvgpu.so肯定会被加载。cuda api如何劫持?这部分逻辑定义在HAMi-core
实现里
5.5 vGPUMonitor调试环境
远程gpu服务器上dlv启动vGPUMonitor,本地goland添加Go Remote Debugger指向:2347
KUBECONFIG=/root/.kube/config \
NODE_NAME=vgpu \
NVIDIA_VISIBLE_DEVICES=all \
NVIDIA_MIG_MONITOR_DEVICES=all \
HOOK_PATH=/usr/local/vgpu \
dlv exec bin/vGPUmonitor --headless -l 0.0.0.0:2347 --api-version=2
5.6 vGPUMonitor核心逻辑
启动函数,程序的主要功能是监控vGPU的使用情况,并提供相应的指标收集和反馈机制
func main() {
// 校验环境变量HOOK_PATH必须存在
if err := ValidateEnvVars(); err != nil {
klog.Fatalf("Failed to validate environment variables: %v", err)
}
// 创建一个容器列表器实例,用于跟踪和管理GPU容器Usage
containerLister, err := nvidia.NewContainerLister()
if err != nil {
klog.Fatalf("Failed to create container lister: %v", err)
}
cgroupDriver = 0
// 创建一个错误通道用于错误处理, 用于在不同goroutine间传递错误信息(看起来没有意义了,因为main函数里没有使用)
errchannel := make(chan error)
//go serveInfo(errchannel)
// 初始化vGPUmonitor metrics
go initMetrics(containerLister)
// 持续监控容器使用情况
go watchAndFeedback(containerLister)
// 等待错误通道传递错误信息,errchannel实际上没用到
for {
err := <-errchannel
klog.Errorf("failed to serve: %v", err)
}
}
主要使用到的结构体
// ContainerLister结构体,用于存储容器列表使用信息
type ContainerLister struct {
containerPath string
// map存储容器使用信息,key为<podUID_containerName>
containers map[string]*ContainerUsage
// 互斥锁,用于保护containers map的并发访问
mutex sync.Mutex
// k8s clientSet
clientset *kubernetes.Clientset
}
// ContainerUsage结构体,用于存储单个容器使用信息
type ContainerUsage struct {
// Pod 的唯一标识符
PodUID string
// 容器名称
ContainerName string
// 容器使用信息
data []byte
Info UsageInfo
}
// UsageInfo接口,定义了获取和设置NVIDIA GPU设备使用情况的方法
type UsageInfo interface {
// DeviceMax 返回系统中支持的最大GPU设备数量
DeviceMax() int
// DeviceNum 返回当前实际使用的GPU设备数量
DeviceNum() int
// DeviceMemoryContextSize 返回指定GPU设备的上下文内存大小
DeviceMemoryContextSize(idx int) uint64
// DeviceMemoryModuleSize 返回指定GPU设备的模块内存大小
DeviceMemoryModuleSize(idx int) uint64
// DeviceMemoryBufferSize 返回指定GPU设备的缓冲区内存大小
DeviceMemoryBufferSize(idx int) uint64
// DeviceMemoryOffset 返回指定GPU设备的内存偏移量
DeviceMemoryOffset(idx int) uint64
// DeviceMemoryTotal 返回指定GPU设备的总内存大小
DeviceMemoryTotal(idx int) uint64
// DeviceSmUtil 返回指定GPU设备的SM(Streaming Multiprocessor)利用率
DeviceSmUtil(idx int) uint64
// SetDeviceSmLimit 设置GPU设备的SM使用限制
SetDeviceSmLimit(l uint64)
// IsValidUUID 检查指定GPU设备的UUID是否有效
IsValidUUID(idx int) bool
// DeviceUUID 返回指定GPU设备的UUID
DeviceUUID(idx int) string
// DeviceMemoryLimit 返回指定GPU设备的内存使用限制
DeviceMemoryLimit(idx int) uint64
// SetDeviceMemoryLimit 设置GPU设备的内存使用限制
SetDeviceMemoryLimit(l uint64)
// LastKernelTime 返回最后一次内核执行的时间戳
LastKernelTime() int64
// UsedMemory 返回指定GPU设备的已使用内存(当前已注释)
//UsedMemory(idx int) (uint64, error)
// GetPriority 返回GPU任务的优先级
GetPriority() int
// GetRecentKernel 获取最近的内核执行状态
GetRecentKernel() int32
// SetRecentKernel 设置最近的内核执行状态
SetRecentKernel(v int32)
// GetUtilizationSwitch 获取利用率开关状态
GetUtilizationSwitch() int32
// SetUtilizationSwitch 设置利用率开关状态
SetUtilizationSwitch(v int32)
}
容器列表器实例,如何更新containers map?
func (l *ContainerLister) Update() error {
// 是否设置NODE_NAME环境变量
nodename := os.Getenv(util.NodeNameEnvName)
if nodename == "" {
return fmt.Errorf("env %s not set", util.NodeNameEnvName)
}
// 获取指定节点上的所有Pod
pods, err := l.clientset.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{
FieldSelector: fmt.Sprintf("spec.nodeName=%s", nodename),
})
if err != nil {
return err
}
// 加锁,保护containers map的并发访问
l.mutex.Lock()
defer l.mutex.Unlock()
// 读取容器目录下的所有文件
entries, err := os.ReadDir(l.containerPath)
if err != nil {
return err
}
// 遍历容器目录下的所有文件
for _, entry := range entries {
if !entry.IsDir() {
continue
}
// 获取容器目录的完整路径
// containerPath: /usr/local/vgpu/containers
// entry.Name(): 305df464-2a9e-485d-9c80-f2901a9259cc_pytorch-container
dirName := filepath.Join(l.containerPath, entry.Name())
// 检查容器是否存在,如果不存在的话
if !isValidPod(entry.Name(), pods) {
// 获取容器目录的元数据
dirInfo, err := os.Stat(dirName)
// 如果容器目录存在,并且修改时间在300秒内,则跳过
if err == nil && dirInfo.ModTime().Add(time.Second*300).After(time.Now()) {
continue
}
klog.Infof("Removing dirname %s in monitorpath", dirName)
// 判断map中是否存在该容器
if c, ok := l.containers[entry.Name()]; ok {
// 如果存在,则释放内存
syscall.Munmap(c.data)
// 删除map中的该容器
delete(l.containers, entry.Name())
}
// 删除容器目录
_ = os.RemoveAll(dirName)
continue
}
// 检查容器是否存在,如果存在的话
// 判断map中是否存在该容器
if _, ok := l.containers[entry.Name()]; ok {
continue
}
// 加载容器使用信息
usage, err := loadCache(dirName)
if err != nil {
klog.Errorf("Failed to load cache: %s, error: %v", dirName, err)
continue
}
// 如果容器使用信息为空,则跳过
if usage == nil {
// no cuInit in container
continue
}
// 获取pod uid和container name
usage.PodUID = strings.Split(entry.Name(), "_")[0]
usage.ContainerName = strings.Split(entry.Name(), "_")[1]
// 将容器使用信息添加到map中
l.containers[entry.Name()] = usage
klog.Infof("Adding ctr dirname %s in monitorpath", dirName)
}
return nil
}
loadCache函数,如何加载容器使用信息?
func loadCache(fpath string) (*ContainerUsage, error) {
klog.Infof("Checking path %s", fpath)
// 读取容器目录下的所有文件,形如/usr/local/vgpu/containers/305df464-2a9e-485d-9c80-f2901a9259cc_pytorch-container/
files, err := os.ReadDir(fpath)
if err != nil {
return nil, err
}
// 检查文件数量,如果文件数量大于2,则返回错误
if len(files) > 2 {
return nil, errors.New("cache num not matched")
}
// 如果文件数量为0,则返回nil
if len(files) == 0 {
return nil, nil
}
cacheFile := ""
for _, val := range files {
// 检查文件名是否包含libvgpu.so
if strings.Contains(val.Name(), "libvgpu.so") {
continue
}
// 检查文件名是否包含.cache
if !strings.Contains(val.Name(), ".cache") {
continue
}
// 获取cache文件的完整路径
cacheFile = filepath.Join(fpath, val.Name())
break
}
// 如果cacheFile为空,则返回nil
if cacheFile == "" {
klog.Infof("No cache file in %s", fpath)
return nil, nil
}
// 获取cache文件的元数据
info, err := os.Stat(cacheFile)
if err != nil {
klog.Errorf("Failed to stat cache file: %s, error: %v", cacheFile, err)
return nil, err
}
// 检查cache文件大小,如果小于headerT结构体的大小,则返回错误
if info.Size() < int64(unsafe.Sizeof(headerT{})) {
return nil, fmt.Errorf("cache file size %d too small", info.Size())
}
// 打开cache文件
f, err := os.OpenFile(cacheFile, os.O_RDWR, 0666)
if err != nil {
klog.Errorf("Failed to open cache file: %s, error: %v", cacheFile, err)
return nil, err
}
defer func(f *os.File) {
_ = f.Close()
}(f)
usage := &ContainerUsage{}
// 将cache文件映射到内存
// - 内存映射文件
// 参数说明:
// - fd: 文件描述符
// - offset: 从文件开始处的偏移量
// - length: 要映射的字节数
// - prot: 内存保护标志 (读/写)
// - flags: 映射标志
usage.data, err = syscall.Mmap(int(f.Fd()), 0, int(info.Size()), syscall.PROT_WRITE|syscall.PROT_READ, syscall.MAP_SHARED)
if err != nil {
klog.Errorf("Failed to mmap cache file: %s, error: %v", cacheFile, err)
return nil, err
}
// 将[]byte转换为结构体指针
head := (*headerT)(unsafe.Pointer(&usage.data[0]))
// 校验initializedFlag,不清楚为什么SharedRegionMagicFlag=19920718
if head.initializedFlag != SharedRegionMagicFlag {
// 释放内存映射
_ = syscall.Munmap(usage.data)
return nil, fmt.Errorf("cache file magic flag not matched")
}
// 检查cache文件大小,为什么跟1197897比较,看起来是针对特定版本的NVIDIA GPU监控数据结构的大小
// usage.data 是通过 syscall.Mmap 映射的文件内容
// v0.CastSpec 和 v1.CastSpec 分别处理不同版本的数据格式
// 两个版本都实现了UsageInfo接口
if info.Size() == 1197897 {
usage.Info = v0.CastSpec(usage.data)
} else if head.majorVersion == 1 {
usage.Info = v1.CastSpec(usage.data)
} else {
_ = syscall.Munmap(usage.data)
return nil, fmt.Errorf("unknown cache file size %d version %d.%d", info.Size(), head.majorVersion, head.minorVersion)
}
return usage, nil
}
vGPUmonitor metrics,如何初始化?
// NewClusterManager first creates a Prometheus-ignorant ClusterManager
// instance. Then, it creates a ClusterManagerCollector for the just created
// ClusterManager. Finally, it registers the ClusterManagerCollector with a
// wrapping Registerer that adds the zone as a label. In this way, the metrics
// collected by different ClusterManagerCollectors do not collide.
func NewClusterManager(zone string, reg prometheus.Registerer, containerLister *nvidia.ContainerLister) *ClusterManager {
// 创建一个ClusterManager实例
c := &ClusterManager{
Zone: zone,
containerLister: containerLister,
}
// 创建一个PodLister实例,用于获取Pod信息
informerFactory := informers.NewSharedInformerFactoryWithOptions(containerLister.Clientset(), time.Hour*1)
c.PodLister = informerFactory.Core().V1().Pods().Lister()
stopCh := make(chan struct{})
// 启动informerFactory,用于获取Pod信息
informerFactory.Start(stopCh)
// 创建一个ClusterManagerCollector实例,用于收集metrics
cc := ClusterManagerCollector{ClusterManager: c}
// 将ClusterManagerCollector实例注册到prometheus
prometheus.WrapRegistererWith(prometheus.Labels{"zone": zone}, reg).MustRegister(cc)
return c
}
如何持续监控容器使用情况?
func watchAndFeedback(lister *nvidia.ContainerLister) {
// 初始化 NVIDIA Management Library
nvml.Init()
// 无限循环,持续监控
for {
// 每5秒执行一次
time.Sleep(time.Second * 5)
// 更新容器列表器实例的containers map
err := lister.Update()
if err != nil {
// 如果更新失败,记录错误并继续下一次循环
klog.Errorf("Failed to update container list: %v", err)
continue
}
// 观察并处理容器状态
Observe(lister)
}
}
// Observe 监控和管理容器的 GPU 使用状态
func Observe(lister *nvidia.ContainerLister) {
// 初始化设备利用率开关映射,key是GPU UUID,value是每个优先级的使用计数
utSwitchOn := map[string]UtilizationPerDevice{}
// 获取containers map
containers := lister.ListContainers()
// 第一次遍历:统计每个 GPU 设备的使用情况
for _, c := range containers {
// 获取容器的最近内核使用计数
recentKernel := c.Info.GetRecentKernel()
if recentKernel > 0 {
// 递减内核使用计数
recentKernel--
if recentKernel > 0 {
// 遍历容器使用的所有 GPU 设备
for i := 0; i < c.Info.DeviceMax(); i++ {
// 跳过无效的 GPU UUID
if !c.Info.IsValidUUID(i) {
continue
}
// 获取 GPU 的 UUID
uuid := c.Info.DeviceUUID(i)
// 如果是新设备,初始化优先级计数数组 [0, 0]
if len(utSwitchOn[uuid]) == 0 {
utSwitchOn[uuid] = []int{0, 0}
}
// 增加对应优先级的使用计数
utSwitchOn[uuid][c.Info.GetPriority()]++
}
}
// 更新容器的内核使用计数
c.Info.SetRecentKernel(recentKernel)
}
}
// 第二次遍历:根据统计结果更新容器状态
for idx, c := range containers {
priority := c.Info.GetPriority()
recentKernel := c.Info.GetRecentKernel()
utilizationSwitch := c.Info.GetUtilizationSwitch()
// 检查是否需要阻塞(有更高优先级的任务在使用)
if CheckBlocking(utSwitchOn, priority, c) {
if recentKernel >= 0 {
klog.Infof("utSwitchon=%v", utSwitchOn)
klog.Infof("Setting Blocking to on %v", idx)
// 设置阻塞状态
c.Info.SetRecentKernel(-1)
}
} else {
if recentKernel < 0 {
klog.Infof("utSwitchon=%v", utSwitchOn)
klog.Infof("Setting Blocking to off %v", idx)
// 解除阻塞状态
c.Info.SetRecentKernel(0)
}
}
// 检查优先级冲突(同优先级是否有多个任务)
if CheckPriority(utSwitchOn, priority, c) {
if utilizationSwitch != 1 {
klog.Infof("utSwitchon=%v", utSwitchOn)
klog.Infof("Setting UtilizationSwitch to on %v", idx)
// 开启利用率限制
c.Info.SetUtilizationSwitch(1)
}
} else {
if utilizationSwitch != 0 {
klog.Infof("utSwitchon=%v", utSwitchOn)
klog.Infof("Setting UtilizationSwitch to off %v", idx)
// 关闭利用率限制
c.Info.SetUtilizationSwitch(0)
}
}
}
}
这个函数实现了一个优先级基础的 GPU 资源管理机制:
- 统计每个 GPU 设备上不同优先级任务的数量
- 根据优先级规则决定是否阻塞低优先级任务
- 处理同优先级任务的资源竞争
- 通过日志记录状态变化
这种机制可以确保:
- 高优先级任务优先获得GPU资源
- 同优先级任务公平共享GPU资源
- 低优先级任务在资源充足时才能运行
参考链接
- HAMi官方部署文档
- Nvidia Container Toolkit
- GPU 环境搭建指南:如何在裸机、Docker、K8s 等环境中使用 GPU
- GPU 环境搭建指南:使用 GPU Operator 加速 Kubernetes GPU 环境搭建
- Kubernetes教程(二一)—自定义资源支持:K8s Device Plugin 从原理到实现
「真诚赞赏,手留余香」
真诚赞赏,手留余香
使用微信扫描二维码完成支付