vLLM Study Notes (AI Coding Tool Analysis)

Posted by iceyao on Monday, February 10, 2025

1. What is vLLM

vLLM is a high-throughput LLM inference framework open-sourced by the LMSYS group at UC Berkeley, designed to dramatically improve the throughput and memory efficiency of language-model serving in real-time scenarios. vLLM uses the PagedAttention algorithm, which borrows the virtual-memory and paging ideas from operating systems to manage the attention mechanism's key-value cache (KV cache), significantly improving GPU memory utilization and reducing fragmentation. It also supports distributed inference, running a model in parallel across multiple GPUs.

2. Building and Installing from Source

Using the code at the v0.7.3 tag as an example

(vllm) root@vgpu:/data# git clone https://github.com/vllm-project/vllm.git
(vllm) root@vgpu:/data# cd vllm
(vllm) root@vgpu:/data/vllm# git checkout v0.7.3

# export MAX_JOBS=6  # number of parallel compile jobs; defaults to the number of CPU cores
(vllm) root@vgpu:/data/vllm# VLLM_USE_PRECOMPILED=1 pip install -e .  # Python-only changes: reuse precompiled binaries, no recompilation needed

(vllm) root@vgpu:~# python
Python 3.12.9 | packaged by Anaconda, Inc. | (main, Feb  6 2025, 18:56:27) [GCC 11.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import vllm
>>> print(vllm.__file__)
/data/vllm/vllm/__init__.py   # the import resolves to the checked-out source tree, so the Python code can be edited in place without rebuilding

For a full build that also compiles the native extensions:

pip install -e .

The vLLM codebase mixes several languages: roughly 85% Python, 10% CUDA and 3.3% C++, with the rest being shell, CMake and Dockerfile scripts. When modifying the compiled (C++/CUDA) parts, installing ccache via conda install ccache or apt install ccache speeds up rebuilds considerably. Besides ccache, sccache offers similar and even richer functionality, including distributed compilation.

3. Source Code Analysis

Using the code at the v0.7.3 tag, read via VS Code Remote SSH.

vllm serve /data/Qwen2.5-0.5B/ --served-model-name qwen

Since the VS Code IDE is used to debug the code, the vLLM API server has to be started from VS Code; the command line above translates into the following launch.json configuration:

{
    // Use IntelliSense to learn about possible attributes.
    // Hover to view descriptions of existing attributes.
    // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
    "version": "0.2.0",
    "configurations": [
        {
            "name": "vllm serve",
            "type": "debugpy",
            "request": "launch",
            // "program": "/data/miniconda3/envs/vllm/bin/vllm",
            "module": "vllm.entrypoints.cli.main",
            "justMyCode": false,
            "args": [
                "serve",
                "/data/Qwen1.5-0.5B-Chat/",
                "--served-model-name",
                "qwen"
            ],
            "console": "integratedTerminal"
        },
    ]
}

After the vLLM API server has started, check GPU memory usage:

(vllm) root@vgpu:/data/vllm# nvidia-smi 
Wed Feb 26 07:20:12 2025       
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 550.135                Driver Version: 550.135        CUDA Version: 12.4     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|=========================================+========================+======================|
|   0  NVIDIA GeForce RTX 3090        Off |   00000000:00:05.0 Off |                  N/A |
| 36%   40C    P8              8W /  370W |   20854MiB /  24576MiB |      0%      Default |
|                                         |                        |                  N/A |
+-----------------------------------------+------------------------+----------------------+
                                                                                         
+-----------------------------------------------------------------------------------------+
| Processes:                                                                              |
|  GPU   GI   CI        PID   Type   Process name                              GPU Memory |
|        ID   ID                                                               Usage      |
|=========================================================================================|
|    0   N/A  N/A     10257      C   /data/miniconda3/envs/vllm/bin/python       20836MiB |
+-----------------------------------------------------------------------------------------+

Once the API server is fully up, it can be tested with curl:

# "model" must match the --served-model-name given when the server was started
(vllm) root@vgpu:/data/vllm# curl http://localhost:8000/v1/chat/completions -H "Content-Type: application/json" -d '{
        "model": "qwen",
        "messages": [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": "你是谁?"}
        ]
    }'
{"id":"chatcmpl-44c835030dcb4aaba493b051a716d269","object":"chat.completion","created":1740560485,"model":"qwen","choices":[{"index":0,"message":{"role":"assistant","reasoning_content":null,"content":"我是来自阿里云的超大规模语言模型。我叫通义千问,是阿里云自主研发的预训练语言模型。我与人类科学家们一同研究,努力达到最高的准确率和性能。","tool_calls":[]},"logprobs":null,"finish_reason":"stop","stop_reason":null}],"usage":{"prompt_tokens":22,"total_tokens":66,"completion_tokens":44,"prompt_tokens_details":null},"prompt_logprobs":null}
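The same endpoint can also be exercised from Python with the official openai client package, since vLLM speaks the OpenAI-compatible protocol. A minimal sketch (assumes pip install openai and the default port 8000; the API key can be any placeholder):

from openai import OpenAI

# vLLM exposes an OpenAI-compatible API; the key is a placeholder string
client = OpenAI(base_url="http://localhost:8000/v1", api_key="EMPTY")

resp = client.chat.completions.create(
    model="qwen",  # must match --served-model-name
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "你是谁?"},
    ],
)
print(resp.choices[0].message.content)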

Code structure

vllm/
├── vllm/                          # main Python source
│   ├── core/                      # core functionality
│   ├── engine/                    # inference engine
│   ├── model_executor/           # model executor
│   ├── executor/                 # executor implementations
│   ├── attention/                # attention implementations
│   ├── distributed/              # distributed computing support
│   ├── worker/                   # worker processes
│   ├── device_allocator/         # device memory allocation
│   ├── vllm_flash_attn/         # Flash Attention optimization
│   ├── triton_utils/            # Triton utilities
│   ├── transformers_utils/       # HuggingFace utilities
│   ├── lora/                    # LoRA adapters
│   ├── multimodal/              # multimodal support
│   ├── prompt_adapter/          # prompt adapters
│   ├── spec_decode/             # speculative decoding
│   ├── platforms/               # platform-specific code
│   ├── plugins/                 # plugin system
│   ├── adapter_commons/         # shared adapter components
│   ├── inputs/                  # input processing
│   ├── compilation/             # compilation support
│   ├── logging_utils/           # logging utilities
│   ├── profiler/               # profiling tools
│   ├── usage/                  # usage statistics
│   ├── entrypoints/           # entry points
│   ├── third_party/           # third-party code
│   └── v1/                    # v1 engine code
├── csrc/                        # C++/CUDA source
│   ├── core/                    # C++ core
│   ├── attention/              # attention CUDA kernels
│   ├── mamba/                  # Mamba architecture support
│   ├── quantization/           # quantization kernels
│   ├── sparse/                 # sparse computation support
│   ├── moe/                    # Mixture of Experts
│   ├── cutlass_extensions/     # CUTLASS extensions
│   ├── prepare_inputs/         # input preprocessing
│   ├── cpu/                    # CPU implementation
│   └── rocm/                   # AMD ROCm support
├── examples/                    # example code
│   ├── online_serving/         # online serving examples
│   ├── offline_inference/      # offline inference examples
│   ├── ice_test/              # ICE tests
│   ├── other/                 # other examples
│   └── *.jinja                # chat templates for various models
├── tests/                      # tests
│   ├── basic_correctness/     # basic correctness tests
│   ├── kernels/              # kernel tests
│   ├── model_executor/       # model executor tests
│   ├── multimodal/          # multimodal tests
│   ├── distributed/         # distributed tests
│   ├── quantization/        # quantization tests
│   ├── lora/               # LoRA tests
│   ├── tool_use/           # tool-use tests
│   ├── spec_decode/        # speculative decoding tests
│   ├── prompt_adapter/     # prompt adapter tests
│   ├── prefix_caching/     # prefix caching tests
│   ├── mq_llm_engine/      # MQ LLM engine tests
│   ├── neuron/            # AWS Neuron tests
│   ├── tpu/               # TPU tests
│   ├── worker/            # worker tests
│   └── vllm_test_utils/   # test utilities
├── benchmarks/              # performance benchmarks
├── tools/                   # tooling scripts
├── cmake/                   # CMake build configuration
├── requirements/            # dependency files
│   ├── requirements-common.txt    # common dependencies
│   ├── requirements-cuda.txt      # CUDA dependencies
│   ├── requirements-cpu.txt       # CPU dependencies
│   ├── requirements-rocm.txt      # ROCm dependencies
│   ├── requirements-tpu.txt       # TPU dependencies
│   ├── requirements-neuron.txt    # AWS Neuron dependencies
│   ├── requirements-hpu.txt       # HPU dependencies
│   ├── requirements-xpu.txt       # Intel XPU dependencies
│   ├── requirements-openvino.txt  # OpenVINO dependencies
│   └── requirements-test.txt      # test dependencies

vLLM startup flow

  1. Entry point: vLLM's main entry is vllm/entrypoints/cli/main.py, which defines two main command modules:
  • vllm.entrypoints.cli.openai - OpenAI-compatible API commands
  • vllm.entrypoints.cli.serve - the server start command
  2. The serve command: its implementation is the ServeSubcommand class in vllm/entrypoints/cli/serve.py:
class ServeSubcommand(CLISubcommand):

    @staticmethod
    def cmd(args: argparse.Namespace) -> None:
        # Copy the positional model_tag argument into args.model
        args.model = args.model_tag
        # Run the server on a uvloop event loop
        uvloop.run(run_server(args))
  3. Server startup: the startup flow is the run_server function in vllm/entrypoints/openai/api_server.py:
async def run_server(args, **uvicorn_kwargs) -> None:
    # 1. Initialize the engine client and set up the ZMQ-based RPC server
    async with build_async_engine_client(args) as engine_client:
        # 2. Build the FastAPI application
        app = build_app(args)
        
        # 3. Fetch the model config
        model_config = await engine_client.get_model_config()
        
        # 4. Initialize the application state
        await init_app_state(engine_client, model_config, app.state, args)
        
        # 5. Start the HTTP server
        shutdown_task = await serve_http(
            app,
            host=args.host,
            port=args.port,
            ...
        )
  4. Application state initialization:

    The init_app_state function initializes the various serving components:

    • OpenAIServingModels - manages model loading
    • OpenAIServingChat - handles chat completion requests
    • OpenAIServingCompletion - handles text completion requests
    • OpenAIServingPooling - handles pooling requests
    • OpenAIServingEmbedding - handles embedding requests
    • OpenAIServingScores - handles scoring requests
    • OpenAIServingTokenization - handles tokenization requests
  5. Main API endpoints:

    vLLM exposes several OpenAI-compatible endpoints:

    • /v1/completions - text generation API
    • /v1/chat/completions - chat completion API
    • /v1/embeddings - text embedding API
    • /v1/audio/transcriptions - speech transcription API
    • /tokenize, /detokenize - tokenization APIs

To summarize the startup flow:

CLI parsing (parse the serve command and its arguments) -> engine initialization (create the LLM engine and related components) -> app construction (build the FastAPI app and register routes) -> state initialization (create the serving components) -> serving (start the HTTP server and handle requests)

vLLM overall architecture

The class hierarchy of vLLM looks like this:

                        +-------------------------+
                        |  LLM Engine(vllm_config)|
                        +-------------------------+
                                   |
                                   v
                        +-------------------------+
                        |  Executor(vllm_config)  |
                        +-------------------------+
                                   |
                    +-----------------------------+
                    |                             |
        Rank 0      v             ...        Rank N-1 v
+-------------------------+            +-------------------------+
|   Worker(vllm_config)   |            |   Worker(vllm_config)   |
+-------------------------+            +-------------------------+
            |                                      |
            v                                      v
+-------------------------+            +-------------------------+
| Model Runner(vllm_config)|           | Model Runner(vllm_config)|
+-------------------------+            +-------------------------+
            |                                      |
            v                                      v
+-------------------------+            +-------------------------+
| Model(vllm_config,prefix)|           | Model(vllm_config,prefix)|
+-------------------------+            +-------------------------+

The diagram shows vLLM's main component hierarchy:

  • At the top sits the LLM Engine
  • Below it is the Executor
  • The Executor fans out to multiple Workers (Rank 0 through Rank N-1)
  • Each Worker owns a Model Runner
  • At the bottom are the Model instances

LLM Engine

The LLMEngine and AsyncLLMEngine classes are the core of the vLLM system, handling model inference and asynchronous request processing.

                                LLM Engine
                                   
    +-------------+                                   +--------------------+
    |  LLM class  |                                   | OpenAI-compatible  |
    +-------------+                                   |    API Server      |
           |                                          +--------------------+
           v                                                   |
    +------------------+                              +------------------+
    |    LLMEngine     | <--------------------------- | AsyncLLMEngine   |
    +------------------+                              +------------------+
           |
           |
    +------+------------+------------+------------+
    |                   |            |            |
    v                   v            v            v
+----------+    +-----------+  +-----------+ +-----------+
|  Input   |    |           |  |   Model   | |  Output   |
|Processing|    |Scheduling |  |Execution  | |Processing |
+----------+    +-----------+  +-----------+ +-----------+

LLMEngine consists of Input Processing, Scheduling, Model Execution and Output Processing (a small usage example follows the list below).

  • Input Processing: tokenizes the input text with the configured tokenizer
  • Scheduling: decides which requests are processed in each step
  • Model Execution: manages execution of the language model, including distributed execution across multiple GPUs
  • Output Processing: processes the model's output, decoding the LLM's token IDs back into human-readable text
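These four stages are easiest to see through the high-level LLM class, which wraps an LLMEngine for offline inference. A minimal sketch (the model path is a placeholder for a local model):

from vllm import LLM, SamplingParams

# generate() runs the whole pipeline: tokenization, scheduling,
# model execution and detokenization of the outputs.
llm = LLM(model="/data/Qwen1.5-0.5B-Chat/")  # placeholder local model path
sampling_params = SamplingParams(temperature=0.7, max_tokens=64)

outputs = llm.generate(["Tell me a joke"], sampling_params)
for output in outputs:
    print(output.outputs[0].text)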

Core data structure:

class LLMEngine:
    """An LLM engine that receives requests and generates texts.

    This is the main class for the vLLM engine. It receives requests
    from clients and generates texts from the LLM. It includes a tokenizer, a
    language model (possibly distributed across multiple GPUs), and GPU memory
    space allocated for intermediate states (aka KV cache). This class utilizes
    iteration-level scheduling and efficient memory management to maximize the
    serving throughput.
    """

During initialization, the LLM Engine creates several key components:

  1. Model Executor: runs the model's forward pass
  2. Scheduler: schedules sequence groups and manages resources
  3. Cache Engine: manages allocation and freeing of the KV cache

Key LLM Engine methods

The add_request method

add_request is the entry point for adding a new inference request:

def add_request(
    self,
    request_id: str,
    prompt: str,
    sampling_params: SamplingParams,
    prompt_token_ids: Optional[List[int]] = None,
    arrival_time: Optional[float] = None,
    lora_request: Optional[LoRARequest] = None,
    trace_headers: Optional[Mapping[str, str]] = None,
    prompt_adapter_request: Optional[PromptAdapterRequest] = None,
    pooling_params: Optional[PoolingParams] = None,
    encoder_input: Optional[str] = None,
    encoder_input_token_ids: Optional[List[int]] = None,
    priority: int = 0,
) -> None:
    """Add a request to the engine's request pool.

    Args:
        request_id: The unique ID of the request.
        prompt: The prompt string.
        sampling_params: The sampling parameters for text generation.
        prompt_token_ids: The token IDs of the prompt. If None, we
            use the tokenizer to convert the prompt to token IDs.
        arrival_time: The arrival time of the request. If None, we use
            the current monotonic time.
        lora_request: The LoRA request. If None, we don't use LoRA.
        trace_headers: HTTP tracing headers. If None, we don't use tracing.
        prompt_adapter_request: The prompt adapter request. If None,
            we don't use prompt adapter.
        pooling_params: The pooling parameters for the request. If
            provided with a valid pooling parameter, the request will
            not return text and will instead return pooled values.
        encoder_input: The encoder input. Only valid for encoder-decoder
            models.
        encoder_input_token_ids: The token IDs of the encoder input.
            If None, we use the tokenizer to convert the encoder input 
            to token IDs. Only valid for encoder-decoder models.
        priority: The priority of the request. Lower number means
            higher priority.
    """

The main steps of this method are (a minimal driving-loop example follows the list):

  1. Validate the request parameters
  2. Construct Sequence objects
  3. Create a SequenceGroup
  4. Hand the request to the scheduler's waiting queue
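At the engine level the whole request lifecycle can be driven directly with add_request and step; a minimal sketch using the public LLMEngine API (the model path is a placeholder, error handling omitted):

from vllm import EngineArgs, LLMEngine, SamplingParams

engine = LLMEngine.from_engine_args(EngineArgs(model="/data/Qwen1.5-0.5B-Chat/"))

# add_request builds the Sequence/SequenceGroup and puts it in the waiting queue
engine.add_request("0", "Tell me a joke",
                   SamplingParams(temperature=0.7, max_tokens=64))

# step() runs one scheduling + model-execution iteration at a time
while engine.has_unfinished_requests():
    for request_output in engine.step():
        if request_output.finished:
            print(request_output.outputs[0].text)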

The step method

step is the core of the inference loop; each call performs one decoding iteration:

def step(self) -> List[Union[RequestOutput, PoolingRequestOutput]]:
    """Performs one decoding iteration and returns newly generated results.

    .. figure:: https://i.imgur.com/sv2HssD.png
        :alt: Overview of the step function
        :align: center

        Overview of the step function.
    """

The main steps of this method are:

  1. Call the scheduler's schedule method to obtain the sequence groups to run and the resource allocation decisions
  2. Run the model through the Model Executor
  3. Process the model outputs and update sequence state
  4. Build and return the results
# Key step 1: schedule the sequence groups
(seq_group_metadata_list, scheduler_outputs, allow_async_output_proc) = self.scheduler[virtual_engine].schedule()

# Key step 2: execute the model
execute_model_req = ExecuteModelRequest(
    seq_group_metadata_list=seq_group_metadata_list,
    blocks_to_swap_in=scheduler_outputs.blocks_to_swap_in,
    blocks_to_swap_out=scheduler_outputs.blocks_to_swap_out,
    blocks_to_copy=scheduler_outputs.blocks_to_copy,
    num_lookahead_slots=scheduler_outputs.num_lookahead_slots,
    running_queue_size=scheduler_outputs.running_queue_size,
    finished_requests_ids=finished_requests_ids,
    last_sampled_token_ids=last_sampled_token_ids)

outputs = self.model_executor.execute_model(execute_model_req=execute_model_req)

# Key step 3: process the outputs
self._process_model_outputs(ctx=ctx)

Scheduler

The Scheduler is the component responsible for scheduling sequence groups and managing resources. It implements iteration-level scheduling, one of the key techniques behind vLLM's performance. The scheduler maintains three main queues:

# Scheduler initialization
def __init__(self, ...):
    # Sequence groups currently running
    self.running: Deque[SequenceGroup] = deque()
    # Sequence groups swapped out to CPU memory
    self.swapped: Deque[SequenceGroup] = deque()
    # Sequence groups waiting to be executed
    self.waiting: Deque[SequenceGroup] = deque()

The scheduler implements several scheduling policies; the core entry point is the schedule method:

def schedule(self) -> Tuple[List[SequenceGroupMetadata], SchedulerOutputs, bool]:
    """Schedule sequence groups for execution.
    
    Returns a list of sequence group metadata, scheduling decisions,
    and whether to allow async output processing.
    """
    # Delegate to _schedule for the actual scheduling decisions
    scheduler_outputs = self._schedule()
    
    # Build the metadata list
    seq_group_metadata_list = []
    for scheduled_seq_group in scheduler_outputs.scheduled_seq_groups:
        seq_group = scheduled_seq_group.seq_group
        # ...build the metadata...
        seq_group_metadata_list.append(seq_group_metadata)
    
    # Decide whether async output processing is allowed
    allow_async_output_proc = self._prepare_allow_async_output_proc(...)
    
    return seq_group_metadata_list, scheduler_outputs, allow_async_output_proc

In each iteration the scheduler decides which sequences to run using the following priority order (a simplified sketch follows the list):

  1. First, sequences that are already running (continue generating)
  2. Then, swapped-out sequences (restored from CPU memory)
  3. Finally, sequences in the waiting queue (start new inference)
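A heavily simplified sketch of that priority order; this is not vLLM's actual scheduler (budget accounting, preemption and LoRA constraints are all omitted):

from collections import deque

def schedule_one_iteration(running: deque, swapped: deque, waiting: deque,
                           token_budget: int) -> list:
    """Pick sequence groups for this step: running first, then swapped, then waiting."""
    scheduled = []
    for queue in (running, swapped, waiting):
        while queue and token_budget > 0:
            seq_group = queue.popleft()
            scheduled.append(seq_group)
            token_budget -= 1  # the real scheduler accounts for actual token counts
    return scheduled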

Chunked prefill: vLLM supports chunked prefill, which lets a long sequence complete its prefill across multiple steps (an example of enabling it follows the code):

def _schedule_chunked_prefill(self) -> SchedulerOutputs:
    """Schedule with chunked prefill enabled."""
    # Create the scheduling budget
    budget = self._create_scheduling_budget(...)
    
    # Collect the LoRA IDs currently in use
    curr_loras = self._collect_curr_loras()
    
    # Compute metadata for partially prefilled sequences
    partial_prefill_metadata = PartialPrefillMetadata.from_queues(
        self.running, self.waiting, self.scheduler_config)
    
    # Schedule the running sequences
    running_outputs = self._schedule_running(
        budget, curr_loras, enable_chunking=True,
        partial_prefill_metadata=partial_prefill_metadata)
    
    # Schedule the swapped sequences
    swapped_outputs = self._schedule_swapped(
        budget, curr_loras, enable_chunking=True)
    
    # Schedule the waiting (prefill) sequences
    prefill_outputs = self._schedule_prefills(
        budget, curr_loras, enable_chunking=True,
        partial_prefill_metadata=partial_prefill_metadata)
    
    # ...merge the scheduling results...
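Chunked prefill is opt-in and is controlled through the engine arguments; a sketch (the model path is a placeholder and the parameter values are illustrative):

from vllm import LLM

# enable_chunked_prefill splits long prompts across multiple steps;
# max_num_batched_tokens caps the per-step token budget.
llm = LLM(model="/data/Qwen1.5-0.5B-Chat/",
          enable_chunked_prefill=True,
          max_num_batched_tokens=2048)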

Worker

The Worker is the component that actually runs the model's computation. It is typically bound to a single GPU and is responsible for maintaining the KV cache and running the model's forward pass.

Worker class hierarchy

class WorkerBase:
    """Base class for worker implementations."""
    
class LocalOrDistributedWorkerBase(WorkerBase):
    """Base class for workers that can run locally or in distributed mode."""
    
class Worker(LocalOrDistributedWorkerBase):
    """A worker class that executes (a partition of) the model on a GPU."""
    
# Other platform-specific Worker implementations
class CPUWorker(WorkerBase): ...
class TPUWorker(WorkerBase): ...
class HPUWorker(WorkerBase): ...
class XPUWorker(WorkerBase): ...
class OpenVINOWorker(WorkerBase): ...

Key Worker methods

class Worker(LocalOrDistributedWorkerBase):
    """A worker class that executes (a partition of) the model on a GPU.

    Each worker is associated with a single GPU. The worker is responsible for
    maintaining the KV cache and executing the model on the GPU. In case of
    distributed inference, each worker is assigned a partition of the model.
    """

    def __init__(self, vllm_config: VllmConfig, local_rank: int, rank: int, ...):
        # Initialize the worker: set up the device, load the model, etc.
        
    def execute_model(self, execute_model_req: ExecuteModelRequest) -> List[SamplerOutput]:
        """Execute the model with the given input sequences."""
        # 1. Process the input request
        # 2. Perform block swap in/out operations
        # 3. Call the model_runner to execute the model
        # 4. Return the sampler outputs

The core of execute_model:

def execute_model(self, execute_model_req: ExecuteModelRequest) -> List[SamplerOutput]:
    # Memory management before running the model
    self._process_blocks_to_swap_in(execute_model_req.blocks_to_swap_in)
    self._process_blocks_to_swap_out(execute_model_req.blocks_to_swap_out)
    self._process_blocks_to_copy(execute_model_req.blocks_to_copy)
    
    # Prepare the model inputs
    model_inputs = self._prepare_model_inputs(
        execute_model_req.seq_group_metadata_list)
    
    # Run the model through the model_runner
    if execute_model_req.last_sampled_token_ids:
        sampled_token_ids = execute_model_req.last_sampled_token_ids
    else:
        # Get the model_runner
        model_runner = self._get_model_runner()
        
        # Execute the model and collect the outputs
        outputs = model_runner.execute(model_inputs)
        
    return outputs

Model Runner

The Model Runner manages the details of model execution: preparing input tensors, running the forward pass, and processing the outputs.

Model Runner class hierarchy

class ModelRunnerBase:
    """Base class for model runner implementations."""
    
class GPUModelRunnerBase(ModelRunnerBase):
    """Helper class for shared methods between GPU model runners."""
    
class ModelRunner(GPUModelRunnerBase):
    """GPU model runner with sampling step."""
    
# Other specialized Model Runners
class MultiStepModelRunner(GPUModelRunnerBase): ...
class EncoderDecoderModelRunner(GPUModelRunnerBase): ...
class PoolingModelRunner(GPUModelRunnerBase): ...
class CPUModelRunner(ModelRunnerBase): ...
class NeuronModelRunner(ModelRunnerBase): ...
class TPUModelRunner(ModelRunnerBase): ...

Core of the Model Runner's execute path

def execute(self, model_input: ModelInputForGPUWithSamplingMetadata) -> List[SamplerOutput]:
    """Execute model on the given input.
    
    Args:
        model_input: The input to the model, including tokens, positions,
                     attention metadata, etc.
                     
    Returns:
        The output from the model's sampling step.
    """
    # Batch size and virtual engine index
    batch_size = model_input.batch_size
    virtual_engine = model_input.virtual_engine
    
    # Model inputs
    input_tokens = model_input.input_tokens
    input_positions = model_input.input_positions
    
    # Attention metadata
    attn_metadata = model_input.attn_metadata
    seq_lens = attn_metadata.seq_lens_tensor
    
    # Execute through a captured CUDA graph when possible
    if use_cuda_graph:
        # Get or create the CUDA graph runner
        graph_runner = self._get_graph_runner(...)
        # Replay the captured graph
        logits = graph_runner(
            input_tokens, input_positions, intermediate_inputs,
            self.kv_caches[virtual_engine], attn_metadata, self.graph_memory_pool)
    else:
        # Run the model eagerly
        with set_forward_context(attn_metadata, self.vllm_config, virtual_engine):
            logits = self.model(...model_inputs...)
    
    # Sampling
    sampler_output = self._run_sampling(
        logits, sampling_metadata, sampling_metadata_cache)
    
    return sampler_output

CUDA graph optimization: vLLM uses CUDA graphs to optimize repeatedly executed operations and significantly speed up inference (a standalone PyTorch sketch of the pattern follows the wrapper code below):

class CUDAGraphRunner(nn.Module):
    """A model wrapper that uses CUDA graphs to accelerate inference."""
    
    def __init__(self, model, attn_backend_name, attn_state, is_enc_dec_model=False):
        super().__init__()
        self.model = model
        self.attn_backend_name = attn_backend_name
        self.attn_state = attn_state
        self.is_enc_dec_model = is_enc_dec_model
        self.graph = None
        
    def capture(self, input_ids, positions, intermediate_inputs, kv_caches, attn_metadata, memory_pool, stream):
        """Capture the model execution as a CUDA graph."""
        # Create the CUDA graph and record one model execution into it
        self.graph = torch.cuda.CUDAGraph()
        
        # Keep references to the static input buffers
        self.static_input_ids = input_ids
        self.static_positions = positions
        self.static_kv_caches = kv_caches
        self.static_attn_metadata = attn_metadata
        
        # Record the graph
        with torch.cuda.graph(self.graph, pool=memory_pool, stream=stream):
            self.static_output = self.model(...)
            
    def forward(self, input_ids, positions, intermediate_inputs, kv_caches, attn_metadata, memory_pool):
        """Run the captured CUDA graph."""
        # Copy the new inputs into the static buffers
        self._update_static_inputs(input_ids, positions, kv_caches, attn_metadata)
        
        # Replay the graph
        self.graph.replay()
        
        # Return the static output buffer
        return self.static_output
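Outside of vLLM, the capture/replay pattern itself is plain PyTorch. A minimal, self-contained sketch of the same idea (requires a CUDA GPU; the linear layer and shapes are just placeholders, not anything from vLLM):

import torch

model = torch.nn.Linear(128, 128).cuda()
static_input = torch.randn(8, 128, device="cuda")

# Warm up on a side stream before capture, as CUDA graphs require
s = torch.cuda.Stream()
s.wait_stream(torch.cuda.current_stream())
with torch.cuda.stream(s):
    for _ in range(3):
        model(static_input)
torch.cuda.current_stream().wait_stream(s)

# Capture one forward pass into a graph
graph = torch.cuda.CUDAGraph()
with torch.cuda.graph(graph):
    static_output = model(static_input)

# Replay: copy new data into the static input buffer, then replay the graph
static_input.copy_(torch.randn(8, 128, device="cuda"))
graph.replay()
print(static_output.shape)  # static_output now holds the result for the new input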

PagedAttention

PagedAttention is vLLM's core innovation: it manages the KV cache in pages (blocks), which greatly reduces memory fragmentation and improves GPU memory utilization. vLLM's FlashAttention backend lives in vllm/attention/backends/flash_attn.py.

class FlashAttentionBackend(AttentionBackend):
    """Flash Attention backend implementation."""
    
    accept_output_buffer: bool = True
    
    @staticmethod
    def get_supported_head_sizes() -> List[int]:
        return [32, 64, 96, 128, 160, 192, 224, 256]
    
    @staticmethod
    def get_name() -> str:
        return "FLASH_ATTN"
    
    @staticmethod
    def get_kv_cache_shape(
        num_blocks: int,
        block_size: int,
        num_kv_heads: int,
        head_size: int,
    ) -> Tuple[int, ...]:
        if block_size % 16 != 0:
            raise ValueError("Block size must be a multiple of 16.")
        return (2, num_blocks, block_size, num_kv_heads, head_size)
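Plugging representative numbers into get_kv_cache_shape above gives a feel for the size of one layer's paged KV cache (the values are illustrative, not taken from a real config):

# (2, num_blocks, block_size, num_kv_heads, head_size); the leading 2 is for the K and V planes
shape = (2, 1000, 16, 2, 64)

num_elements = 2 * 1000 * 16 * 2 * 64   # 4,096,000 elements per layer
bytes_fp16 = num_elements * 2           # ~8 MB per layer in fp16
print(shape, num_elements, bytes_fp16)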

The core idea of PagedAttention is to split the otherwise contiguous KV cache into small fixed-size blocks and manage them through a block table (block_table); a toy sketch of the mapping follows the code below:

class FlashAttentionImpl(AttentionImpl):
    """Flash attention implementation."""
    
    def forward(
        self,
        layer: AttentionLayer,
        query: torch.Tensor,
        key: torch.Tensor,
        value: torch.Tensor,
        kv_cache: torch.Tensor,
        attn_metadata: FlashAttentionMetadata,
        output: Optional[torch.Tensor] = None,
    ) -> torch.Tensor:
        """Compute attention with flash attention."""
        # Attention type
        attn_type = layer.attn_type
        
        # Sequence metadata for the queries and keys
        (query_lens, key_lens, max_query_len, max_key_len,
         query_start_loc, key_start_loc, causal) = _get_query_key_seq_metadata(
            attn_metadata, query.shape[0] > 1, attn_type)
        
        # Head dimensions
        num_heads = self.num_heads
        head_size = self.head_size
        
        # Slot mapping
        slot_mapping = attn_metadata.slot_mapping
        
        # Run PagedAttention if block tables are available
        if slot_mapping is not None and not is_block_tables_empty(attn_metadata.block_tables):
            # Paged attention over the block-table-managed KV cache
            output = flash_attn_with_kvcache(
                query, key, value, kv_cache, attn_metadata.block_tables,
                slot_mapping, self.alibi_slopes, layer.scale, causal, max_query_len, 
                layer.rotary_emb_dim, layer.rotary_emb_scale_base, 
                layer.rope_theta, layer.level_param, ...)
        else:
            # Plain variable-length Flash Attention
            output = flash_attn_varlen_func(
                query, key, value, query_start_loc, key_start_loc, 
                max_query_len, max_key_len, causal, ...)
                
        return output
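To make the block-table idea concrete, here is a toy sketch of how a logical token position maps to a physical slot in the paged KV cache; BLOCK_SIZE and the table contents are made-up values, not vLLM's internals:

BLOCK_SIZE = 16  # tokens per block (illustrative)

# Block table for one sequence: logical block index -> physical block index.
# Physical blocks need not be contiguous, which is what removes fragmentation.
block_table = [7, 2, 11]

def physical_slot(token_pos: int) -> int:
    """Map a logical token position to a flat slot index in the KV cache."""
    logical_block = token_pos // BLOCK_SIZE
    offset = token_pos % BLOCK_SIZE
    return block_table[logical_block] * BLOCK_SIZE + offset

print(physical_slot(0))   # token 0  -> physical block 7, slot 112
print(physical_slot(20))  # token 20 -> physical block 2, slot 36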

多GPU分布式推理

vLLM supports several parallelism strategies, including tensor parallelism and pipeline parallelism, so it can make efficient use of multiple GPUs.
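From the user's point of view this is just an engine argument (or the --tensor-parallel-size flag of vllm serve). A sketch, with a placeholder model path and an illustrative size:

from vllm import LLM

# Shard the model across 2 GPUs with tensor parallelism;
# pipeline_parallel_size would additionally split the model by layers.
llm = LLM(model="/data/Qwen1.5-0.5B-Chat/", tensor_parallel_size=2)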

Distributed environment initialization

def init_distributed_environment(
    vllm_config: VllmConfig,
    local_rank: Optional[int] = None,
    rank: Optional[int] = None,
    world_size: Optional[int] = None,
) -> Tuple[int, int, int]:
    """Initialize the distributed environment."""
    # Read the parallel configuration
    parallel_config = vllm_config.parallel_config
    tensor_parallel_size = parallel_config.tensor_parallel_size
    pipeline_parallel_size = parallel_config.pipeline_parallel_size
    
    # Fill in ranks from environment variables
    if local_rank is None:
        local_rank = int(os.getenv("LOCAL_RANK", "0"))
    if rank is None:
        rank = int(os.getenv("RANK", "0"))
    if world_size is None:
        world_size = int(os.getenv("WORLD_SIZE", "1"))
    
    # Initialize the process group
    if world_size > 1:
        # Initialize the underlying communication backend
        if not torch.distributed.is_initialized():
            torch.distributed.init_process_group(backend="nccl")
        
        # Make sure the tensor parallel and pipeline parallel groups are initialized
        if tensor_parallel_size > 1 or pipeline_parallel_size > 1:
            ensure_model_parallel_initialized(
                tensor_parallel_size, pipeline_parallel_size)
    
    return local_rank, rank, world_size

Tensor parallelism: vLLM implements tensor parallelism on top of PyTorch's distributed primitives.

def initialize_model_parallel(
    tensor_parallel_size: int = 1,
    pipeline_parallel_size: int = 1,
) -> None:
    """Initialize model parallel groups."""
    # Make sure the distributed environment is initialized
    if not torch.distributed.is_initialized():
        return
    
    # Get the world size
    world_size = torch.distributed.get_world_size()
    if world_size == 1:
        return
    
    # Validate the parallel configuration
    if tensor_parallel_size * pipeline_parallel_size > world_size:
        raise ValueError(...)
    
    # Initialize the model parallel groups
    ranks = list(range(world_size))
    tp_groups = []
    pp_groups = []
    
    # Create the tensor parallel and pipeline parallel groups
    for i in range(pipeline_parallel_size):
        start_rank = i * tensor_parallel_size
        end_rank = (i + 1) * tensor_parallel_size
        tp_group = ranks[start_rank:end_rank]
        tp_groups.append(tp_group)
    
    for i in range(tensor_parallel_size):
        pp_group = ranks[i::tensor_parallel_size]
        pp_groups.append(pp_group)
    
    # Register the parallel groups
    _set_tensor_parallel_group(tp_groups)
    _set_pipeline_parallel_group(pp_groups)
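As a worked example of the grouping logic above: with world_size=8, tensor_parallel_size=4 and pipeline_parallel_size=2, the same slicing yields the following groups (a small standalone check):

world_size, tp, pp = 8, 4, 2
ranks = list(range(world_size))

tp_groups = [ranks[i * tp:(i + 1) * tp] for i in range(pp)]
pp_groups = [ranks[i::tp] for i in range(tp)]

print(tp_groups)  # [[0, 1, 2, 3], [4, 5, 6, 7]]
print(pp_groups)  # [[0, 4], [1, 5], [2, 6], [3, 7]]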

LoRA and Prompt Adapter support

vLLM supports advanced features such as LoRA (low-rank adaptation) and Prompt Adapters, making it efficient to serve models fine-tuned with parameter-efficient fine-tuning (PEFT) techniques.
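Serving a LoRA adapter only requires enabling LoRA on the engine and attaching a LoRARequest to the call; a sketch (the model and adapter paths are placeholders, and the adapter must match the base model):

from vllm import LLM, SamplingParams
from vllm.lora.request import LoRARequest

llm = LLM(model="/data/Qwen1.5-0.5B-Chat/", enable_lora=True)

outputs = llm.generate(
    "Tell me a joke",
    SamplingParams(max_tokens=64),
    # (adapter name, unique integer id, local adapter path) - placeholder values
    lora_request=LoRARequest("my_adapter", 1, "/data/my-lora-adapter/"),
)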

LoRA support implementation

class LRUCacheWorkerLoRAManager:
    """LRU cache for managing LoRA adapters in the worker."""
    
    def __init__(self, max_loras: int):
        self.max_loras = max_loras
        self.lora_id_to_lora = {}  # LoRA adapters already loaded
        self.lora_usage = deque()  # usage order (LRU queue)
        
    def add_lora(self, lora_request: LoRARequest, lora: Any) -> None:
        """Add a LoRA to the cache."""
        # If the cache is full, evict the least recently used LoRA
        if len(self.lora_id_to_lora) >= self.max_loras:
            self._evict_lru_lora()
        
        # Add the new LoRA
        self.lora_id_to_lora[lora_request.lora_int_id] = lora
        self.lora_usage.append(lora_request.lora_int_id)
        
    def _evict_lru_lora(self) -> None:
        """Evict the least recently used LoRA."""
        lora_to_remove = self.lora_usage.popleft()
        del self.lora_id_to_lora[lora_to_remove]

vLLM request workflow

A concrete example illustrates vLLM's workflow in detail:

  1. Add a request: the client submits a request to the LLM Engine via add_request.
   engine.add_request(
       request_id="123",
       prompt="Tell me a joke",
       sampling_params=SamplingParams(temperature=0.7, max_tokens=100)
   )
  2. Scheduler handling: the scheduler adds the request to the waiting queue
   # called inside the LLM Engine
   scheduler.add_seq_group(seq_group)
  3. Step execution: call the step method to run one inference iteration

    outputs = engine.step()
    

    a. The scheduler selects the sequence groups to execute:

    seq_group_metadata_list, scheduler_outputs, allow_async_output_proc = scheduler.schedule()
    

    b. The Model Executor builds the model execution request:

    execute_model_req = ExecuteModelRequest(
          seq_group_metadata_list=seq_group_metadata_list,
          blocks_to_swap_in=scheduler_outputs.blocks_to_swap_in,
          blocks_to_swap_out=scheduler_outputs.blocks_to_swap_out,
          blocks_to_copy=scheduler_outputs.blocks_to_copy,
          ...
      )
    

    c. The Worker executes the model:

    # called inside the Model Executor
    worker.execute_model(execute_model_req)
    

    d. The Model Runner prepares the inputs and runs the model:

    # called inside the Worker
    model_runner.execute(model_inputs)
    

    e. Attention is computed with PagedAttention:

    # called inside the model
    flash_attn_with_kvcache(q, k, v, kv_cache, block_tables, slot_mapping, ...)
    

    f. The model outputs are processed:

    # called inside the LLM Engine
    engine._process_model_outputs(outputs, seq_group_metadata_list, scheduler_outputs)
    
  4. Repeat: keep calling step until all requests are finished.

Summary

Through careful architectural design and a set of novel optimizations, vLLM achieves high-performance LLM inference. From the code analysis above, its key strengths are:

  • PagedAttention: paging the KV cache dramatically improves memory utilization
  • Iteration-level scheduling: sequences are scheduled dynamically, keeping the GPU busy
  • CUDA graph optimization: reduces CPU launch overhead and speeds up decoding
  • Continuous batching: requests can join and leave the batch dynamically, raising throughput
  • Distributed inference: tensor and pipeline parallelism scale serving to multiple GPUs
