1. What is vLLM
vLLM is a high-throughput inference framework for large language models, open-sourced by the LMSYS group at UC Berkeley. It is designed to dramatically improve the throughput and memory efficiency of real-time LLM serving. vLLM uses the PagedAttention algorithm, which borrows the virtual-memory and paging ideas from operating systems to manage the attention key-value cache (KV cache), significantly improving GPU memory utilization and reducing memory fragmentation. It also supports distributed inference, running a model in parallel across multiple GPUs.
2. Building and installing from source
Using the code at the v0.7.3 tag as an example:
(vllm) root@vgpu:/data# git clone https://github.com/vllm-project/vllm.git
(vllm) root@vgpu:/data# cd vllm
(vllm) root@vgpu:/data/vllm# git checkout v0.7.3
# export MAX_JOBS=6  # number of parallel build jobs; defaults to the number of CPU cores
(vllm) root@vgpu:/data/vllm# VLLM_USE_PRECOMPILED=1 pip install -e .  # if only Python code was changed, no recompilation is needed
(vllm) root@vgpu:~# python
Python 3.12.9 | packaged by Anaconda, Inc. | (main, Feb 6 2025, 18:56:27) [GCC 11.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import vllm
>>> print(vllm.__file__)
/data/vllm/vllm/__init__.py  # the vllm package resolves to the source tree, so Python changes take effect without recompiling
For a full build that also compiles the native (CUDA/C++) extensions:
pip install -e .
The vLLM code base involves several languages: roughly 85% Python, 10% CUDA, and 3.3% C++, with the rest being shell, CMake, and Dockerfile scripts. When the compiled parts are modified, installing ccache via conda install ccache
or apt install ccache
speeds up rebuilds considerably. Besides ccache, sccache provides similar functionality, is even more capable, and also supports distributed compilation.
3. Source code analysis
Using the v0.7.3 tag, analyzed over VS Code Remote-SSH.
vllm serve /data/Qwen2.5-0.5B/ --served-model-name qwen
Since the VS Code IDE will be used to debug the code, the vLLM API server has to be launched from VS Code. The command line above translates into the following launch.json configuration:
{
    // Use IntelliSense to learn about possible attributes.
    // Hover to view descriptions of existing attributes.
    // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
    "version": "0.2.0",
    "configurations": [
        {
            "name": "vllm serve",
            "type": "debugpy",
            "request": "launch",
            // "program": "/data/miniconda3/envs/vllm/bin/vllm",
            "module": "vllm.entrypoints.cli.main",
            "justMyCode": false,
            "args": [
                "serve",
                "/data/Qwen1.5-0.5B-Chat/",
                "--served-model-name",
                "qwen"
            ],
            "console": "integratedTerminal"
        }
    ]
}
After the vLLM API server has started, check GPU memory usage:
(vllm) root@vgpu:/data/vllm# nvidia-smi
Wed Feb 26 07:20:12 2025
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 550.135 Driver Version: 550.135 CUDA Version: 12.4 |
|-----------------------------------------+------------------------+----------------------+
| GPU Name Persistence-M | Bus-Id Disp.A | Volatile Uncorr. ECC |
| Fan Temp Perf Pwr:Usage/Cap | Memory-Usage | GPU-Util Compute M. |
| | | MIG M. |
|=========================================+========================+======================|
| 0 NVIDIA GeForce RTX 3090 Off | 00000000:00:05.0 Off | N/A |
| 36% 40C P8 8W / 370W | 20854MiB / 24576MiB | 0% Default |
| | | N/A |
+-----------------------------------------+------------------------+----------------------+
+-----------------------------------------------------------------------------------------+
| Processes: |
| GPU GI CI PID Type Process name GPU Memory |
| ID ID Usage |
|=========================================================================================|
| 0 N/A N/A 10257 C /data/miniconda3/envs/vllm/bin/python 20836MiB |
+-----------------------------------------------------------------------------------------+
Once the API server is fully up, it can be tested from the command line with curl:
# "model" must match the --served-model-name value passed when starting the server
(vllm) root@vgpu:/data/vllm# curl http://localhost:8000/v1/chat/completions -H "Content-Type: application/json" -d '{
"model": "qwen",
"messages": [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "你是谁?"}
]
}'
{"id":"chatcmpl-44c835030dcb4aaba493b051a716d269","object":"chat.completion","created":1740560485,"model":"qwen","choices":[{"index":0,"message":{"role":"assistant","reasoning_content":null,"content":"我是来自阿里云的超大规模语言模型。我叫通义千问,是阿里云自主研发的预训练语言模型。我与人类科学家们一同研究,努力达到最高的准确率和性能。","tool_calls":[]},"logprobs":null,"finish_reason":"stop","stop_reason":null}],"usage":{"prompt_tokens":22,"total_tokens":66,"completion_tokens":44,"prompt_tokens_details":null},"prompt_logprobs":null}
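The same endpoint can also be exercised from Python. Below is a minimal sketch using the openai client package (assuming pip install openai; the base URL and model name match the serve command above):
# Minimal sketch: call the vLLM OpenAI-compatible endpoint from Python.
# Assumes `pip install openai` and a `vllm serve` instance listening on localhost:8000.
from openai import OpenAI

client = OpenAI(base_url="http://localhost:8000/v1", api_key="EMPTY")

resp = client.chat.completions.create(
    model="qwen",  # must match --served-model-name
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "你是谁?"},
    ],
)
print(resp.choices[0].message.content)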
Code structure
vllm/
├── vllm/                      # main source directory
│   ├── core/                  # core functionality
│   ├── engine/                # inference engine
│   ├── model_executor/        # model executor
│   ├── executor/              # executor implementations
│   ├── attention/             # attention implementations
│   ├── distributed/           # distributed computing support
│   ├── worker/                # worker process implementations
│   ├── device_allocator/      # device memory allocation
│   ├── vllm_flash_attn/       # Flash Attention optimizations
│   ├── triton_utils/          # Triton-related utilities
│   ├── transformers_utils/    # HuggingFace utilities
│   ├── lora/                  # LoRA adapters
│   ├── multimodal/            # multimodal support
│   ├── prompt_adapter/        # prompt adapters
│   ├── spec_decode/           # speculative decoding
│   ├── platforms/             # platform-specific implementations
│   ├── plugins/               # plugin system
│   ├── adapter_commons/       # shared adapter components
│   ├── inputs/                # input processing
│   ├── compilation/           # compilation support
│   ├── logging_utils/         # logging utilities
│   ├── profiler/              # profiling tools
│   ├── usage/                 # usage statistics
│   ├── entrypoints/           # entry points
│   ├── third_party/           # third-party code
│   └── v1/                    # v1 engine code
│
├── csrc/                      # C++/CUDA source directory
│   ├── core/                  # C++ core implementation
│   ├── attention/             # CUDA attention kernels
│   ├── mamba/                 # Mamba architecture support
│   ├── quantization/          # quantization kernels
│   ├── sparse/                # sparse computation support
│   ├── moe/                   # Mixture of Experts
│   ├── cutlass_extensions/    # CUTLASS extensions
│   ├── prepare_inputs/        # input preprocessing
│   ├── cpu/                   # CPU implementations
│   └── rocm/                  # AMD ROCm support
│
├── examples/                  # example code
│   ├── online_serving/        # online serving examples
│   ├── offline_inference/     # offline inference examples
│   ├── ice_test/              # ICE tests
│   └── other/                 # other examples
│       └── *.jinja            # chat templates for various models
│
├── tests/                     # test code
│   ├── basic_correctness/     # basic correctness tests
│   ├── kernels/               # kernel tests
│   ├── model_executor/        # model executor tests
│   ├── multimodal/            # multimodal tests
│   ├── distributed/           # distributed tests
│   ├── quantization/          # quantization tests
│   ├── lora/                  # LoRA tests
│   ├── tool_use/              # tool-use tests
│   ├── spec_decode/           # speculative decoding tests
│   ├── prompt_adapter/        # prompt adapter tests
│   ├── prefix_caching/        # prefix caching tests
│   ├── mq_llm_engine/         # MQ LLM engine tests
│   ├── neuron/                # AWS Neuron tests
│   ├── tpu/                   # TPU tests
│   ├── worker/                # worker tests
│   └── vllm_test_utils/       # test utilities
│
├── benchmarks/                # performance benchmarks
│
├── tools/                     # tooling scripts
│
├── cmake/                     # CMake build configuration
│
├── requirements/              # dependency files
│   ├── requirements-common.txt   # common dependencies
│   ├── requirements-cuda.txt     # CUDA dependencies
│   ├── requirements-cpu.txt      # CPU dependencies
│   ├── requirements-rocm.txt     # ROCm dependencies
│   ├── requirements-tpu.txt      # TPU dependencies
│   ├── requirements-neuron.txt   # AWS Neuron dependencies
│   ├── requirements-hpu.txt      # HPU dependencies
│   ├── requirements-xpu.txt      # Intel XPU dependencies
│   ├── requirements-openvino.txt # OpenVINO dependencies
│   └── requirements-test.txt     # test dependencies
vLLM startup flow
- Entry point analysis:
vLLM's main entry point is vllm/entrypoints/cli/main.py, which registers two main command modules:
- vllm.entrypoints.cli.openai - OpenAI-compatible API commands
- vllm.entrypoints.cli.serve - the server launch command
- Implementation of the serve command:
The serve command is implemented by the ServeSubcommand class in vllm/entrypoints/cli/serve.py:
class ServeSubcommand(CLISubcommand):

    @staticmethod
    def cmd(args: argparse.Namespace) -> None:
        # Copy the positional model_tag argument into args.model
        args.model = args.model_tag
        # Run the server on a uvloop event loop
        uvloop.run(run_server(args))
- Server startup flow:
The server startup is handled by the run_server function in vllm/entrypoints/openai/api_server.py:
async def run_server(args, **uvicorn_kwargs) -> None:
    # 1. Initialize the engine client and set up the ZMQ server
    async with build_async_engine_client(args) as engine_client:
        # 2. Build the FastAPI application
        app = build_app(args)
        # 3. Fetch the model config
        model_config = await engine_client.get_model_config()
        # 4. Initialize the application state
        await init_app_state(engine_client, model_config, app.state, args)
        # 5. Start the HTTP server
        shutdown_task = await serve_http(
            app,
            host=args.host,
            port=args.port,
            ...
        )
- Application state initialization:
The init_app_state function initializes the various serving components:
- OpenAIServingModels - manages model loading
- OpenAIServingChat - handles chat completion requests
- OpenAIServingCompletion - handles text completion requests
- OpenAIServingPooling - handles pooling requests
- OpenAIServingEmbedding - handles embedding requests
- OpenAIServingScores - handles scoring requests
- OpenAIServingTokenization - handles tokenization requests
- Main API endpoints:
vLLM exposes several OpenAI-compatible API endpoints:
- /v1/completions - text completion API
- /v1/chat/completions - chat completion API
- /v1/embeddings - text embedding API
- /v1/audio/transcriptions - audio transcription API
- /tokenize, /detokenize - tokenization APIs
To summarize, the vLLM startup flow is:
command-line parsing (parse the serve command and its arguments) -> engine initialization (create the LLM engine and related components) -> app construction (build the FastAPI app and register routes) -> state initialization (initialize the serving components) -> server start (start the HTTP server and begin handling requests)
Overall architecture of vLLM
The class hierarchy of vLLM is shown below:
          +--------------------------+
          |  LLM Engine(vllm_config) |
          +--------------------------+
                       |
                       v
          +--------------------------+
          |   Executor(vllm_config)  |
          +--------------------------+
                       |
              +--------+-----------------------+
              |                                |
   Rank 0     v             ...      Rank N-1  v
+----------------------------+   +----------------------------+
|    Worker(vllm_config)     |   |    Worker(vllm_config)     |
+----------------------------+   +----------------------------+
              |                                |
              v                                v
+----------------------------+   +----------------------------+
| Model Runner(vllm_config)  |   | Model Runner(vllm_config)  |
+----------------------------+   +----------------------------+
              |                                |
              v                                v
+----------------------------+   +----------------------------+
| Model(vllm_config,prefix)  |   | Model(vllm_config,prefix)  |
+----------------------------+   +----------------------------+
This diagram shows vLLM's main component hierarchy:
- At the top is the LLM Engine
- Below it is the Executor
- The Executor fans out to multiple Workers (Rank 0 through Rank N-1)
- Each Worker owns a Model Runner
- At the bottom are the Model instances
LLM Engine
The LLMEngine and AsyncLLMEngine classes are the core of the vLLM system, responsible for model inference and asynchronous request handling.
LLM Engine
+-----------+        +--------------------+
| LLM class |        | OpenAI-compatible  |
+-----------+        |     API Server     |
      |              +--------------------+
      |                         |
      v                         v
+-----------+          +----------------+
| LLMEngine | <------- | AsyncLLMEngine |
+-----------+          +----------------+
      |
      +--------------+--------------+--------------+
      |              |              |              |
      v              v              v              v
+------------+ +------------+ +------------+ +------------+
|   Input    | |            | |   Model    | |   Output   |
| Processing | | Scheduling | | Execution  | | Processing |
+------------+ +------------+ +------------+ +------------+
LLMEngine consists of four stages: Input Processing, Scheduling, Model Execution, and Output Processing.
- Input Processing: tokenizes the input text with the configured tokenizer
- Scheduling: decides which requests are processed in each step
- Model Execution: manages execution of the language model, including distributed execution across multiple GPUs
- Output Processing: processes the model outputs, decoding the LLM's token IDs back into human-readable text
Core data structure:
class LLMEngine:
    """An LLM engine that receives requests and generates texts.

    This is the main class for the vLLM engine. It receives requests
    from clients and generates texts from the LLM. It includes a tokenizer, a
    language model (possibly distributed across multiple GPUs), and GPU memory
    space allocated for intermediate states (aka KV cache). This class utilizes
    iteration-level scheduling and efficient memory management to maximize the
    serving throughput.
    """
When it is initialized, the LLM Engine creates several key components:
- Model Executor: responsible for running the model's forward pass
- Scheduler: responsible for scheduling sequence groups and managing resources
- Cache Engine: manages allocation and freeing of the KV cache
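For reference, the offline entry point, the LLM class, drives this same LLMEngine directly. A minimal offline-inference sketch (the model path is the one used earlier in this article):
# Minimal offline-inference sketch: the LLM class wraps LLMEngine internally.
from vllm import LLM, SamplingParams

llm = LLM(model="/data/Qwen1.5-0.5B-Chat/")
params = SamplingParams(temperature=0.7, max_tokens=64)
outputs = llm.generate(["Tell me a joke"], params)
for out in outputs:
    print(out.outputs[0].text)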
Key methods of the LLM Engine
The add_request method
add_request is the entry point for adding a new inference request:
def add_request(
    self,
    request_id: str,
    prompt: str,
    sampling_params: SamplingParams,
    prompt_token_ids: Optional[List[int]] = None,
    arrival_time: Optional[float] = None,
    lora_request: Optional[LoRARequest] = None,
    trace_headers: Optional[Mapping[str, str]] = None,
    prompt_adapter_request: Optional[PromptAdapterRequest] = None,
    pooling_params: Optional[PoolingParams] = None,
    encoder_input: Optional[str] = None,
    encoder_input_token_ids: Optional[List[int]] = None,
    priority: int = 0,
) -> None:
    """Add a request to the engine's request pool.

    Args:
        request_id: The unique ID of the request.
        prompt: The prompt string.
        sampling_params: The sampling parameters for text generation.
        prompt_token_ids: The token IDs of the prompt. If None, we
            use the tokenizer to convert the prompt to token IDs.
        arrival_time: The arrival time of the request. If None, we use
            the current monotonic time.
        lora_request: The LoRA request. If None, we don't use LoRA.
        trace_headers: HTTP tracing headers. If None, we don't use tracing.
        prompt_adapter_request: The prompt adapter request. If None,
            we don't use prompt adapter.
        pooling_params: The pooling parameters for the request. If
            provided with a valid pooling parameter, the request will
            not return text and will instead return pooled values.
        encoder_input: The encoder input. Only valid for encoder-decoder
            models.
        encoder_input_token_ids: The token IDs of the encoder input.
            If None, we use the tokenizer to convert the encoder input
            to token IDs. Only valid for encoder-decoder models.
        priority: The priority of the request. Lower number means
            higher priority.
    """
The main steps of this method are:
- Validate the request parameters
- Construct the Sequence objects
- Create a SequenceGroup
- Hand the request to the scheduler, which places it in the waiting queue
The step method
step is the core of the inference loop and performs one inference iteration:
def step(self) -> List[Union[RequestOutput, PoolingRequestOutput]]:
    """Performs one decoding iteration and returns newly generated results.

    .. figure:: https://i.imgur.com/sv2HssD.png
        :alt: Overview of the step function
        :align: center

        Overview of the step function.
    """
The main steps of this method are:
- Call the scheduler's schedule method to obtain the sequence groups to run and the block allocation decisions
- Run model inference through the Model Executor
- Process the model outputs and update sequence states
- Build and return the results
# Key step 1: schedule the sequence groups
(seq_group_metadata_list, scheduler_outputs,
 allow_async_output_proc) = self.scheduler[virtual_engine].schedule()

# Key step 2: execute the model
execute_model_req = ExecuteModelRequest(
    seq_group_metadata_list=seq_group_metadata_list,
    blocks_to_swap_in=scheduler_outputs.blocks_to_swap_in,
    blocks_to_swap_out=scheduler_outputs.blocks_to_swap_out,
    blocks_to_copy=scheduler_outputs.blocks_to_copy,
    num_lookahead_slots=scheduler_outputs.num_lookahead_slots,
    running_queue_size=scheduler_outputs.running_queue_size,
    finished_requests_ids=finished_requests_ids,
    last_sampled_token_ids=last_sampled_token_ids)
outputs = self.model_executor.execute_model(execute_model_req=execute_model_req)

# Key step 3: process the outputs
self._process_model_outputs(ctx=ctx)
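Putting add_request and step together, the outer loop that drives the engine looks roughly like the following sketch (a simplified version of what the offline LLM class does internally; the model path is the one used earlier in this article):
# Sketch of the loop that drives the engine: keep stepping until all requests finish.
from vllm import EngineArgs, LLMEngine, SamplingParams

engine = LLMEngine.from_engine_args(EngineArgs(model="/data/Qwen1.5-0.5B-Chat/"))
engine.add_request("req-0", "Tell me a joke", SamplingParams(max_tokens=64))

while engine.has_unfinished_requests():
    for request_output in engine.step():
        if request_output.finished:
            print(request_output.outputs[0].text)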
Scheduler
The Scheduler is the vLLM component responsible for scheduling sequence groups and managing resources. It implements iteration-level scheduling, one of the key techniques behind vLLM's performance. The scheduler maintains three main queues:
# Scheduler initialization
def __init__(self, ...):
    # Sequence groups that are currently running
    self.running: Deque[SequenceGroup] = deque()
    # Sequence groups swapped out to CPU memory
    self.swapped: Deque[SequenceGroup] = deque()
    # Sequence groups waiting to be executed
    self.waiting: Deque[SequenceGroup] = deque()
Scheduling policy details: the scheduler implements several scheduling policies, with the schedule method at the core:
def schedule(self) -> Tuple[List[SequenceGroupMetadata], SchedulerOutputs, bool]:
    """Schedule sequence groups for execution.

    Returns a list of sequence group metadata, scheduling decisions,
    and whether to allow async output processing.
    """
    # Call _schedule to obtain the scheduling decisions
    scheduler_outputs = self._schedule()

    # Build the metadata list
    seq_group_metadata_list = []
    for scheduled_seq_group in scheduler_outputs.scheduled_seq_groups:
        seq_group = scheduled_seq_group.seq_group
        # ...build the metadata...
        seq_group_metadata_list.append(seq_group_metadata)

    # Decide whether async output processing is allowed
    allow_async_output_proc = self._prepare_allow_async_output_proc(...)

    return seq_group_metadata_list, scheduler_outputs, allow_async_output_proc
In each iteration the scheduler decides which sequences to run using the following priority order (see the toy sketch after this list):
- Sequences that are already running are scheduled first (continuing generation)
- Swapped-out sequences come next (restored from CPU memory)
- Sequences in the waiting queue are scheduled last (starting new prefills)
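The priority order can be illustrated with a toy sketch. This is not vLLM's actual _schedule implementation; the num_tokens attribute is a hypothetical stand-in for the token cost that the real scheduler tracks through its scheduling budget:
# Toy illustration of iteration-level scheduling priority:
# running > swapped > waiting, bounded by a per-step token budget.
from collections import deque

def pick_for_this_step(running: deque, swapped: deque, waiting: deque,
                       token_budget: int) -> list:
    scheduled, used = [], 0
    for queue in (running, swapped, waiting):   # priority order
        # `num_tokens` is a hypothetical per-group cost, for illustration only
        while queue and used + queue[0].num_tokens <= token_budget:
            seq_group = queue.popleft()
            scheduled.append(seq_group)
            used += seq_group.num_tokens
    return scheduled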
Chunked prefill: vLLM also supports chunked prefill, which lets a long sequence complete its prefill over several iterations:
def _schedule_chunked_prefill(self) -> SchedulerOutputs:
    """Schedule with chunked prefill enabled."""
    # Create the scheduling budget
    budget = self._create_scheduling_budget(...)

    # Collect the LoRA IDs currently in use
    curr_loras = self._collect_curr_loras()

    # Compute partial-prefill metadata
    partial_prefill_metadata = PartialPrefillMetadata.from_queues(
        self.running, self.waiting, self.scheduler_config)

    # Schedule the running sequences
    running_outputs = self._schedule_running(
        budget, curr_loras, enable_chunking=True,
        partial_prefill_metadata=partial_prefill_metadata)

    # Schedule the swapped sequences
    swapped_outputs = self._schedule_swapped(
        budget, curr_loras, enable_chunking=True)

    # Schedule the waiting sequences (prefills)
    prefill_outputs = self._schedule_prefills(
        budget, curr_loras, enable_chunking=True,
        partial_prefill_metadata=partial_prefill_metadata)

    # ...merge the scheduling results...
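The effect of chunked prefill can be seen with a small back-of-the-envelope example: a long prompt is consumed over several scheduler steps, each bounded by the per-step token budget (the numbers below are illustrative):
# Toy illustration of chunked prefill: a 7000-token prompt with a 2048-token
# per-step budget is prefilled over four scheduler iterations.
def prefill_chunks(prompt_len: int, budget_per_step: int) -> list[int]:
    chunks, remaining = [], prompt_len
    while remaining > 0:
        chunk = min(remaining, budget_per_step)
        chunks.append(chunk)
        remaining -= chunk
    return chunks

print(prefill_chunks(7000, 2048))  # [2048, 2048, 2048, 856]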
Worker
The Worker is the component that actually performs the model computation. Each worker is usually bound to one GPU and is responsible for maintaining the KV cache and running the model's forward pass.
Worker class hierarchy
class WorkerBase:
    """Base class for worker implementations."""

class LocalOrDistributedWorkerBase(WorkerBase):
    """Base class for workers that can run locally or in distributed mode."""

class Worker(LocalOrDistributedWorkerBase):
    """A worker class that executes (a partition of) the model on a GPU."""

# Other platform-specific Worker implementations
class CPUWorker(WorkerBase): ...
class TPUWorker(WorkerBase): ...
class HPUWorker(WorkerBase): ...
class XPUWorker(WorkerBase): ...
class OpenVINOWorker(WorkerBase): ...
Key Worker methods
class Worker(LocalOrDistributedWorkerBase):
    """A worker class that executes (a partition of) the model on a GPU.

    Each worker is associated with a single GPU. The worker is responsible for
    maintaining the KV cache and executing the model on the GPU. In case of
    distributed inference, each worker is assigned a partition of the model.
    """

    def __init__(self, vllm_config: VllmConfig, local_rank: int, rank: int, ...):
        # Initialize the worker: set up the device, load the model, etc.

    def execute_model(
            self,
            execute_model_req: ExecuteModelRequest) -> List[SamplerOutput]:
        """Execute the model with the given input sequences."""
        # 1. Process the input request
        # 2. Perform block swap-in/swap-out operations
        # 3. Call the model_runner to execute the model
        # 4. Return the sampler outputs
The core of execute_model:
def execute_model(
        self,
        execute_model_req: ExecuteModelRequest) -> List[SamplerOutput]:
    # Memory management before running the model
    self._process_blocks_to_swap_in(execute_model_req.blocks_to_swap_in)
    self._process_blocks_to_swap_out(execute_model_req.blocks_to_swap_out)
    self._process_blocks_to_copy(execute_model_req.blocks_to_copy)

    # Prepare the model inputs
    model_inputs = self._prepare_model_inputs(
        execute_model_req.seq_group_metadata_list)

    # Execute the model through the model_runner
    if execute_model_req.last_sampled_token_ids:
        sampled_token_ids = execute_model_req.last_sampled_token_ids
    else:
        # Get the model_runner
        model_runner = self._get_model_runner()
        # Run the model and collect the outputs
        outputs = model_runner.execute(model_inputs)

    return outputs
Model Runner
The Model Runner manages the details of model execution: preparing the input tensors, running the forward pass, and handling the outputs.
Model Runner class hierarchy
class ModelRunnerBase:
    """Base class for model runner implementations."""

class GPUModelRunnerBase(ModelRunnerBase):
    """Helper class for shared methods between GPU model runners."""

class ModelRunner(GPUModelRunnerBase):
    """GPU model runner with sampling step."""

# Other specialized Model Runners
class MultiStepModelRunner(GPUModelRunnerBase): ...
class EncoderDecoderModelRunner(GPUModelRunnerBase): ...
class PoolingModelRunner(GPUModelRunnerBase): ...
class CPUModelRunner(ModelRunnerBase): ...
class NeuronModelRunner(ModelRunnerBase): ...
class TPUModelRunner(ModelRunnerBase): ...
Core implementation of model execution in the Model Runner
def execute(
        self,
        model_input: ModelInputForGPUWithSamplingMetadata
) -> List[SamplerOutput]:
    """Execute model on the given input.

    Args:
        model_input: The input to the model, including tokens, positions,
            attention metadata, etc.

    Returns:
        The output from the model's sampling step.
    """
    # Batch size and virtual engine index
    batch_size = model_input.batch_size
    virtual_engine = model_input.virtual_engine

    # Model inputs
    input_tokens = model_input.input_tokens
    input_positions = model_input.input_positions

    # Attention metadata
    attn_metadata = model_input.attn_metadata
    seq_lens = attn_metadata.seq_lens_tensor

    # Execute via a captured CUDA graph
    if use_cuda_graph:
        # Get or create the CUDA graph runner
        graph_runner = self._get_graph_runner(...)
        # Run the model through the CUDA graph
        logits = graph_runner(
            input_tokens, input_positions, intermediate_inputs,
            self.kv_caches[virtual_engine], attn_metadata,
            self.graph_memory_pool)
    else:
        # Run the model eagerly
        with set_forward_context(attn_metadata, self.vllm_config, virtual_engine):
            logits = self.model(...model_inputs...)

    # Sampling
    sampler_output = self._run_sampling(
        logits, sampling_metadata, sampling_metadata_cache)

    return sampler_output
CUDA graph optimization: vLLM uses CUDA graphs to optimize operations that are executed repeatedly, which noticeably improves inference performance:
class CUDAGraphRunner(nn.Module):
    """A model wrapper that uses CUDA graphs to accelerate inference."""

    def __init__(self, model, attn_backend_name, attn_state,
                 is_enc_dec_model=False):
        super().__init__()
        self.model = model
        self.attn_backend_name = attn_backend_name
        self.attn_state = attn_state
        self.is_enc_dec_model = is_enc_dec_model
        self.graph = None

    def capture(self, input_ids, positions, intermediate_inputs, kv_caches,
                attn_metadata, memory_pool, stream):
        """Capture the model execution as a CUDA graph."""
        # Create the CUDA graph and record the model execution
        self.graph = torch.cuda.CUDAGraph()
        # Keep references to the static input buffers
        self.static_input_ids = input_ids
        self.static_positions = positions
        self.static_kv_caches = kv_caches
        self.static_attn_metadata = attn_metadata
        # Record the CUDA graph
        with torch.cuda.graph(self.graph, pool=memory_pool, stream=stream):
            self.static_output = self.model(...)

    def forward(self, input_ids, positions, intermediate_inputs, kv_caches,
                attn_metadata, memory_pool):
        """Run the captured CUDA graph."""
        # Update the static input buffers
        self._update_static_inputs(input_ids, positions, kv_caches, attn_metadata)
        # Replay the CUDA graph
        self.graph.replay()
        # Return the (static) output
        return self.static_output
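Outside of vLLM, the capture/replay pattern above maps directly onto PyTorch's torch.cuda.CUDAGraph API. A minimal standalone sketch (illustrative only, unrelated to vLLM's own wrapper):
# Minimal torch.cuda.CUDAGraph sketch: warm up, capture once, then replay.
import torch

model = torch.nn.Linear(1024, 1024).cuda().eval()
static_input = torch.zeros(8, 1024, device="cuda")

# Warm-up on a side stream (recommended before capture).
s = torch.cuda.Stream()
s.wait_stream(torch.cuda.current_stream())
with torch.cuda.stream(s), torch.no_grad():
    for _ in range(3):
        model(static_input)
torch.cuda.current_stream().wait_stream(s)

graph = torch.cuda.CUDAGraph()
with torch.cuda.graph(graph), torch.no_grad():
    static_output = model(static_input)

# To run with new data: fill the static buffer in place, then replay.
static_input.copy_(torch.randn(8, 1024, device="cuda"))
graph.replay()
print(static_output.shape)  # torch.Size([8, 1024])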
PagedAttention
PagedAttention is vLLM's core innovation: by managing the KV cache in pages it significantly reduces memory fragmentation and improves GPU memory utilization. vLLM's FlashAttention backend lives in vllm/attention/backends/flash_attn.py:
class FlashAttentionBackend(AttentionBackend):
    """Flash Attention backend implementation."""

    accept_output_buffer: bool = True

    @staticmethod
    def get_supported_head_sizes() -> List[int]:
        return [32, 64, 96, 128, 160, 192, 224, 256]

    @staticmethod
    def get_name() -> str:
        return "FLASH_ATTN"

    @staticmethod
    def get_kv_cache_shape(
        num_blocks: int,
        block_size: int,
        num_kv_heads: int,
        head_size: int,
    ) -> Tuple[int, ...]:
        if block_size % 16 != 0:
            raise ValueError("Block size must be a multiple of 16.")
        return (2, num_blocks, block_size, num_kv_heads, head_size)
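To make the shape concrete, a quick back-of-the-envelope calculation (the model dimensions below are hypothetical and only meant to show how get_kv_cache_shape translates into bytes):
# Back-of-the-envelope KV-cache sizing from get_kv_cache_shape:
# shape = (2, num_blocks, block_size, num_kv_heads, head_size)
num_blocks, block_size = 1024, 16        # 1024 blocks of 16 tokens each
num_kv_heads, head_size = 2, 64          # hypothetical small-model values
dtype_bytes = 2                          # fp16 / bf16

elems = 2 * num_blocks * block_size * num_kv_heads * head_size
print(f"{elems * dtype_bytes / 2**20:.1f} MiB of KV cache per layer")  # 8.0 MiB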
Key algorithm: PagedAttention splits what would otherwise be one large contiguous KV cache into smaller blocks and manages them through a block table (block_table):
class FlashAttentionImpl(AttentionImpl):
    """Flash attention implementation."""

    def forward(
        self,
        layer: AttentionLayer,
        query: torch.Tensor,
        key: torch.Tensor,
        value: torch.Tensor,
        kv_cache: torch.Tensor,
        attn_metadata: FlashAttentionMetadata,
        output: Optional[torch.Tensor] = None,
    ) -> torch.Tensor:
        """Compute attention with flash attention."""
        # Attention type
        attn_type = layer.attn_type

        # Sequence metadata for queries and keys
        (query_lens, key_lens, max_query_len, max_key_len,
         query_start_loc, key_start_loc, causal) = _get_query_key_seq_metadata(
             attn_metadata, query.shape[0] > 1, attn_type)

        # Head dimensions
        num_heads = self.num_heads
        head_size = self.head_size

        # Slot mapping
        slot_mapping = attn_metadata.slot_mapping

        # Run the PagedAttention path
        if slot_mapping is not None and not is_block_tables_empty(
                attn_metadata.block_tables):
            # Paged attention over the block tables
            output = flash_attn_with_kvcache(
                query, key, value, kv_cache, attn_metadata.block_tables,
                slot_mapping, self.alibi_slopes, layer.scale, causal,
                max_query_len, layer.rotary_emb_dim,
                layer.rotary_emb_scale_base, layer.rope_theta,
                layer.level_param, ...)
        else:
            # Plain varlen flash attention
            output = flash_attn_varlen_func(
                query, key, value, query_start_loc, key_start_loc,
                max_query_len, max_key_len, causal, ...)

        return output
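The block-table idea itself can be sketched without any CUDA: each sequence owns a list of logical blocks that map to arbitrary physical blocks in the KV-cache pool. The following is a toy illustration, not vLLM's BlockManager:
# Toy illustration of PagedAttention block tables: logical -> physical block mapping.
BLOCK_SIZE = 16

free_blocks = list(range(1024))          # pool of physical KV-cache blocks
block_tables: dict[str, list[int]] = {}  # per-sequence block table

def slot_for_token(seq_id: str, token_pos: int) -> tuple[int, int]:
    """Return (physical_block, offset) where this token's KV entry lives."""
    table = block_tables.setdefault(seq_id, [])
    if token_pos % BLOCK_SIZE == 0:      # the previous logical block is full
        table.append(free_blocks.pop())  # grab any free physical block
    return table[token_pos // BLOCK_SIZE], token_pos % BLOCK_SIZE

for pos in range(18):                    # 18 tokens -> 2 physical blocks
    slot_for_token("seq-0", pos)
print(block_tables["seq-0"])             # e.g. [1023, 1022]: need not be contiguous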
Multi-GPU distributed inference
vLLM supports several parallelism strategies, including tensor parallelism and pipeline parallelism, allowing it to use multi-GPU resources efficiently.
Distributed environment initialization
def init_distributed_environment(
    vllm_config: VllmConfig,
    local_rank: Optional[int] = None,
    rank: Optional[int] = None,
    world_size: Optional[int] = None,
) -> Tuple[int, int, int]:
    """Initialize the distributed environment."""
    # Read the parallel configuration
    parallel_config = vllm_config.parallel_config
    tensor_parallel_size = parallel_config.tensor_parallel_size
    pipeline_parallel_size = parallel_config.pipeline_parallel_size

    # Resolve rank information from environment variables
    if local_rank is None:
        local_rank = int(os.getenv("LOCAL_RANK", "0"))
    if rank is None:
        rank = int(os.getenv("RANK", "0"))
    if world_size is None:
        world_size = int(os.getenv("WORLD_SIZE", "1"))

    # Initialize the process group
    if world_size > 1:
        # Initialize the underlying communication library
        if not torch.distributed.is_initialized():
            torch.distributed.init_process_group(backend="nccl")

    # Make sure the tensor-parallel and pipeline-parallel groups are initialized
    if tensor_parallel_size > 1 or pipeline_parallel_size > 1:
        ensure_model_parallel_initialized(
            tensor_parallel_size, pipeline_parallel_size)

    return local_rank, rank, world_size
Tensor parallelism: vLLM uses PyTorch's distributed primitives to implement tensor parallelism:
def initialize_model_parallel(
    tensor_parallel_size: int = 1,
    pipeline_parallel_size: int = 1,
) -> None:
    """Initialize model parallel groups."""
    # Make sure the distributed environment has been initialized
    if not torch.distributed.is_initialized():
        return

    # World size
    world_size = torch.distributed.get_world_size()
    if world_size == 1:
        return

    # Validate the parallel configuration
    if tensor_parallel_size * pipeline_parallel_size > world_size:
        raise ValueError(...)

    # Initialize the model parallel groups
    ranks = list(range(world_size))
    tp_groups = []
    pp_groups = []

    # Build the tensor-parallel and pipeline-parallel groups
    for i in range(pipeline_parallel_size):
        start_rank = i * tensor_parallel_size
        end_rank = (i + 1) * tensor_parallel_size
        tp_group = ranks[start_rank:end_rank]
        tp_groups.append(tp_group)
    for i in range(tensor_parallel_size):
        pp_group = ranks[i::tensor_parallel_size]
        pp_groups.append(pp_group)

    # Register the parallel groups
    _set_tensor_parallel_group(tp_groups)
    _set_pipeline_parallel_group(pp_groups)
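What tensor parallelism means for a single layer can be sketched with plain PyTorch: a column-parallel linear layer keeps only its shard of the weight matrix, and the shards' partial outputs are concatenated (in the real system, gathered across the TP group). A toy single-process illustration:
# Toy illustration of column-parallel linear: each TP rank holds a slice of the
# output dimension; concatenating the partial outputs reproduces the full layer.
import torch

torch.manual_seed(0)
full = torch.nn.Linear(64, 128, bias=False)
x = torch.randn(4, 64)

tp_size = 2
shards = full.weight.chunk(tp_size, dim=0)   # split the output dim across "ranks"
partial = [x @ w.t() for w in shards]        # each rank computes its slice
assert torch.allclose(torch.cat(partial, dim=-1), full(x), atol=1e-6)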
LoRA and Prompt Adapter support
vLLM supports advanced features such as LoRA (low-rank adaptation) and Prompt Adapters, which let parameter-efficient fine-tuning (PEFT) techniques be served efficiently.
LoRA support implementation
class LRUCacheWorkerLoRAManager:
    """LRU cache for managing LoRA adapters in the worker."""

    def __init__(self, max_loras: int):
        self.max_loras = max_loras
        self.lora_id_to_lora = {}   # loaded LoRAs
        self.lora_usage = deque()   # usage order (LRU queue)

    def add_lora(self, lora_request: LoRARequest, lora: Any) -> None:
        """Add a LoRA to the cache."""
        # If the cache is full, evict the least recently used LoRA
        if len(self.lora_id_to_lora) >= self.max_loras:
            self._evict_lru_lora()
        # Add the new LoRA
        self.lora_id_to_lora[lora_request.lora_int_id] = lora
        self.lora_usage.append(lora_request.lora_int_id)

    def _evict_lru_lora(self) -> None:
        """Evict the least recently used LoRA."""
        lora_to_remove = self.lora_usage.popleft()
        del self.lora_id_to_lora[lora_to_remove]
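From the user's point of view, LoRA adapters are selected per request. A minimal offline sketch (the adapter path below is a placeholder):
# Minimal LoRA sketch: enable LoRA in the engine and attach an adapter per request.
from vllm import LLM, SamplingParams
from vllm.lora.request import LoRARequest

llm = LLM(model="/data/Qwen1.5-0.5B-Chat/", enable_lora=True)
lora = LoRARequest("my-adapter", 1, "/data/lora/my-adapter/")  # placeholder path

outputs = llm.generate(
    ["Tell me a joke"],
    SamplingParams(max_tokens=64),
    lora_request=lora,
)
print(outputs[0].outputs[0].text)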
vLLM end-to-end workflow
A concrete example walks through vLLM's workflow step by step.
- Add a request: the client adds a request to the LLM Engine via add_request.
engine.add_request(
    request_id="123",
    prompt="Tell me a joke",
    sampling_params=SamplingParams(temperature=0.7, max_tokens=100)
)
- Scheduler handling: the scheduler places the request in the waiting queue.
# called inside the LLM Engine
scheduler.add_seq_group(seq_group)
- Execute a step: call the step method to run one inference iteration.
outputs = engine.step()
a. The scheduler selects the sequence groups to run:
seq_group_metadata_list, scheduler_outputs, allow_async_output_proc = scheduler.schedule()
b. The Model Executor builds the model execution request:
execute_model_req = ExecuteModelRequest(
    seq_group_metadata_list=seq_group_metadata_list,
    blocks_to_swap_in=scheduler_outputs.blocks_to_swap_in,
    blocks_to_swap_out=scheduler_outputs.blocks_to_swap_out,
    blocks_to_copy=scheduler_outputs.blocks_to_copy,
    ...
)
c. The Worker executes the model:
# called inside the Model Executor
worker.execute_model(execute_model_req)
d. The Model Runner prepares the inputs and runs the model:
# called inside the Worker
model_runner.execute(model_inputs)
e. Attention is computed with PagedAttention:
# called inside the model
flash_attn_with_kvcache(q, k, v, kv_cache, block_tables, slot_mapping, ...)
f. The model outputs are processed:
# called inside the LLM Engine
engine._process_model_outputs(outputs, seq_group_metadata_list, scheduler_outputs)
- Repeat: keep calling step until all requests are finished.
Summary
Through its carefully designed architecture and a set of targeted optimizations, vLLM achieves high-performance LLM inference. From the code analysis, its key strengths are:
- PagedAttention: paging the KV cache significantly improves memory utilization
- Iteration-level scheduling: sequences are scheduled dynamically at every step, improving GPU utilization
- CUDA graph optimization: reduces CPU overhead and speeds up decoding
- Continuous batching: requests can join and leave the running batch dynamically, increasing throughput
- Distributed inference: tensor parallelism and pipeline parallelism scale serving to multiple GPUs