HAMi vGPU学习笔记

Posted by iceyao on Wednesday, December 11, 2024

1. HAMi是什么

想象一下你是一位繁忙的AI研究员,手头有好几个实验要跑,但实验室里的GPU显卡资源有限。这时候,HAMi就像一位智慧的资源管理员,帮你合理分配和调度这些宝贵的GPU资源。它就像是给GPU装上了"分身术",让多个AI任务能够和谐共处在同一块显卡上。HAMi目前已经加入了云原生界的"黄埔军校"CNCF(云原生计算基金会),作为一个充满潜力的沙箱项目茁壮成长。

让我们通过几个生动的场景,来了解HAMi的神奇之处:

  1. 设备共享有妙招,就像一个灵活的"设备调度大师":

    • 不管是GPU还是NPU,都能轻松管理,就像训练有素的管家
    • 一个AI任务想要多个显卡?没问题!就像给你配备多位得力助手
  2. 显存管理很智能,犹如一位精明的"内存管家":

    • 每个任务的显存使用都有明确的"配额",不会互相争抢
    • 需要更多显存?可以随时调整,就像弹性伸缩的口袋
    • 想用具体数值还是百分比来分配?都可以,就像点菜可以按份量也可以按比例
  3. 设备挑选很贴心,像个专业的"设备配对专家":

    • 只想用RTX 4090?没问题,就像指定特定型号的跑车
    • 还能通过UUID精确定位设备,就像每个显卡都有自己的身份证
  4. 上手特别容易,就像一个"零门槛"的好帮手:

    • AI程序完全不用改代码,就像无缝衔接的魔法
    • 安装卸载像玩积木一样简单,helm工具轻松搞定
  5. 开放合作大家庭,像一个温暖的"开源社区":

    • 各行各业的大咖都在参与,从互联网到金融,从制造业到云服务
    • 加入CNCF大家庭,让更多朋友一起来建设,就像一个开放的创新工坊

2. HAMi架构

HAMi架构图

HAMi架构图 HAMi这套架构就像一个高效运转的智能办公大楼,由四个关键人员协同工作:

  1. 前台接待员(HAMi MutatingWebhook)就像大楼的智能前台,当新的AI任务来访时:

    • 会仔细检查访客的"需求清单"(资源申请)
    • 如果发现这位访客只需要基础设施(CPU、内存)或HAMi特供服务,就会贴上"HAMi特别通行证"
    • 相当于一位细心的门卫,确保每个来访者都能得到合适的接待方式
  2. 调度主管(HAMi scheduler-extender)犹如大楼的总调度室管理员:

    • 负责给每位访客安排最合适的办公位置(节点)和工作设备(GPU/NPU)
    • 就像有一个实时更新的"全楼设备状态大屏",随时掌握每个设备的使用情况
    • 能够根据访客需求和设备状态,做出最优的分配决策
  3. 设备管家(Device-plugin)像是一位神通广大的设备连接专家:

    • 看到调度主管的工位安排单后,立即着手准备相应的设备
    • 负责将显卡等设备"插上电源",确保设备随时可用
    • 就像是在访客和设备之间搭建一座便捷的桥梁
  4. 资源管控员(HAMi-Core)就像一位尽职的资源监管员:

    • 时刻关注每个访客的资源使用情况,确保不会超出预定配额
    • 设置明确的资源使用界限,防止互相干扰
    • 当某个任务试图占用过多资源时,会及时进行管控

3. HAMi应用场景

3.1 设备共享

HAMi设备共享

HAMi的设备共享就像是一个智能的资源分配系统,让一块显卡能够同时服务多个AI任务:

  1. 灵活的显存分配

    • 可以精确指定每个任务使用多少显存
    • 就像给每个租户分配固定大小的"储物柜"
  2. 算力精确控制

    • 对计算单元(流处理器)进行严格限制
    • 就像给每个租户分配固定数量的"工作人员"
  3. 核心使用率管理

    • 支持按需分配设备核心使用率
    • 就像给不同任务分配不同的"工作时间"
  4. 无缝适配已有程序

    • 完全不需要修改现有的程序代码
    • 就像租客入住时不需要重新装修,直接拎包入住

3.2 设备资源隔离

resources:
  limits:
    nvidia.com/gpu: 1    # 请求1个虚拟GPU
    nvidia.com/gpumem: 3000    # 每个虚拟GPU包含3000MB设备内存

在容器内部看到的1张卡,显存3GB

4. Helm部署HAMi

4.1 前置条件

4.2 配置nvidia runtime

安装nvidia-container-toolkit,配置nvidia-container-runtime

distribution=$(. /etc/os-release;echo $ID$VERSION_ID)
curl -s -L https://nvidia.github.io/libnvidia-container/gpgkey | sudo apt-key add -
curl -s -L https://nvidia.github.io/libnvidia-container/$distribution/libnvidia-container.list | sudo tee /etc/apt/sources.list.d/libnvidia-container.list

sudo apt-get update && sudo apt-get install -y nvidia-container-toolkit
# 如果container runtime使用的是docker
vim /etc/docker/daemon.json
{
    "default-runtime": "nvidia",
    "runtimes": {
        "nvidia": {
            "path": "/usr/bin/nvidia-container-runtime",
            "runtimeArgs": []
        }
    }
}

sudo systemctl daemon-reload && systemctl restart docker
# 如果container runtime使用的是containerd
(base) root@vgpu:/root# nvidia-ctk runtime configure --runtime=containerd
INFO[0000] Wrote updated config to /etc/containerd/config.toml
INFO[0000] It is recommended that containerd daemon be restarted.
# 如果container runtime使用的是containerd
vim /etc/containerd/config.toml

version = 2
[plugins]
  [plugins."io.containerd.grpc.v1.cri"]
    [plugins."io.containerd.grpc.v1.cri".containerd]
      default_runtime_name = "nvidia"

      [plugins."io.containerd.grpc.v1.cri".containerd.runtimes]
        [plugins."io.containerd.grpc.v1.cri".containerd.runtimes.nvidia]
          privileged_without_host_devices = false
          runtime_engine = ""
          runtime_root = ""
          runtime_type = "io.containerd.runc.v2"
          [plugins."io.containerd.grpc.v1.cri".containerd.runtimes.nvidia.options]
            BinaryName = "/usr/bin/nvidia-container-runtime"

sudo systemctl daemon-reload && systemctl restart containerd

为containerd配置代理(docker daemon同理)

vim  /etc/systemd/system/containerd.service

[Service]
Environment="HTTP_PROXY=http://<代理地址>:<代理端口>"
Environment="HTTPS_PROXY=http://<代理地址>:<代理端口>"
Environment="NO_PROXY=localhost,127.0.0.1"

4.3 GPU节点标签

kubectl label nodes {nodeid} gpu=on

4.4 部署HAMi

image_version=$(kubectl version 2>/dev/null | grep GitVersion | head -1 | awk -F'"' '{print $6}')
helm repo add hami-charts https://project-hami.github.io/HAMi/
helm install hami hami-charts/hami --set scheduler.kubeScheduler.imageTag=${image_version} -n kube-system

4.5 验证HAMi

创建gpu pod

# 创建一个测试用的Pod,用于验证HAMi的基本功能
cat << 'EOF' | kubectl apply -f -
apiVersion: v1
kind: Pod
metadata:
  name: gpu-pod
spec:
  containers:
    - name: ubuntu-container
      image: ubuntu:18.04
      command: ["bash", "-c", "sleep 86400"]
      resources:
        limits:
          nvidia.com/gpu: 1 # requesting 1 vGPUs
          nvidia.com/gpumem: 10240 # Each vGPU contains 3000m device memory (Optional,Integer)
EOF

进入到容器里查看,可以看到1张卡,显存1GB

(base) root@vgpu:~# kubectl exec -it gpu-pod -- nvidia-smi
[HAMI-core Msg(24:139957573396288:libvgpu.c:836)]: Initializing.....
Fri Dec 13 05:55:05 2024
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 550.135                Driver Version: 550.135        CUDA Version: 12.4     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|=========================================+========================+======================|
|   0  NVIDIA GeForce RTX 3090        Off |   00000000:00:05.0 Off |                  N/A |
|  0%   26C    P8              7W /  370W |       0MiB /  10240MiB |      0%      Default |
|                                         |                        |                  N/A |
+-----------------------------------------+------------------------+----------------------+

+-----------------------------------------------------------------------------------------+
| Processes:                                                                              |
|  GPU   GI   CI        PID   Type   Process name                              GPU Memory |
|        ID   ID                                                               Usage      |
|=========================================================================================|
|  No running processes found                                                             |
+-----------------------------------------------------------------------------------------+
[HAMI-core Msg(24:139957573396288:multiprocess_memory_limit.c:497)]: Calling exit handler 24

4.6 多pod共享GPU

# 创建一个包含3个Pod的Deployment,用于测试GPU共享功能
cat << 'EOF' | kubectl apply -f -
apiVersion: apps/v1
kind: Deployment
metadata:
  name: hami-npod-1gpu
spec:
  replicas: 3  # 创建三个相同的 Pod,可根据需要修改数量
  selector:
    matchLabels:
      app: pytorch
  template:
    metadata:
      labels:
        app: pytorch
    spec:
      containers:
      - name: pytorch-container
        image: uhub.service.ucloud.cn/gpu-share/gpu_pytorch_test:latest
        command: ["/bin/sh", "-c"]
        args: ["cd /app/pytorch_code && python3 2.py"]
        resources:
          limits:
            nvidia.com/gpu: 1
            nvidia.com/gpumem: 3000
            nvidia.com/gpucores: 25
EOF
# 查看创建的Pod状态
(base) root@vgpu:~# kubectl get pod
NAME                              READY   STATUS    RESTARTS   AGE
hami-npod-1gpu-6f65f668b7-6k2jh   1/1     Running   0          5s
hami-npod-1gpu-6f65f668b7-f48gm   1/1     Running   0          5s
hami-npod-1gpu-6f65f668b7-kpnbg   1/1     Running   0          5s
# 查看主机上的GPU使用情况
(base) root@vgpu:~# nvidia-smi
Fri Dec 13 09:32:47 2024
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 550.135                Driver Version: 550.135        CUDA Version: 12.4     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|=========================================+========================+======================|
|   0  NVIDIA GeForce RTX 3090        Off |   00000000:00:05.0 Off |                  N/A |
| 34%   33C    P2            103W /  370W |    1320MiB /  24576MiB |      2%      Default |
|                                         |                        |                  N/A |
+-----------------------------------------+------------------------+----------------------+

+-----------------------------------------------------------------------------------------+
| Processes:                                                                              |
|  GPU   GI   CI        PID   Type   Process name                              GPU Memory |
|        ID   ID                                                               Usage      |
|=========================================================================================|
|    0   N/A  N/A     19635      C   python3                                       478MiB |
|    0   N/A  N/A     19713      C   python3                                       478MiB |
|    0   N/A  N/A     19900      C   python3                                       352MiB |
+-----------------------------------------------------------------------------------------+

5. HAMi代码分析

以代码commit为例:96adaec66c4472383108de2d18698b47f5e6e553

5.1 hami-scheduler调试环境

因为gpu环境在远程服务器端,所以利用dlv+golang来搭建远程调试环境.大体思路是:

  • 在远程gpu服务器上用dlv启动hami-scheduler
  • 把hami-scheduler相关的配置指向本地本地dlv启动的hami-scheduler服务
  1. 在远程gpu服务器上用dlv启动hami-scheduler
root@vgpu:~# nerdctl -n k8s.io cp <hami-scheduler-container-id>:/tls/tls.crt /root/tls/
root@vgpu:~# nerdctl -n k8s.io cp <hami-scheduler-container-id>:/tls/tls.key /root/tls/
dlv exec bin/scheduler --headless -l 0.0.0.0:2345 --api-version=2 -- \
--http_bind=0.0.0.0:9443 \
--cert_file=/root/tls/tls.crt \
--key_file=/root/tls/tls.key \
--scheduler-name=hami-scheduler \
--node-scheduler-policy=binpack \
--gpu-scheduler-policy=spread \
--device-config-file=/root/device-config.yaml

启动参数通过kubectl -n kube-system edit deployments.apps hami-scheduler可以查看,device-config.yaml文件通过nerdctl -n k8s.io cp <hami-scheduler-container-id>:/root/device-config.yaml获取。dlv debug直接调试源代码并自动编译(适合开发环境),而dlv exec调试已编译的二进制文件(适合生产环境,但需要编译时包含调试信息 -gcflags=“all=-N -l”),dlv exec直接作用于二进制,启动较快。HAMi代码库中有提供Makefile需要移除下-s -w参数,make build可以编译出包含调试信息的二进制文件。

  1. hami-webhook指向本地dlv启动的hami-scheduler服务
(base) root@vgpu:~# kubectl edit mutatingwebhookconfigurations.admissionregistration.k8s.io hami-webhook
# webhook限制强制使用https,复用证书,用127.0.0.1的地址
...
webhooks:
  - admissionReviewVersions:
    - v1beta1
    clientConfig:
      url: https://127.0.0.1:9443/webhook
...      
  1. hami-scheduler extender指向本地dlv启动的hami-scheduler服务
(base) root@vgpu:~# kubectl -n kube-system edit cm hami-scheduler-newversion
...
apiVersion: v1
data:
  config.yaml: |
    apiVersion: kubescheduler.config.k8s.io/v1beta2
    kind: KubeSchedulerConfiguration
    leaderElection:
      leaderElect: false
    profiles:
    - schedulerName: hami-scheduler
    extenders:
    - urlPrefix: "https://<dlv-local-ip>:9443"
      filterVerb: filter
      bindVerb: bind
      nodeCacheCapable: true
      weight: 1
      httpTimeout: 30s
      enableHTTPS: false
      tlsConfig:
        insecure: true
...

hami-scheduler deployment实际上包含两个container:一个是kube-scheduler原生调度器,一个是hami-scheduler自定义调度器;扩展调度器用的还是废弃的extender机制,并非scheduler framework。exterder机制虽有性能问题,但是使用起来比较简单。

  1. goland添加Go Remote Debugger指向:2345

5.2 hami-scheduler核心逻辑

hami-scheduler带的命令行参数

func init() {
    // 禁用了命令行参数的自动排序,保持参数的定义顺序
	rootCmd.Flags().SortFlags = false
	rootCmd.PersistentFlags().SortFlags = false

	rootCmd.Flags().StringVar(&config.HTTPBind, "http_bind", "127.0.0.1:8080", "http server bind address")
	rootCmd.Flags().StringVar(&tlsCertFile, "cert_file", "", "tls cert file")
	rootCmd.Flags().StringVar(&tlsKeyFile, "key_file", "", "tls key file")
	rootCmd.Flags().StringVar(&config.SchedulerName, "scheduler-name", "", "the name to be added to pod.spec.schedulerName if not empty")
	rootCmd.Flags().Int32Var(&config.DefaultMem, "default-mem", 0, "default gpu device memory to allocate")
	rootCmd.Flags().Int32Var(&config.DefaultCores, "default-cores", 0, "default gpu core percentage to allocate")
	rootCmd.Flags().Int32Var(&config.DefaultResourceNum, "default-gpu", 1, "default gpu to allocate")
	rootCmd.Flags().StringVar(&config.NodeSchedulerPolicy, "node-scheduler-policy", util.NodeSchedulerPolicyBinpack.String(), "node scheduler policy")
	rootCmd.Flags().StringVar(&config.GPUSchedulerPolicy, "gpu-scheduler-policy", util.GPUSchedulerPolicySpread.String(), "GPU scheduler policy")
	rootCmd.Flags().StringVar(&config.MetricsBindAddress, "metrics-bind-address", ":9395", "The TCP address that the scheduler should bind to for serving prometheus metrics(e.g. 127.0.0.1:9395, :9395)")
    // 区分了 Flags() 和 PersistentFlags():
    // Flags():仅对当前命令有效的参数
    // PersistentFlags():对当前命令及其子命令都有效的参数
	rootCmd.Flags().StringToStringVar(&config.NodeLabelSelector, "node-label-selector", nil, "key=value pairs separated by commas")
	rootCmd.PersistentFlags().AddGoFlagSet(device.GlobalFlagSet())
	rootCmd.AddCommand(version.VersionCmd)
	rootCmd.Flags().AddGoFlagSet(util.InitKlogFlags())
}

// 其它的全局命令行参数
func GlobalFlagSet() *flag.FlagSet {
	fs := flag.NewFlagSet(os.Args[0], flag.ExitOnError)
	ascend.ParseConfig(fs)
	cambricon.ParseConfig(fs)
	hygon.ParseConfig(fs)
	iluvatar.ParseConfig(fs)
	nvidia.ParseConfig(fs)
	mthreads.ParseConfig(fs)
	metax.ParseConfig(fs)
	fs.BoolVar(&DebugMode, "debug", false, "debug mode")
	fs.StringVar(&configFile, "device-config-file", "", "device config file")
	klog.InitFlags(fs)
	return fs
}

scheduler启动函数

func start() {
    // 根据device-config初始化devices map
	device.InitDevices()
    // 初始化hami-scheduelr调度器
	sher = scheduler.NewScheduler()
    // 启动hami-scheduelr调度器
	sher.Start()
    // 停止调度器
	defer sher.Stop()

	// 这个函数的主要职责是:
    // 1. 持续监控:
    // 通过channel和定时器实现持续监控
    // 响应节点变更事件和定时检查
    // 2. 设备健康检查:
    // 定期检查每个节点上的设备健康状态
    // 处理不健康的设备(清理和移除)
    // 3. 设备信息更新:
    // 维护最新的节点设备信息
    // 处理设备握手机制
    // 更新调度器的设备缓存
    // 4. 资源使用统计:
    // 追踪并更新节点资源使用情况
    // 这是调度器的核心组件之一,确保调度决策基于最新的节点和设备状态信息。
	go sher.RegisterFromNodeAnnotations()
    // 暴露metrics监控指标
	go initMetrics(config.MetricsBindAddress)

	// 启动一个http server,用于处理调度器请求
	router := httprouter.New()
    // 调度filter阶段请求
	router.POST("/filter", routes.PredicateRoute(sher))
    // 调度bind阶段请求
	router.POST("/bind", routes.Bind(sher))
    // webhook请求,实测没有捕获到请求
	router.POST("/webhook", routes.WebHookRoute())
    // 健康检查请求
	router.GET("/healthz", routes.HealthzRoute())
	klog.Info("listen on ", config.HTTPBind)
	if len(tlsCertFile) == 0 || len(tlsKeyFile) == 0 {
		if err := http.ListenAndServe(config.HTTPBind, router); err != nil {
			klog.Fatal("Listen and Serve error, ", err)
		}
	} else {
		if err := http.ListenAndServeTLS(config.HTTPBind, tlsCertFile, tlsKeyFile, router); err != nil {
			klog.Fatal("Listen and Serve error, ", err)
		}
	}
}

为什么要初始化GPU设备devices map?通过这种初始化机制,HAMi可以统一管理不同类型的GPU,为上层调度提供统一的设备抽象。需要维护一个设备注册表,记录所有支持的GPU。每个设备都需要实现Devices接口,提供以下核心功能:

  • 资源分配和管理
  • 设备健康检查
  • 节点打分和筛选
  • 设备锁定和释放
  • 资源使用统计
// Devices接口定义了GPU设备的核心功能,从目前实现来看支持7种设备:nvidia、ascend、cambricon、hygon、iluvatar、mthreads、metax
type Devices interface {
	CommonWord() string
	MutateAdmission(ctr *corev1.Container, pod *corev1.Pod) (bool, error)
	CheckHealth(devType string, n *corev1.Node) (bool, bool)
	NodeCleanUp(nn string) error
	GetNodeDevices(n corev1.Node) ([]*util.DeviceInfo, error)
	CheckType(annos map[string]string, d util.DeviceUsage, n util.ContainerDeviceRequest) (bool, bool, bool)
	// CheckUUID is check current device id whether in GPUUseUUID or GPUNoUseUUID set, return true is check success.
	CheckUUID(annos map[string]string, d util.DeviceUsage) bool
	LockNode(n *corev1.Node, p *corev1.Pod) error
	ReleaseNodeLock(n *corev1.Node, p *corev1.Pod) error
	GenerateResourceRequests(ctr *corev1.Container) util.ContainerDeviceRequest
	PatchAnnotations(annoinput *map[string]string, pd util.PodDevices) map[string]string
	CustomFilterRule(allocated *util.PodDevices, request util.ContainerDeviceRequest, toAllicate util.ContainerDevices, device *util.DeviceUsage) bool
	ScoreNode(node *corev1.Node, podDevices util.PodSingleDevice, policy string) float32
	AddResourceUsage(n *util.DeviceUsage, ctr *util.ContainerDevice) error
	// This should not be associated with a specific device object
	//ParseConfig(fs *flag.FlagSet)
}
func InitDevicesWithConfig(config *Config) {
    // 初始化devices map
	devices = make(map[string]Devices)
	DevicesToHandle = []string{}
    // 初始化nvidia设备
	devices[nvidia.NvidiaGPUDevice] = nvidia.InitNvidiaDevice(config.NvidiaConfig)
    // 初始化cambricon设备
	devices[cambricon.CambriconMLUDevice] = cambricon.InitMLUDevice(config.CambriconConfig)
    // 初始化hygon设备
	devices[hygon.HygonDCUDevice] = hygon.InitDCUDevice(config.HygonConfig)
    // 初始化iluvatar设备
	devices[iluvatar.IluvatarGPUDevice] = iluvatar.InitIluvatarDevice(config.IluvatarConfig)
    // 初始化mthreads设备
	devices[mthreads.MthreadsGPUDevice] = mthreads.InitMthreadsDevice(config.MthreadsConfig)
    // 初始化metax设备
	devices[metax.MetaxGPUDevice] = metax.InitMetaxDevice(config.MetaxConfig)
    // CommonWord函数,看起来像是设备别名转换
	DevicesToHandle = append(DevicesToHandle, nvidia.NvidiaGPUCommonWord)
	DevicesToHandle = append(DevicesToHandle, cambricon.CambriconMLUCommonWord)
	DevicesToHandle = append(DevicesToHandle, hygon.HygonDCUCommonWord)
	DevicesToHandle = append(DevicesToHandle, iluvatar.IluvatarGPUCommonWord)
	DevicesToHandle = append(DevicesToHandle, mthreads.MthreadsGPUCommonWord)
	DevicesToHandle = append(DevicesToHandle, metax.MetaxGPUCommonWord)
	for _, dev := range ascend.InitDevices(config.VNPUs) {
		devices[dev.CommonWord()] = dev
		DevicesToHandle = append(DevicesToHandle, dev.CommonWord())
	}
}

hami也会在集群中运行一个hami-scheduler的调度器,pod调度时,如何区分是集群内原有的调度器还是hami-scheduler?mutatingwebhook hami-webhook会指定pod调度器为hami-scheduler

// 创建一个webhook,用于处理pod调度请求
func NewWebHook() (*admission.Webhook, error) {
	logf.SetLogger(klog.NewKlogr())
	schema := runtime.NewScheme()
	if err := clientgoscheme.AddToScheme(schema); err != nil {
		return nil, err
	}
	decoder := admission.NewDecoder(schema)
	wh := &admission.Webhook{Handler: &webhook{decoder: decoder}}
	return wh, nil
}

// webhook结构体实现了Handler interface
func (h *webhook) Handle(_ context.Context, req admission.Request) admission.Response {
	pod := &corev1.Pod{}
    // 解码请求,获取pod信息
	err := h.decoder.Decode(req, pod)
	if err != nil {
		klog.Errorf("Failed to decode request: %v", err)
		return admission.Errored(http.StatusBadRequest, err)
	}
    // 检查pod是否有容器
	if len(pod.Spec.Containers) == 0 {
		klog.Warningf(template+" - Denying admission as pod has no containers", req.Namespace, req.Name, req.UID)
		return admission.Denied("pod has no containers")
	}
	klog.Infof(template, req.Namespace, req.Name, req.UID)
	hasResource := false

	for idx, ctr := range pod.Spec.Containers {
		c := &pod.Spec.Containers[idx]
        // 检查容器是否为特权容器
		if ctr.SecurityContext != nil {
			if ctr.SecurityContext.Privileged != nil && *ctr.SecurityContext.Privileged {
				klog.Warningf(template+" - Denying admission as container %s is privileged", req.Namespace, req.Name, req.UID, c.Name)
				continue
			}
		}
        // 检查容器是否为vgpu容器
		for _, val := range device.GetDevices() {
            // 调用设备接口的MutateAdmission方法,检查容器是否为vgpu容器
			found, err := val.MutateAdmission(c, pod)
			if err != nil {
				klog.Errorf("validating pod failed:%s", err.Error())
				return admission.Errored(http.StatusInternalServerError, err)
			}
			hasResource = hasResource || found
		}
	}

	if !hasResource {
		klog.Infof(template+" - Allowing admission for pod: no resource found", req.Namespace, req.Name, req.UID)
		//return admission.Allowed("no resource found")
	} else if len(config.SchedulerName) > 0 {
        // 如果配置了scheduler-name,则设置pod调度器为hami-scheduler
		pod.Spec.SchedulerName = config.SchedulerName
		if pod.Spec.NodeName != "" {
			klog.Infof(template+" - Pod already has node assigned", req.Namespace, req.Name, req.UID)
			return admission.Denied("pod has node assigned")
		}
	}
	marshaledPod, err := json.Marshal(pod)
	if err != nil {
		klog.Errorf(template+" - Failed to marshal pod, error: %v", req.Namespace, req.Name, req.UID, err)
		return admission.Errored(http.StatusInternalServerError, err)
	}
	return admission.PatchResponseFromRaw(req.Object.Raw, marshaledPod)
}

vgpu pod的annotation是什么时候添加的?在scheduler filter阶段添加

// filter核心函数
func (s *Scheduler) Filter(args extenderv1.ExtenderArgs) (*extenderv1.ExtenderFilterResult, error) {
	klog.InfoS("begin schedule filter", "pod", args.Pod.Name, "uuid", args.Pod.UID, "namespaces", args.Pod.Namespace)
	// 用于统计Pod中容器的设备资源请求
    nums := k8sutil.Resourcereqs(args.Pod)
	total := 0
	for _, n := range nums {
		for _, k := range n {
			total += int(k.Nums)
		}
	}
	if total == 0 {
        // 如果Pod中没有请求任何资源,则记录调度失败事件
		klog.V(1).Infof("pod %v not find resource", args.Pod.Name)
		s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, fmt.Errorf("does not request any resource"))
		return &extenderv1.ExtenderFilterResult{
			NodeNames:   args.NodeNames,
			FailedNodes: nil,
			Error:       "",
		}, nil
	}
	annos := args.Pod.Annotations
    // 从podManager维护的pods map中删除pod
	s.delPod(args.Pod)
    // 调度器中的重要函数,用于获取节点资源使用情况
	nodeUsage, failedNodes, err := s.getNodesUsage(args.NodeNames, args.Pod)
	if err != nil {
		s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err)
		return nil, err
	}
	if len(failedNodes) != 0 {
		klog.V(5).InfoS("getNodesUsage failed nodes", "nodes", failedNodes)
	}
    // 计算节点得分的函数
    // 1. 获取节点调度策略
    // 2. 遍历所有可用节点
    // 3. 计算每个节点的基础得分
    // 4. 检查容器的设备需求适配性
    // 5. 根据策略调整最终得分
    // 6. 记录符合要求的节点得分

    // 计算逻辑说明:
    // 分别计算三个维度的使用率:设备、核心、显存
    // 每个维度的得分是使用量与总量的比值
    // 最终得分是三个维度得分之和乘以权重

    // 适配性检查,检查节点上的设备是否能满足容器的资源请求;如果适配成功,会在score.Devices中记录设备的分配方案
    //     fit, _ := fitInDevices(
    //     node,           // 节点信息(包含该节点上的设备状态)
    //     n,              // 容器的设备请求数量
    //     annos,          // Pod 的注解信息
    //     task,           // 待调度的 Pod
    //     &score.Devices  // 输出参数:记录设备分配结果
    // )
	nodeScores, err := s.calcScore(nodeUsage, nums, annos, args.Pod, failedNodes)
	if err != nil {
		err := fmt.Errorf("calcScore failed %v for pod %v", err, args.Pod.Name)
		s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err)
		return nil, err
	}
    // 如果所有节点得分都不符合要求,则记录调度失败事件
	if len((*nodeScores).NodeList) == 0 {
		klog.V(4).Infof("All node scores do not meet for pod %v", args.Pod.Name)
		s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, fmt.Errorf("no available node, all node scores do not meet"))
		return &extenderv1.ExtenderFilterResult{
			FailedNodes: failedNodes,
		}, nil
	}
	klog.V(4).Infoln("nodeScores_len=", len((*nodeScores).NodeList))
    // 对节点得分进行排序
	sort.Sort(nodeScores)
    // 获取得分最高的节点
	m := (*nodeScores).NodeList[len((*nodeScores).NodeList)-1]
	klog.Infof("schedule %v/%v to %v %v", args.Pod.Namespace, args.Pod.Name, m.NodeID, m.Devices)
	annotations := make(map[string]string)
    // 添加调度节点和调度时间到pod的annotation中
	annotations[util.AssignedNodeAnnotations] = m.NodeID
    // 添加调度时间到pod的annotation中
	annotations[util.AssignedTimeAnnotations] = strconv.FormatInt(time.Now().Unix(), 10)

	for _, val := range device.GetDevices() {
		val.PatchAnnotations(&annotations, m.Devices)
	}

	//InRequestDevices := util.EncodePodDevices(util.InRequestDevices, m.devices)
	//supportDevices := util.EncodePodDevices(util.SupportDevices, m.devices)
	//maps.Copy(annotations, InRequestDevices)
	//maps.Copy(annotations, supportDevices)
    // 将pod添加到podManager维护的pods map中
	s.addPod(args.Pod, m.NodeID, m.Devices)
    // 更新pod的annotation
	err = util.PatchPodAnnotations(args.Pod, annotations)
	if err != nil {
		s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err)
		s.delPod(args.Pod)
		return nil, err
	}
    // 记录调度成功事件
	s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringSucceed, []string{m.NodeID}, nil)
    // 返回调度结果
	res := extenderv1.ExtenderFilterResult{NodeNames: &[]string{m.NodeID}}
	return &res, nil
}

除了filter阶段,scheduler还有bind阶段,bind阶段是带有资源锁定的安全绑定过程,确保设备资源不会被重复分配,并在失败时能够正确清理资源。

hami-scheduler本身也是个controller,这个controller监听什么资源,然后reconcile处理什么逻辑?

func NewScheduler() *Scheduler {
	klog.Info("New Scheduler")
    // 初始化scheduler
	s := &Scheduler{
		stopCh:       make(chan struct{}),
        // 维护一个缓存map,保存节点GPU设备使用情况
		cachedstatus: make(map[string]*NodeUsage),
		nodeNotify:   make(chan struct{}, 1),
	}
    // 初始化nodeManager,维护一个nodes map,key为node_id,value为节点GPU设备的信息
	s.nodeManager.init()
    // 初始化podManager,维护一个pods map,key为pod_id,value为pod GPU设备信息
	s.podManager.init()
	return s
}
// 启动controller
func (s *Scheduler) Start() {
    // 初始化k8s clientSet
	kubeClient, err := k8sutil.NewClient()
	check(err)
	s.kubeClient = kubeClient
    // 初始化informerFactory,用于监听k8s资源变化   
	informerFactory := informers.NewSharedInformerFactoryWithOptions(s.kubeClient, time.Hour*1)
    // 初始化podLister,用于获取pod信息
	s.podLister = informerFactory.Core().V1().Pods().Lister()
    // 初始化nodeLister,用于获取node信息
	s.nodeLister = informerFactory.Core().V1().Nodes().Lister()
    // 初始化podInformer,用于监听pod资源变化
	informer := informerFactory.Core().V1().Pods().Informer()
    // 添加pod资源事件处理函数
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        // onAddPod 处理 Pod 添加事件
        // 1. 验证对象类型是否为 Pod
        // 2. 检查 Pod 是否有指定节点的注解
        // 3. 如果 Pod 处于终止状态则删除
        // 4. 解析 Pod 的设备注解并添加到podManager维护的pods map中
		AddFunc:    s.onAddPod,
        // onUpdatePod 处理 Pod 更新事件,走的还是onAddPod逻辑
		UpdateFunc: s.onUpdatePod,
        // onDelPod 处理 Pod 删除事件,从podManager维护的pods map中删除pod
		DeleteFunc: s.onDelPod,
	})
    // 初始化nodeInformer,用于监听node资源变化
	informerFactory.Core().V1().Nodes().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        // 这三个事件函数都是向nodeNotify channel发送一个空结构体信号
        // 这个设计的关键点:
        // 1. 通知机制:
        // nodeNotify是一个 channel,用于发送通知信号
        // struct{}{}是一个空结构体,在Go中常用作信号传递(因为它不占用内存)
        // 2. 触发更新:
        // 这些函数与RegisterFromNodeAnnotations()方法配合工作
        // 当节点发生任何变化时,都会触发重新扫描和同步节点状态
        // 3. 统一处理:
        // 所有节点事件(添加/更新/删除)都触发相同的通知
        // 实际的节点状态更新逻辑在接收通知的地方统一处理
		AddFunc:    s.onAddNode,
		UpdateFunc: s.onUpdateNode,
		DeleteFunc: s.onDelNode,
	})
    // 启动informerFactory,开始监听k8s资源变化
	informerFactory.Start(s.stopCh)
    // 等待informerFactory缓存同步完成
	informerFactory.WaitForCacheSync(s.stopCh)
    // 添加其它的事件处理函数
	s.addAllEventHandlers()
}

5.3 hami-device-plugin调试环境

远程gpu服务器上dlv启动hami-device-plugin,本地goland添加Go Remote Debugger指向:2346

NODE_NAME=vgpu NVIDIA_MIG_MONITOR_DEVICES=all HOOK_PATH=/usr/local \
dlv exec bin/nvidia-device-plugin --headless -l 0.0.0.0:2346 --api-version=2 -- \
--config-file=/root/device-config.yaml \
--mig-strategy=none \
--disable-core-limit=false \
-v=false

5.4 hami-device-plugin核心逻辑

在了解hami nvidia-device-plugin前,需要了解下nvidia-runtime、GPU Operator、k8s device plugin等

  • nvidia-runtime:对应Docker环境,安装nvidia-container-toolkit组件,docker配置使用nvidia-runtime,启动容器时增加--gpu参数;nvidia-container-toolkit根据NVIDIA_VISIBLE_DEVICES环境变量将GPU、驱动等相关文件挂载到容器里。
  • k8s device plugin:对应k8s环境,Nvidia提供的一种k8s GPU实现方案
  • GPU Operator:对应k8s环境,Nvidia提供的一种在Kubernetes环境中简化使用GPU的部署方案

device-plugin原理:device plugin学习笔记

实现一个device-plugin大致分为两部分:插件注册、kubelet调用插件(grpc调用,device-plugin做为grpc server启动)

// DevicePluginServer是device-plugin的server接口,定义了device-plugin需要实现的方法
// DevicePluginServer is the server API for DevicePlugin service.
type DevicePluginServer interface {
	// GetDevicePluginOptions returns options to be communicated with Device
	// Manager
	GetDevicePluginOptions(context.Context, *Empty) (*DevicePluginOptions, error)
	// ListAndWatch returns a stream of List of Devices
	// Whenever a Device state change or a Device disappears, ListAndWatch
	// returns the new list
	ListAndWatch(*Empty, DevicePlugin_ListAndWatchServer) error
	// GetPreferredAllocation returns a preferred set of devices to allocate
	// from a list of available ones. The resulting preferred allocation is not
	// guaranteed to be the allocation ultimately performed by the
	// devicemanager. It is only designed to help the devicemanager make a more
	// informed allocation decision when possible.
	GetPreferredAllocation(context.Context, *PreferredAllocationRequest) (*PreferredAllocationResponse, error)
	// Allocate is called during container creation so that the Device
	// Plugin can run device specific operations and instruct Kubelet
	// of the steps to make the Device available in the container
	Allocate(context.Context, *AllocateRequest) (*AllocateResponse, error)
	// PreStartContainer is called, if indicated by Device Plugin during registeration phase,
	// before each container start. Device plugin can run device specific operations
	// such as resetting the device before making devices available to the container
	PreStartContainer(context.Context, *PreStartContainerRequest) (*PreStartContainerResponse, error)
}

虽然接口定义了5个方法,但是HAMi nvidia-device-plugin只实现了Allocate、ListAndWatch,外加一个Register方法,所以重点关注这三个实现

  • Register:插件注册到kubelet
  • ListAndWatch:获取GPU设备信息,并上报给kubelet
  • Allocate:分配GPU设备给容器

device-plugin启动函数

func start(c *cli.Context, flags []cli.Flag) error {
	klog.Info("Starting FS watcher.")
    // 获取节点名称
	util.NodeName = os.Getenv(util.NodeNameEnvName)
    // 创建一个文件系统监视器,用于监听kubelet设备插件路径的变化
	watcher, err := newFSWatcher(kubeletdevicepluginv1beta1.DevicePluginPath)
	if err != nil {
		return fmt.Errorf("failed to create FS watcher: %v", err)
	}
	defer watcher.Close()
	//device.InitDevices()

	/*Loading config files*/
	klog.Infof("Start working on node %s", util.NodeName)
	klog.Info("Starting OS watcher.")
    // 创建一个信号监听器,用于监听SIGHUP、SIGINT、SIGTERM、SIGQUIT信号
	sigs := newOSWatcher(syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)

	var restarting bool
	var restartTimeout <-chan time.Time
	var plugins []plugin.Interface
restart:
	// If we are restarting, stop plugins from previous run.
	if restarting {
		err := stopPlugins(plugins)
		if err != nil {
			return fmt.Errorf("error stopping plugins from previous run: %v", err)
		}
	}

	klog.Info("Starting Plugins.")
    // 启动插件
	plugins, restartPlugins, err := startPlugins(c, flags, restarting)
	if err != nil {
		return fmt.Errorf("error starting plugins: %v", err)
	}

	if restartPlugins {
		klog.Info("Failed to start one or more plugins. Retrying in 30s...")
        // 设置重启超时时间
		restartTimeout = time.After(30 * time.Second)
	}
    // 设置重启标志
	restarting = true

	// Start an infinite loop, waiting for several indicators to either log
	// some messages, trigger a restart of the plugins, or exit the program.
	for {
		select {
		// If the restart timeout has expired, then restart the plugins
		case <-restartTimeout:
			goto restart

		// Detect a kubelet restart by watching for a newly created
		// 'kubeletdevicepluginv1beta1.KubeletSocket' file. When this occurs, restart this loop,
		// restarting all of the plugins in the process.
		case event := <-watcher.Events:
            // 监听kubelet设备插件路径的变化
			if event.Name == kubeletdevicepluginv1beta1.KubeletSocket && event.Op&fsnotify.Create == fsnotify.Create {
				klog.Infof("inotify: %s created, restarting.", kubeletdevicepluginv1beta1.KubeletSocket)
				goto restart
			}

		// Watch for any other fs errors and log them.
		case err := <-watcher.Errors:
			klog.Errorf("inotify: %s", err)

		// Watch for any signals from the OS. On SIGHUP, restart this loop,
		// restarting all of the plugins in the process. On all other
		// signals, exit the loop and exit the program.
		case s := <-sigs:
			switch s {
			case syscall.SIGHUP:
				klog.Info("Received SIGHUP, restarting.")
				goto restart
			default:
				klog.Infof("Received signal \"%v\", shutting down.", s)
				goto exit
			}
		}
	}
exit:
    // 停止插件
	err = stopPlugins(plugins)
	if err != nil {
		return fmt.Errorf("error stopping plugins: %v", err)
	}
	return nil
}

启动函数除了启动插件外,还启动监视器来监听/var/lib/kubelet/device-plugins/kubelet.sock的变化,每次重启kubelet,kubelet.sock都会被重新创建,kubelet是通过map来存储device-plugin的注册信息,所以每次kubelet重启,device-plugin都需要重新注册(也跟着重启)

HAMi作为GPU管理框架,针对插件的启动、停止,自身抽象了一套接口:

// Interface defines the API for the plugin package
type Interface interface {
	Devices() rm.Devices
	Start() error
	Stop() error
}

插件Start实现包含:plugin.Serve、plugin.Register、plugin.WatchAndRegister

// 在插件注册前,plugin.Serve会先启动插件grpc server
// Serve starts the gRPC server of the device plugin.
func (plugin *NvidiaDevicePlugin) Serve() error {
    // 删除已存在的socket文件
	os.Remove(plugin.socket)
    // 监听socket文件   
	sock, err := net.Listen("unix", plugin.socket)
	if err != nil {
		return err
	}
    // 注册设备插件的gRPC服务器
	kubeletdevicepluginv1beta1.RegisterDevicePluginServer(plugin.server, plugin)
    // 启动grpc server
	go func() {
		lastCrashTime := time.Now()
		restartCount := 0
		for {
			klog.Infof("Starting GRPC server for '%s'", plugin.rm.Resource())
			err := plugin.server.Serve(sock)
			if err == nil {
				break
			}

			klog.Infof("GRPC server for '%s' crashed with error: %v", plugin.rm.Resource(), err)

			// restart if it has not been too often
			// i.e. if server has crashed more than 5 times and it didn't last more than one hour each time
			if restartCount > 5 {
				// quit
				klog.Fatalf("GRPC server for '%s' has repeatedly crashed recently. Quitting", plugin.rm.Resource())
			}
			timeSinceLastCrash := time.Since(lastCrashTime).Seconds()
			lastCrashTime = time.Now()
			if timeSinceLastCrash > 3600 {
				// it has been one hour since the last crash.. reset the count
				// to reflect on the frequency
				restartCount = 1
			} else {
				restartCount++
			}
		}
	}()

    // 验证连接到device-plugin
	conn, err := plugin.dial(plugin.socket, 5*time.Second)
	if err != nil {
		return err
	}
    // 关闭连接
	conn.Close()

	return nil
}

plugin.Register实现

// Register registers the device plugin for the given resourceName with Kubelet.
func (plugin *NvidiaDevicePlugin) Register() error {
    // 连接到kubelet
	conn, err := plugin.dial(kubeletdevicepluginv1beta1.KubeletSocket, 5*time.Second)
	if err != nil {
		return err
	}
	defer conn.Close()
    // 创建一个注册客户端
	client := kubeletdevicepluginv1beta1.NewRegistrationClient(conn)
    // 创建一个注册请求
	reqt := &kubeletdevicepluginv1beta1.RegisterRequest{
		Version:      kubeletdevicepluginv1beta1.Version,
		Endpoint:     path.Base(plugin.socket),
        // 资源名称
		ResourceName: string(plugin.rm.Resource()),
        // 设备插件选项
		Options: &kubeletdevicepluginv1beta1.DevicePluginOptions{
			GetPreferredAllocationAvailable: false,
		},
	}
    // 发送注册请求
	_, err = client.Register(context.Background(), reqt)
	if err != nil {
		return err
	}
	return nil
}

plugin.WatchAndRegister实现

func (plugin *NvidiaDevicePlugin) WatchAndRegister() {
	klog.Info("Starting WatchAndRegister")
	errorSleepInterval := time.Second * 5
	successSleepInterval := time.Second * 30
	for {
        // annotation格式形如:
        // annotations:
        //   hami.io/node-nvidia-register: 'GPU-a8243209-6b70-5b3d-de52-1aaafc1495fc,10,24576,100,NVIDIA-NVIDIA GeForce RTX 3090,0,true,0,:'

        // 通过nvml库获取GPU信息,并更新到Node的annotation上
		err := plugin.RegistrInAnnotation()
		if err != nil {
			klog.Errorf("Failed to register annotation: %v", err)
			klog.Infof("Retrying in %v seconds...", errorSleepInterval)
			time.Sleep(errorSleepInterval)
		} else {
			klog.Infof("Successfully registered annotation. Next check in %v seconds...", successSleepInterval)
			time.Sleep(successSleepInterval)
		}
	}
}

这里device-plugin是直接和k8s-apiserver交互,更新Node信息,hami.io/node-nvidia-register的annotation信息,后面hami-scheduler的RegisterFromNodeAnnotations函数会根据annotation信息进行调度

插件ListAndWatch实现(启动nvidi-device-plugin时进入)

// ListAndWatch lists devices and update that list according to the health status
func (plugin *NvidiaDevicePlugin) ListAndWatch(e *kubeletdevicepluginv1beta1.Empty, s kubeletdevicepluginv1beta1.DevicePlugin_ListAndWatchServer) error {
    // 发送GPU设备信息给kubelet
	s.Send(&kubeletdevicepluginv1beta1.ListAndWatchResponse{Devices: plugin.apiDevices()})

	for {
		select {
		case <-plugin.stop:
			return nil
		case d := <-plugin.health:
			// FIXME: there is no way to recover from the Unhealthy state.
			d.Health = kubeletdevicepluginv1beta1.Unhealthy
			klog.Infof("'%s' device marked unhealthy: %s", plugin.rm.Resource(), d.ID)
            // 发送GPU设备信息给kubelet
			s.Send(&kubeletdevicepluginv1beta1.ListAndWatchResponse{Devices: plugin.apiDevices()})
		}
	}
}

// plugin.apiDevices最终会进入到GetPluginDevices函数
// GetPluginDevices returns the plugin Devices from all devices in the Devices
func (ds Devices) GetPluginDevices(count uint) []*kubeletdevicepluginv1beta1.Device {
	var res []*kubeletdevicepluginv1beta1.Device

    // 如果GPU不是MIG模式,则需要根据count,复制多个device,每个device的ID不同
	if !strings.Contains(ds.GetIDs()[0], "MIG") {
		for _, dev := range ds {
            // count取值来自--device-split-count参数,单卡同时运行多少个任务,一般推荐10以上
			for i := uint(0); i < count; i++ {
				id := fmt.Sprintf("%v-%v", dev.ID, i)
                // 根据count,复制多个device,每个device的ID不同
				res = append(res, &kubeletdevicepluginv1beta1.Device{
					ID:       id,
					Health:   dev.Health,
					Topology: nil,
				})
			}
		}
	} else {
        // 如果GPU是MIG模式,则直接返回所有GPU设备信息
		for _, d := range ds {
			res = append(res, &d.Device)
		}
	}
    // 返回GPU设备信息
	return res
}

这里ds Devices参数是从哪里传入的?在pluginManager初始化的时候,最终b.buildGPUDeviceMap传入的

# mig-strategies为none时
pluginManager.GetPlugins -> rm.NewNVMLResourceManagers -> NewDeviceMap -> b.build
-> b.buildDeviceMapFromConfigResources -> b.buildGPUDeviceMap

NVIDIA设备插件中用于构建GPU设备映射的函数

// buildGPUDeviceMap builds a map of resource names to GPU devices
func (b *deviceMapBuilder) buildGPUDeviceMap() (DeviceMap, error) {
    // 创建一个空的设备映射
    devices := make(DeviceMap)

    // 遍历所有 GPU 设备
    b.VisitDevices(func(i int, gpu device.Device) error {
        // 获取 GPU 的产品名称
        name, ret := gpu.GetName()
        if ret != nvml.SUCCESS {
            return fmt.Errorf("error getting product name for GPU: %v", ret)
        }

        // 检查 GPU 是否启用了 MIG (Multi-Instance GPU) 功能
        migEnabled, err := gpu.IsMigEnabled()
        if err != nil {
            return fmt.Errorf("error checking if MIG is enabled on GPU: %v", err)
        }

        // 如果 GPU 启用了 MIG 且 MIG 策略不是 "None",则跳过这个 GPU
        // 因为 MIG 启用的 GPU 将由 buildMigDeviceMap 函数处理
        if migEnabled && *b.config.Flags.MigStrategy != spec.MigStrategyNone {
            return nil
        }

        // 遍历配置中定义的 GPU 资源
        for _, resource := range b.config.Resources.GPUs {
            // 检查 GPU 名称是否匹配资源模式
            if resource.Pattern.Matches(name) {
                // 如果匹配,创建新的 GPU 设备条目
                index, info := newGPUDevice(i, gpu)
                return devices.setEntry(resource.Name, index, info)
            }
        }

        // 如果 GPU 名称不匹配任何资源模式,返回错误
        return fmt.Errorf("GPU name '%v' does not match any resource patterns", name)
    })

    return devices, nil
}

插件Allocate实现(pod挂载GPU设备时进入)

// Allocate which return list of devices.
func (plugin *NvidiaDevicePlugin) Allocate(ctx context.Context, reqs *kubeletdevicepluginv1beta1.AllocateRequest) (*kubeletdevicepluginv1beta1.AllocateResponse, error) {
	klog.InfoS("Allocate", "request", reqs)
    // 创建一个AllocateResponse对象
	responses := kubeletdevicepluginv1beta1.AllocateResponse{}
    // 获取当前节点名称
	nodename := os.Getenv(util.NodeNameEnvName)
    // 获取当前挂载的pod
	current, err := util.GetPendingPod(ctx, nodename)
	if err != nil {
		//nodelock.ReleaseNodeLock(nodename, NodeLockNvidia, current)
		return &kubeletdevicepluginv1beta1.AllocateResponse{}, err
	}
	klog.Infof("Allocate pod name is %s/%s, annotation is %+v", current.Namespace, current.Name, current.Annotations)
    // 遍历pod的容器请求
	for idx, req := range reqs.ContainerRequests {
		// If the devices being allocated are replicas, then (conditionally)
		// error out if more than one resource is being allocated.
        // 如果请求的设备ID包含MIG,则需要检查是否启用了时间切片,并且请求的设备ID数量大于1
		if strings.Contains(req.DevicesIDs[0], "MIG") {
			if plugin.config.Sharing.TimeSlicing.FailRequestsGreaterThanOne && rm.AnnotatedIDs(req.DevicesIDs).AnyHasAnnotations() {
				if len(req.DevicesIDs) > 1 {
					device.PodAllocationFailed(nodename, current, NodeLockNvidia)
					return nil, fmt.Errorf("request for '%v: %v' too large: maximum request size for shared resources is 1", plugin.rm.Resource(), len(req.DevicesIDs))
				}
			}

			for _, id := range req.DevicesIDs {
				if !plugin.rm.Devices().Contains(id) {
					device.PodAllocationFailed(nodename, current, NodeLockNvidia)
					return nil, fmt.Errorf("invalid allocation request for '%s': unknown device: %s", plugin.rm.Resource(), id)
				}
			}

			response, err := plugin.getAllocateResponse(req.DevicesIDs)
			if err != nil {
				device.PodAllocationFailed(nodename, current, NodeLockNvidia)
				return nil, fmt.Errorf("failed to get allocate response: %v", err)
			}
			responses.ContainerResponses = append(responses.ContainerResponses, response)
		} else {
            // 如果请求的设备ID不包含MIG,则需要获取下一个设备请求
			currentCtr, devreq, err := GetNextDeviceRequest(nvidia.NvidiaGPUDevice, *current)
			klog.Infoln("deviceAllocateFromAnnotation=", devreq)
			if err != nil {
				device.PodAllocationFailed(nodename, current, NodeLockNvidia)
				return &kubeletdevicepluginv1beta1.AllocateResponse{}, err
			}
            // 检查设备数量是否匹配
			if len(devreq) != len(reqs.ContainerRequests[idx].DevicesIDs) {
				device.PodAllocationFailed(nodename, current, NodeLockNvidia)
				return &kubeletdevicepluginv1beta1.AllocateResponse{}, errors.New("device number not matched")
			}
            // 获取容器设备字符串数组,通过设置NVIDIA_VISIBLE_DEVICES环境变量,将GPU设备ID传递给容器
			response, err := plugin.getAllocateResponse(plugin.GetContainerDeviceStrArray(devreq))
			if err != nil {
				return nil, fmt.Errorf("failed to get allocate response: %v", err)
			}
            // 清除下一个设备类型从annotation
			err = EraseNextDeviceTypeFromAnnotation(nvidia.NvidiaGPUDevice, *current)
			if err != nil {
				device.PodAllocationFailed(nodename, current, NodeLockNvidia)
				return &kubeletdevicepluginv1beta1.AllocateResponse{}, err
			}
            // 如果操作模式不是MIG,则需要设置CUDA_DEVICE_MEMORY_LIMIT、CUDA_DEVICE_SM_LIMIT、CUDA_DEVICE_MEMORY_SHARED_CACHE、CUDA_OVERSUBSCRIBE、CUDA_DISABLE_CONTROL等环境变量
			if plugin.operatingMode != "mig" {
				for i, dev := range devreq {
					limitKey := fmt.Sprintf("CUDA_DEVICE_MEMORY_LIMIT_%v", i)
					response.Envs[limitKey] = fmt.Sprintf("%vm", dev.Usedmem)
				}
				response.Envs["CUDA_DEVICE_SM_LIMIT"] = fmt.Sprint(devreq[0].Usedcores)
				response.Envs["CUDA_DEVICE_MEMORY_SHARED_CACHE"] = fmt.Sprintf("%s/vgpu/%v.cache", hostHookPath, uuid.New().String())
				if plugin.schedulerConfig.DeviceMemoryScaling > 1 {
					response.Envs["CUDA_OVERSUBSCRIBE"] = "true"
				}
				if plugin.schedulerConfig.DisableCoreLimit {
					response.Envs[util.CoreLimitSwitch] = "disable"
				}
                // 删除容器缓存目录
				cacheFileHostDirectory := fmt.Sprintf("%s/vgpu/containers/%s_%s", hostHookPath, current.UID, currentCtr.Name)
				os.RemoveAll(cacheFileHostDirectory)
                // 创建容器缓存目录
				os.MkdirAll(cacheFileHostDirectory, 0777)
				os.Chmod(cacheFileHostDirectory, 0777)
                // 创建vgpulock目录
				os.MkdirAll("/tmp/vgpulock", 0777)
                // 设置vgpulock目录权限
				os.Chmod("/tmp/vgpulock", 0777)

				response.Mounts = append(response.Mounts,
                    // 挂载libvgpu.so
					&kubeletdevicepluginv1beta1.Mount{ContainerPath: fmt.Sprintf("%s/vgpu/libvgpu.so", hostHookPath),
						HostPath: hostHookPath + "/vgpu/libvgpu.so",
						ReadOnly: true},
                    // 挂载容器缓存目录 
					&kubeletdevicepluginv1beta1.Mount{ContainerPath: fmt.Sprintf("%s/vgpu", hostHookPath),
						HostPath: cacheFileHostDirectory,
						ReadOnly: false},
                    // 挂载vgpulock目录
					&kubeletdevicepluginv1beta1.Mount{ContainerPath: "/tmp/vgpulock",
						HostPath: "/tmp/vgpulock",
						ReadOnly: false},
				)
                // 检查CUDA_DISABLE_CONTROL环境变量是否存在
				found := false
				for _, val := range currentCtr.Env {
                    // 如果环境变量是CUDA_DISABLE_CONTROL,则跳过
					if strings.Compare(val.Name, "CUDA_DISABLE_CONTROL") == 0 {
						// if env existed but is set to false or can not be parsed, ignore
                        // 将环境变量值转换为布尔值
						t, _ := strconv.ParseBool(val.Value)
                        // 如果环境变量值为false,则跳过
						if !t {
							continue
						}
						// only env existed and set to true, we mark it "found"
						found = true
						break
					}
				}
                // 如果CUDA_DISABLE_CONTROL环境变量不存在,则需要预加载链接库
				if !found {
                    // 预加载链接库
					response.Mounts = append(response.Mounts, &kubeletdevicepluginv1beta1.Mount{ContainerPath: "/etc/ld.so.preload",
						HostPath: hostHookPath + "/vgpu/ld.so.preload",
						ReadOnly: true},
					)
				}
                // 检查vgpu license是否存在
				_, err = os.Stat(fmt.Sprintf("%s/vgpu/license", hostHookPath))
				if err == nil {
                    // 挂载vgpu license
					response.Mounts = append(response.Mounts, &kubeletdevicepluginv1beta1.Mount{
						ContainerPath: "/tmp/license",
						HostPath:      fmt.Sprintf("%s/vgpu/license", hostHookPath),
						ReadOnly:      true,
					})
                    // 挂载vgpuvalidator
					response.Mounts = append(response.Mounts, &kubeletdevicepluginv1beta1.Mount{
						ContainerPath: "/usr/bin/vgpuvalidator",
						HostPath:      fmt.Sprintf("%s/vgpu/vgpuvalidator", hostHookPath),
						ReadOnly:      true,
					})
				}
			}
            // 将响应添加到响应列表中
			responses.ContainerResponses = append(responses.ContainerResponses, response)
		}
	}
	klog.Infoln("Allocate Response", responses.ContainerResponses)
    // 标记pod分配成功
	device.PodAllocationTrySuccess(nodename, nvidia.NvidiaGPUDevice, NodeLockNvidia, current)
	return &responses, nil
}

这些环境变量如何生效? cuda天生支持的?是HAMi-core能够解析自定义的CUDA_xxx环境变量,而NVIDIA_VISIBLE_DEVICES是nvidia-container-toolkit支持的

NVIDIA_VISIBLE_DEVICES -> GPU-a8243209-6b70-5b3d-de52-1aaafc1495fc
CUDA_DEVICE_MEMORY_LIMIT_0 -> 3000m
CUDA_DEVICE_SM_LIMIT -> 25
CUDA_DEVICE_MEMORY_SHARED_CACHE -> /usr/local/vgpu/55202346-8648-4a49-9037-51b28eeb74ba.cache

libvgpu.so替换完为什么就生效? 在Linux下预加载链接库有好几种方式,动态链接库加载优先级从高到低:

LD_PRELOAD > /etc/ld.so.preload > LD_LIBRARY_PATH > /etc/ld.so.conf.d/.conf > 默认系统路径(/lib, /usr/lib)

通过/etc/ld.so.preload定义,确保libvgpu.so肯定会被加载。cuda api如何劫持?这部分逻辑定义在HAMi-core实现里

5.5 vGPUMonitor调试环境

远程gpu服务器上dlv启动vGPUMonitor,本地goland添加Go Remote Debugger指向:2347

KUBECONFIG=/root/.kube/config \
NODE_NAME=vgpu \
NVIDIA_VISIBLE_DEVICES=all \
NVIDIA_MIG_MONITOR_DEVICES=all \
HOOK_PATH=/usr/local/vgpu \
dlv exec bin/vGPUmonitor --headless -l 0.0.0.0:2347 --api-version=2

5.6 vGPUMonitor核心逻辑

启动函数,程序的主要功能是监控vGPU的使用情况,并提供相应的指标收集和反馈机制

func main() {
    // 校验环境变量HOOK_PATH必须存在
	if err := ValidateEnvVars(); err != nil {
		klog.Fatalf("Failed to validate environment variables: %v", err)
	}
    // 创建一个容器列表器实例,用于跟踪和管理GPU容器Usage
	containerLister, err := nvidia.NewContainerLister()
	if err != nil {
		klog.Fatalf("Failed to create container lister: %v", err)
	}
	cgroupDriver = 0
    // 创建一个错误通道用于错误处理, 用于在不同goroutine间传递错误信息(看起来没有意义了,因为main函数里没有使用)
	errchannel := make(chan error)
	//go serveInfo(errchannel)
    // 初始化vGPUmonitor metrics
	go initMetrics(containerLister)
    // 持续监控容器使用情况
	go watchAndFeedback(containerLister)
    // 等待错误通道传递错误信息,errchannel实际上没用到
	for {
		err := <-errchannel
		klog.Errorf("failed to serve: %v", err)
	}
}

主要使用到的结构体

// ContainerLister结构体,用于存储容器列表使用信息
type ContainerLister struct {
	containerPath string
  // map存储容器使用信息,key为<podUID_containerName>
	containers    map[string]*ContainerUsage
  // 互斥锁,用于保护containers map的并发访问
	mutex         sync.Mutex
  // k8s clientSet
	clientset     *kubernetes.Clientset
}

// ContainerUsage结构体,用于存储单个容器使用信息
type ContainerUsage struct {
  // Pod 的唯一标识符
	PodUID        string
  // 容器名称
	ContainerName string
  // 容器使用信息
	data          []byte
	Info          UsageInfo
}

// UsageInfo接口,定义了获取和设置NVIDIA GPU设备使用情况的方法
type UsageInfo interface {
    // DeviceMax 返回系统中支持的最大GPU设备数量
    DeviceMax() int
    
    // DeviceNum 返回当前实际使用的GPU设备数量
    DeviceNum() int
    
    // DeviceMemoryContextSize 返回指定GPU设备的上下文内存大小
    DeviceMemoryContextSize(idx int) uint64
    
    // DeviceMemoryModuleSize 返回指定GPU设备的模块内存大小
    DeviceMemoryModuleSize(idx int) uint64
    
    // DeviceMemoryBufferSize 返回指定GPU设备的缓冲区内存大小
    DeviceMemoryBufferSize(idx int) uint64
    
    // DeviceMemoryOffset 返回指定GPU设备的内存偏移量
    DeviceMemoryOffset(idx int) uint64
    
    // DeviceMemoryTotal 返回指定GPU设备的总内存大小
    DeviceMemoryTotal(idx int) uint64
    
    // DeviceSmUtil 返回指定GPU设备的SM(Streaming Multiprocessor)利用率
    DeviceSmUtil(idx int) uint64
    
    // SetDeviceSmLimit 设置GPU设备的SM使用限制
    SetDeviceSmLimit(l uint64)
    
    // IsValidUUID 检查指定GPU设备的UUID是否有效
    IsValidUUID(idx int) bool
    
    // DeviceUUID 返回指定GPU设备的UUID
    DeviceUUID(idx int) string
    
    // DeviceMemoryLimit 返回指定GPU设备的内存使用限制
    DeviceMemoryLimit(idx int) uint64
    
    // SetDeviceMemoryLimit 设置GPU设备的内存使用限制
    SetDeviceMemoryLimit(l uint64)
    
    // LastKernelTime 返回最后一次内核执行的时间戳
    LastKernelTime() int64
    
    // UsedMemory 返回指定GPU设备的已使用内存(当前已注释)
    //UsedMemory(idx int) (uint64, error)
    
    // GetPriority 返回GPU任务的优先级
    GetPriority() int
    
    // GetRecentKernel 获取最近的内核执行状态
    GetRecentKernel() int32
    
    // SetRecentKernel 设置最近的内核执行状态
    SetRecentKernel(v int32)
    
    // GetUtilizationSwitch 获取利用率开关状态
    GetUtilizationSwitch() int32
    
    // SetUtilizationSwitch 设置利用率开关状态
    SetUtilizationSwitch(v int32)
}

容器列表器实例,如何更新containers map?

func (l *ContainerLister) Update() error {
    // 是否设置NODE_NAME环境变量
	nodename := os.Getenv(util.NodeNameEnvName)
	if nodename == "" {
		return fmt.Errorf("env %s not set", util.NodeNameEnvName)
	}
    // 获取指定节点上的所有Pod
	pods, err := l.clientset.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{
		FieldSelector: fmt.Sprintf("spec.nodeName=%s", nodename),
	})
	if err != nil {
		return err
	}
    // 加锁,保护containers map的并发访问
	l.mutex.Lock()
	defer l.mutex.Unlock()
    // 读取容器目录下的所有文件
	entries, err := os.ReadDir(l.containerPath)
	if err != nil {
		return err
	}
    // 遍历容器目录下的所有文件
	for _, entry := range entries {
		if !entry.IsDir() {
			continue
		}
        // 获取容器目录的完整路径
        // containerPath: /usr/local/vgpu/containers
        // entry.Name(): 305df464-2a9e-485d-9c80-f2901a9259cc_pytorch-container
		dirName := filepath.Join(l.containerPath, entry.Name())
        // 检查容器是否存在,如果不存在的话
		if !isValidPod(entry.Name(), pods) {
            // 获取容器目录的元数据
			dirInfo, err := os.Stat(dirName)
            // 如果容器目录存在,并且修改时间在300秒内,则跳过
			if err == nil && dirInfo.ModTime().Add(time.Second*300).After(time.Now()) {
				continue
			}
			klog.Infof("Removing dirname %s in monitorpath", dirName)
            // 判断map中是否存在该容器
			if c, ok := l.containers[entry.Name()]; ok {
                // 如果存在,则释放内存
				syscall.Munmap(c.data)
                // 删除map中的该容器
				delete(l.containers, entry.Name())
			}
            // 删除容器目录
			_ = os.RemoveAll(dirName)
			continue
		}
        // 检查容器是否存在,如果存在的话
        // 判断map中是否存在该容器
		if _, ok := l.containers[entry.Name()]; ok {
			continue
		}
        // 加载容器使用信息
		usage, err := loadCache(dirName)
		if err != nil {
			klog.Errorf("Failed to load cache: %s, error: %v", dirName, err)
			continue
		}
        // 如果容器使用信息为空,则跳过
		if usage == nil {
			// no cuInit in container
			continue
		}
        // 获取pod uid和container name
		usage.PodUID = strings.Split(entry.Name(), "_")[0]
		usage.ContainerName = strings.Split(entry.Name(), "_")[1]
        // 将容器使用信息添加到map中
		l.containers[entry.Name()] = usage
		klog.Infof("Adding ctr dirname %s in monitorpath", dirName)
	}
	return nil
}

loadCache函数,如何加载容器使用信息?

func loadCache(fpath string) (*ContainerUsage, error) {
	klog.Infof("Checking path %s", fpath)
    // 读取容器目录下的所有文件,形如/usr/local/vgpu/containers/305df464-2a9e-485d-9c80-f2901a9259cc_pytorch-container/
	files, err := os.ReadDir(fpath)
	if err != nil {
		return nil, err
	}
    // 检查文件数量,如果文件数量大于2,则返回错误
	if len(files) > 2 {
		return nil, errors.New("cache num not matched")
	}
    // 如果文件数量为0,则返回nil
	if len(files) == 0 {
		return nil, nil
	}
	cacheFile := ""
	for _, val := range files {
        // 检查文件名是否包含libvgpu.so
		if strings.Contains(val.Name(), "libvgpu.so") {
			continue
		}
        // 检查文件名是否包含.cache
		if !strings.Contains(val.Name(), ".cache") {
			continue
		}
        // 获取cache文件的完整路径
		cacheFile = filepath.Join(fpath, val.Name())
		break
	}
    // 如果cacheFile为空,则返回nil
	if cacheFile == "" {
		klog.Infof("No cache file in %s", fpath)
		return nil, nil
	}
    // 获取cache文件的元数据
	info, err := os.Stat(cacheFile)
	if err != nil {
		klog.Errorf("Failed to stat cache file: %s, error: %v", cacheFile, err)
		return nil, err
	}
    // 检查cache文件大小,如果小于headerT结构体的大小,则返回错误
	if info.Size() < int64(unsafe.Sizeof(headerT{})) {
		return nil, fmt.Errorf("cache file size %d too small", info.Size())
	}
    // 打开cache文件
	f, err := os.OpenFile(cacheFile, os.O_RDWR, 0666)
	if err != nil {
		klog.Errorf("Failed to open cache file: %s, error: %v", cacheFile, err)
		return nil, err
	}

	defer func(f *os.File) {
		_ = f.Close()
	}(f)
	usage := &ContainerUsage{}
    // 将cache文件映射到内存
    // - 内存映射文件
    // 参数说明:
    // - fd: 文件描述符
    // - offset: 从文件开始处的偏移量
    // - length: 要映射的字节数
    // - prot: 内存保护标志 (读/写)
    // - flags: 映射标志
	usage.data, err = syscall.Mmap(int(f.Fd()), 0, int(info.Size()), syscall.PROT_WRITE|syscall.PROT_READ, syscall.MAP_SHARED)
	if err != nil {
		klog.Errorf("Failed to mmap cache file: %s, error: %v", cacheFile, err)
		return nil, err
	}
    // 将[]byte转换为结构体指针
	head := (*headerT)(unsafe.Pointer(&usage.data[0]))
    // 校验initializedFlag,不清楚为什么SharedRegionMagicFlag=19920718
	if head.initializedFlag != SharedRegionMagicFlag {
        // 释放内存映射
		_ = syscall.Munmap(usage.data)
		return nil, fmt.Errorf("cache file magic flag not matched")
	}
    // 检查cache文件大小,为什么跟1197897比较,看起来是针对特定版本的NVIDIA GPU监控数据结构的大小
    // usage.data 是通过 syscall.Mmap 映射的文件内容
    // v0.CastSpec 和 v1.CastSpec 分别处理不同版本的数据格式
    // 两个版本都实现了UsageInfo接口
	if info.Size() == 1197897 {
		usage.Info = v0.CastSpec(usage.data)
	} else if head.majorVersion == 1 {
		usage.Info = v1.CastSpec(usage.data)
	} else {
		_ = syscall.Munmap(usage.data)
		return nil, fmt.Errorf("unknown cache file size %d version %d.%d", info.Size(), head.majorVersion, head.minorVersion)
	}
	return usage, nil
}

vGPUmonitor metrics,如何初始化?

// NewClusterManager first creates a Prometheus-ignorant ClusterManager
// instance. Then, it creates a ClusterManagerCollector for the just created
// ClusterManager. Finally, it registers the ClusterManagerCollector with a
// wrapping Registerer that adds the zone as a label. In this way, the metrics
// collected by different ClusterManagerCollectors do not collide.
func NewClusterManager(zone string, reg prometheus.Registerer, containerLister *nvidia.ContainerLister) *ClusterManager {
    // 创建一个ClusterManager实例
	c := &ClusterManager{
		Zone:            zone,
		containerLister: containerLister,
	}
    // 创建一个PodLister实例,用于获取Pod信息
	informerFactory := informers.NewSharedInformerFactoryWithOptions(containerLister.Clientset(), time.Hour*1)
	c.PodLister = informerFactory.Core().V1().Pods().Lister()
	stopCh := make(chan struct{})
    // 启动informerFactory,用于获取Pod信息
	informerFactory.Start(stopCh)
    // 创建一个ClusterManagerCollector实例,用于收集metrics
	cc := ClusterManagerCollector{ClusterManager: c}
    // 将ClusterManagerCollector实例注册到prometheus
	prometheus.WrapRegistererWith(prometheus.Labels{"zone": zone}, reg).MustRegister(cc)
	return c
}

如何持续监控容器使用情况?

func watchAndFeedback(lister *nvidia.ContainerLister) {
    // 初始化 NVIDIA Management Library
    nvml.Init()

    // 无限循环,持续监控
    for {
        // 每5秒执行一次
        time.Sleep(time.Second * 5)

        // 更新容器列表器实例的containers map
        err := lister.Update()
        if err != nil {
            // 如果更新失败,记录错误并继续下一次循环
            klog.Errorf("Failed to update container list: %v", err)
            continue
        }

        // 观察并处理容器状态
        Observe(lister)
    }
}
// Observe 监控和管理容器的 GPU 使用状态
func Observe(lister *nvidia.ContainerLister) {
    // 初始化设备利用率开关映射,key是GPU UUID,value是每个优先级的使用计数
    utSwitchOn := map[string]UtilizationPerDevice{}
    // 获取containers map
    containers := lister.ListContainers()

    // 第一次遍历:统计每个 GPU 设备的使用情况
    for _, c := range containers {
        // 获取容器的最近内核使用计数
        recentKernel := c.Info.GetRecentKernel()
        if recentKernel > 0 {
            // 递减内核使用计数
            recentKernel--
            if recentKernel > 0 {
                // 遍历容器使用的所有 GPU 设备
                for i := 0; i < c.Info.DeviceMax(); i++ {
                    // 跳过无效的 GPU UUID
                    if !c.Info.IsValidUUID(i) {
                        continue
                    }
                    // 获取 GPU 的 UUID
                    uuid := c.Info.DeviceUUID(i)
                    // 如果是新设备,初始化优先级计数数组 [0, 0]
                    if len(utSwitchOn[uuid]) == 0 {
                        utSwitchOn[uuid] = []int{0, 0}
                    }
                    // 增加对应优先级的使用计数
                    utSwitchOn[uuid][c.Info.GetPriority()]++
                }
            }
            // 更新容器的内核使用计数
            c.Info.SetRecentKernel(recentKernel)
        }
    }

    // 第二次遍历:根据统计结果更新容器状态
    for idx, c := range containers {
        priority := c.Info.GetPriority()
        recentKernel := c.Info.GetRecentKernel()
        utilizationSwitch := c.Info.GetUtilizationSwitch()

        // 检查是否需要阻塞(有更高优先级的任务在使用)
        if CheckBlocking(utSwitchOn, priority, c) {
            if recentKernel >= 0 {
                klog.Infof("utSwitchon=%v", utSwitchOn)
                klog.Infof("Setting Blocking to on %v", idx)
                // 设置阻塞状态
                c.Info.SetRecentKernel(-1)
            }
        } else {
            if recentKernel < 0 {
                klog.Infof("utSwitchon=%v", utSwitchOn)
                klog.Infof("Setting Blocking to off %v", idx)
                // 解除阻塞状态
                c.Info.SetRecentKernel(0)
            }
        }

        // 检查优先级冲突(同优先级是否有多个任务)
        if CheckPriority(utSwitchOn, priority, c) {
            if utilizationSwitch != 1 {
                klog.Infof("utSwitchon=%v", utSwitchOn)
                klog.Infof("Setting UtilizationSwitch to on %v", idx)
                // 开启利用率限制
                c.Info.SetUtilizationSwitch(1)
            }
        } else {
            if utilizationSwitch != 0 {
                klog.Infof("utSwitchon=%v", utSwitchOn)
                klog.Infof("Setting UtilizationSwitch to off %v", idx)
                // 关闭利用率限制
                c.Info.SetUtilizationSwitch(0)
            }
        }
    }
}

这个函数实现了一个优先级基础的 GPU 资源管理机制:

  • 统计每个 GPU 设备上不同优先级任务的数量
  • 根据优先级规则决定是否阻塞低优先级任务
  • 处理同优先级任务的资源竞争
  • 通过日志记录状态变化

这种机制可以确保:

  • 高优先级任务优先获得GPU资源
  • 同优先级任务公平共享GPU资源
  • 低优先级任务在资源充足时才能运行

参考链接

「真诚赞赏,手留余香」

爱折腾的工程师

真诚赞赏,手留余香

使用微信扫描二维码完成支付