Training Job Scheduling
Distributed Training Strategies
Data Parallelism
The most common parallel strategy: each GPU keeps a full copy of the model and processes a different portion of each batch:
Data parallelism illustrated (4 GPUs):
Batch (1024 samples)
├── GPU0: samples 0-255   → gradient 0
├── GPU1: samples 256-511 → gradient 1
├── GPU2: samples 512-767 → gradient 2
└── GPU3: samples 768-1023 → gradient 3
        ↓
AllReduce (sum/average the gradients)
        ↓
Each GPU applies the same averaged gradient to update its model
Applicable when: the model fits entirely in a single GPU's memory.
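A minimal data-parallel sketch with PyTorch DistributedDataParallel; MyModel and train_dataset are hypothetical placeholders, and the script is assumed to be launched with torchrun (one process per GPU):
python
# Minimal DDP sketch. MyModel and train_dataset are placeholders.
# Launch with e.g.: torchrun --nproc_per_node=4 train_ddp.py
import os
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler

def main():
    dist.init_process_group(backend="nccl")              # one process per GPU
    local_rank = int(os.environ["LOCAL_RANK"])
    torch.cuda.set_device(local_rank)

    model = MyModel().cuda(local_rank)                    # hypothetical model
    model = DDP(model, device_ids=[local_rank])           # gradients are AllReduced across ranks

    sampler = DistributedSampler(train_dataset)           # each rank sees a different shard of the data
    loader = DataLoader(train_dataset, batch_size=256, sampler=sampler)
    optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)

    for inputs, labels in loader:
        inputs, labels = inputs.cuda(local_rank), labels.cuda(local_rank)
        loss = torch.nn.functional.cross_entropy(model(inputs), labels)
        optimizer.zero_grad()
        loss.backward()                                    # AllReduce overlaps with the backward pass
        optimizer.step()                                   # all ranks apply the same averaged gradient

if __name__ == "__main__":
    main()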
Model Parallelism
When the model is too large to fit on a single GPU, it is split across multiple GPUs:
python
# Tensor parallelism: splitting the matrix multiplications
# Example: a Linear layer (input d_model=4096, output d_ff=16384)
# Single GPU:
#   W: [4096, 16384], ~128 MB of memory in FP16
# 4-way tensor parallelism:
#   GPU0: W0: [4096, 4096], ~32 MB
#   GPU1: W1: [4096, 4096], ~32 MB
#   GPU2: W2: [4096, 4096], ~32 MB
#   GPU3: W3: [4096, 4096], ~32 MB
# Tensor parallelism implemented with Megatron-LM
import torch
from megatron.core import tensor_parallel
class ParallelMLP(torch.nn.Module):
def __init__(self, hidden_size, ffn_hidden_size):
super().__init__()
        # Column parallelism: the weight matrix is split by columns
self.dense_h_to_4h = tensor_parallel.ColumnParallelLinear(
hidden_size, ffn_hidden_size,
gather_output=False
)
        # Row parallelism: the weight matrix is split by rows
self.dense_4h_to_h = tensor_parallel.RowParallelLinear(
ffn_hidden_size, hidden_size,
input_is_parallel=True
        )
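    def forward(self, hidden_states):
        # Sketch only: Megatron-core's parallel linear layers return an
        # (output, bias) tuple; bias handling is simplified here.
        intermediate, _ = self.dense_h_to_4h(hidden_states)   # column-split activations, no gather
        intermediate = torch.nn.functional.gelu(intermediate)
        output, _ = self.dense_4h_to_h(intermediate)           # AllReduce happens inside the row-parallel layer
        return output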
Pipeline Parallelism
The model is split layer by layer across GPUs, forming a pipeline:
Pipeline parallelism (4 GPUs, 8 layers per GPU):
Micro-batch pipelining (GPipe-style schedule, 4 micro-batches):

Time:   1    2    3    4    5    6    7    8    9   10   11   12   13   14
GPU0: [F1] [F2] [F3] [F4]  .    .    .    .    .    .   [B4] [B3] [B2] [B1]
GPU1:      [F1] [F2] [F3] [F4]  .    .    .    .   [B4] [B3] [B2] [B1]  .
GPU2:           [F1] [F2] [F3] [F4]  .    .   [B4] [B3] [B2] [B1]  .    .
GPU3:                [F1] [F2] [F3] [F4] [B4] [B3] [B2] [B1]  .    .    .

F = forward pass, B = backward pass, "." = idle slot (pipeline bubble)
The pipeline bubble (the "." slots above) is time spent waiting on neighboring stages and reduces efficiency; a quick bubble-fraction estimate is sketched below.
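As a rough check, a GPipe-style schedule with p stages and m micro-batches idles for about (p − 1)/(m + p − 1) of the time. The function below is only illustrative:
python
# Estimate the pipeline bubble fraction of a GPipe-style schedule.
def bubble_fraction(num_stages: int, num_microbatches: int) -> float:
    # Idle slots per device divided by the total schedule length.
    return (num_stages - 1) / (num_microbatches + num_stages - 1)

print(bubble_fraction(4, 4))    # 4 stages, 4 micro-batches -> ~0.43 (43% idle)
print(bubble_fraction(4, 64))   # more micro-batches shrink the bubble -> ~0.04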
3D Parallelism (Data + Tensor + Pipeline)
Large-scale training usually combines all three forms of parallelism:
Example: training GPT-3 (175B) on roughly 1000 GPUs:
Data parallel: 8-way (8 data replicas)
Tensor parallel: 8-way (each layer split across 8 GPUs)
Pipeline parallel: 16-way (model split into 16 stages)
Total GPUs: 8 × 8 × 16 = 1024
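To make the 8 × 8 × 16 layout concrete, one illustrative way to map a global rank to its data/pipeline/tensor coordinates is shown below; the ordering of the dimensions is an assumption, since real frameworks (Megatron-LM, DeepSpeed) define their own rank layouts:
python
# Illustrative mapping from a global rank to (data, pipeline, tensor) coordinates.
# Assumption: tensor parallelism is the innermost dimension, then pipeline, then data.
TP, PP, DP = 8, 16, 8               # degrees from the example above (8 * 16 * 8 = 1024 GPUs)

def coords(global_rank: int):
    tp = global_rank % TP
    pp = (global_rank // TP) % PP
    dp = global_rank // (TP * PP)
    return dp, pp, tp

print(coords(0))       # (0, 0, 0)
print(coords(1023))    # (7, 15, 7) -> last data replica, last pipeline stage, last tensor shard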
DeepSpeed Integration
DeepSpeed is Microsoft's open-source library for optimizing large-model training:
python
# deepspeed_config.json
{
"train_batch_size": 2048,
"train_micro_batch_size_per_gpu": 4,
"gradient_accumulation_steps": 64,
"optimizer": {
"type": "AdamW",
"params": {
"lr": 1e-4,
"betas": [0.9, 0.95],
"eps": 1e-8,
"weight_decay": 0.1
}
},
"scheduler": {
"type": "WarmupDecayLR",
"params": {
"warmup_min_lr": 0,
"warmup_max_lr": 1e-4,
"warmup_num_steps": 2000,
"total_num_steps": 100000
}
},
"zero_optimization": {
"stage": 3, // ZeRO Stage 3:切分参数+梯度+优化器状态
"offload_optimizer": {
"device": "cpu", // 优化器状态卸载到 CPU
"pin_memory": true
},
"offload_param": {
"device": "cpu" // 参数卸载到 CPU(极大节省显存)
},
"overlap_comm": true,
"contiguous_gradients": true,
"reduce_bucket_size": 5e8,
"stage3_prefetch_bucket_size": 5e8,
"stage3_param_persistence_threshold": 1e6
},
"fp16": {
"enabled": true,
"loss_scale": 0,
"loss_scale_window": 1000,
"initial_scale_power": 16,
"hysteresis": 2,
"min_loss_scale": 1
},
"gradient_clipping": 1.0,
"steps_per_print": 100,
"wall_clock_breakdown": false
}
Note: train_batch_size must equal train_micro_batch_size_per_gpu × gradient_accumulation_steps × number of GPUs (here 4 × 64 × 8 = 2048, i.e. an 8-GPU run); also, the // comments above are for illustration only and must be removed from an actual JSON file.
python
# Training with DeepSpeed
import deepspeed
import torch
model = MyLargeModel()
optimizer = torch.optim.AdamW(model.parameters())
# Initialize DeepSpeed
model_engine, optimizer, _, _ = deepspeed.initialize(
model=model,
optimizer=optimizer,
config="deepspeed_config.json"
)
# Training loop
criterion = torch.nn.CrossEntropyLoss()            # assumed loss function; not defined in the original
for batch in dataloader:
    inputs, labels = batch
    inputs = inputs.to(model_engine.local_rank)     # move data to this rank's GPU
    labels = labels.to(model_engine.local_rank)
    outputs = model_engine(inputs)
    loss = criterion(outputs, labels)
model_engine.backward(loss)
model_engine.step()
# Save a checkpoint
model_engine.save_checkpoint("./checkpoints", tag="step-1000")
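# Launching typically uses the DeepSpeed launcher, for example
# (script name and GPU count here are placeholders):
#   deepspeed --num_gpus=8 train.py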
ZeRO Memory Optimization
ZeRO (Zero Redundancy Optimizer) has three stages:
Stage 1: partition the optimizer states
  Each GPU stores only 1/N of the optimizer states
  Memory savings: up to 4x (with the Adam optimizer)
Stage 2: partition gradients + optimizer states
  Memory savings: up to 8x
Stage 3: partition parameters + gradients + optimizer states
  Memory savings: N× (N = number of GPUs)
  Cost: increased communication (a rough per-GPU estimate is sketched below)
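A back-of-the-envelope sketch of per-GPU memory for the model states alone, following the ZeRO paper's mixed-precision Adam accounting (2 bytes of FP16 parameters + 2 bytes of FP16 gradients + 12 bytes of FP32 optimizer states per parameter). It ignores activations and buffers, so it only roughly tracks the measured numbers below:
python
# Rough per-GPU memory (GB) for model states only, per the ZeRO paper's accounting.
def zero_model_state_gb(num_params: float, num_gpus: int, stage: int) -> float:
    p, g, o = 2.0, 2.0, 12.0           # bytes per parameter: FP16 params, FP16 grads, FP32 optimizer states
    if stage == 0:                      # no partitioning
        per_param = p + g + o
    elif stage == 1:                    # optimizer states partitioned
        per_param = p + g + o / num_gpus
    elif stage == 2:                    # gradients + optimizer states partitioned
        per_param = p + g / num_gpus + o / num_gpus
    else:                               # stage 3: everything partitioned
        per_param = (p + g + o) / num_gpus
    return per_param * num_params / 1e9

for s in range(4):
    print(f"ZeRO stage {s}: ~{zero_model_state_gb(1.5e9, 8, s):.1f} GB per GPU (model states only)")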
Approximate effect in practice (example: GPT-2 1.5B on 8 GPUs):
No optimization: ~24 GB per GPU
ZeRO Stage 1: ~12 GB per GPU
ZeRO Stage 2: ~8 GB per GPU
ZeRO Stage 3: ~3 GB per GPU

Training Monitoring
python
# Monitoring training with TensorBoard
from torch.utils.tensorboard import SummaryWriter
writer = SummaryWriter(log_dir="runs/bert-finetune-v1")
for step, batch in enumerate(train_loader):
loss, accuracy = train_step(batch)
    # Log scalars
writer.add_scalar("Loss/train", loss, step)
writer.add_scalar("Accuracy/train", accuracy, step)
    # Log the learning rate
writer.add_scalar("LR", optimizer.param_groups[0]["lr"], step)
    # Log the gradient norm (to detect exploding/vanishing gradients)
total_norm = 0
for p in model.parameters():
if p.grad is not None:
total_norm += p.grad.data.norm(2).item() ** 2
total_norm = total_norm ** 0.5
writer.add_scalar("GradNorm", total_norm, step)
    # Log weight distributions every 1000 steps
if step % 1000 == 0:
for name, param in model.named_parameters():
writer.add_histogram(name, param, step)
writer.close()
# Launch TensorBoard:
# tensorboard --logdir=runs --host=0.0.0.0 --port=6006
Checkpoint Management
python
import os
import torch
def save_checkpoint(model, optimizer, scheduler, epoch, step, loss, save_dir):
"""保存训练检查点"""
os.makedirs(save_dir, exist_ok=True)
checkpoint = {
"epoch": epoch,
"step": step,
"model_state_dict": model.state_dict(),
"optimizer_state_dict": optimizer.state_dict(),
"scheduler_state_dict": scheduler.state_dict(),
"loss": loss,
}
path = os.path.join(save_dir, f"checkpoint-step-{step}.pt")
torch.save(checkpoint, path)
    # Keep only the 3 most recent checkpoints (sort by step number, not lexicographically)
    checkpoints = sorted(
        [f for f in os.listdir(save_dir) if f.endswith(".pt")],
        key=lambda f: int(f.split("-")[-1].split(".")[0]),
    )
    while len(checkpoints) > 3:
        os.remove(os.path.join(save_dir, checkpoints.pop(0)))
    print(f"Checkpoint saved: {path}")
def load_checkpoint(model, optimizer, scheduler, checkpoint_path):
"""恢复训练检查点"""
checkpoint = torch.load(checkpoint_path, map_location="cpu")
model.load_state_dict(checkpoint["model_state_dict"])
optimizer.load_state_dict(checkpoint["optimizer_state_dict"])
scheduler.load_state_dict(checkpoint["scheduler_state_dict"])
return checkpoint["epoch"], checkpoint["step"], checkpoint["loss"]
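
# Example resume-then-continue usage of the helpers above
# (the checkpoint path, train_loader, and train_step are placeholders):
start_epoch, global_step, last_loss = load_checkpoint(
    model, optimizer, scheduler, "./checkpoints/checkpoint-step-1000.pt"
)
for step, batch in enumerate(train_loader, start=global_step + 1):
    loss, accuracy = train_step(batch)
    if step % 1000 == 0:
        save_checkpoint(model, optimizer, scheduler, start_epoch, step, loss, "./checkpoints")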