Skip to content

训练任务调度

分布式训练策略

数据并行(Data Parallelism)

最常用的并行策略,每个 GPU 持有完整模型副本,处理不同数据:

数据并行示意(4 GPU):

Batch(1024 样本)
    ├── GPU0:样本 0-255   → 梯度0
    ├── GPU1:样本 256-511 → 梯度1
    ├── GPU2:样本 512-767 → 梯度2
    └── GPU3:样本 768-1023 → 梯度3

        AllReduce(梯度求和/平均)

        各 GPU 用相同梯度更新模型

适用:模型能放入单 GPU 显存

模型并行(Model Parallelism)

当模型太大无法放入单 GPU 时,将模型切分到多个 GPU:

python
# 张量并行(Tensor Parallelism):切分矩阵运算
# 以 Linear 层为例(输入 d_model=4096,输出 d_ff=16384)

# 单 GPU:
# W: [4096, 16384],显存 256MB(FP16)

# 4 GPU 张量并行:
# GPU0: W0: [4096, 4096],显存 64MB
# GPU1: W1: [4096, 4096],显存 64MB
# GPU2: W2: [4096, 4096],显存 64MB
# GPU3: W3: [4096, 4096],显存 64MB

# 使用 Megatron-LM 实现张量并行
from megatron.core import tensor_parallel

class ParallelMLP(torch.nn.Module):
    def __init__(self, hidden_size, ffn_hidden_size):
        super().__init__()
        # 列并行:权重按列切分
        self.dense_h_to_4h = tensor_parallel.ColumnParallelLinear(
            hidden_size, ffn_hidden_size,
            gather_output=False
        )
        # 行并行:权重按行切分
        self.dense_4h_to_h = tensor_parallel.RowParallelLinear(
            ffn_hidden_size, hidden_size,
            input_is_parallel=True
        )

流水线并行(Pipeline Parallelism)

将模型按层切分到不同 GPU,形成流水线:

流水线并行(4 GPU,每 GPU 负责 8 层):

微批次(Micro-batch)流水线:

时间步:  1    2    3    4    5    6    7
GPU0:  [F1] [F2] [F3] [F4] [B4] [B3] [B2]
GPU1:       [F1] [F2] [F3] [F4] [B4] [B3]
GPU2:            [F1] [F2] [F3] [F4] [B4]
GPU3:                 [F1] [F2] [F3] [F4]

F = 前向传播,B = 反向传播
流水线气泡(Bubble)= 等待时间,影响效率

3D 并行(数据 + 张量 + 流水线)

大规模训练通常组合使用三种并行:

1000 GPU 训练 GPT-3(175B):
  数据并行:8 路(8 个数据副本)
  张量并行:8 路(每层切分到 8 GPU)
  流水线并行:16 路(模型切分为 16 段)
  总 GPU:8 × 8 × 16 = 1024 GPU

DeepSpeed 集成

DeepSpeed 是微软开源的大模型训练优化库:

python
# deepspeed_config.json
{
  "train_batch_size": 2048,
  "train_micro_batch_size_per_gpu": 4,
  "gradient_accumulation_steps": 64,
  
  "optimizer": {
    "type": "AdamW",
    "params": {
      "lr": 1e-4,
      "betas": [0.9, 0.95],
      "eps": 1e-8,
      "weight_decay": 0.1
    }
  },
  
  "scheduler": {
    "type": "WarmupDecayLR",
    "params": {
      "warmup_min_lr": 0,
      "warmup_max_lr": 1e-4,
      "warmup_num_steps": 2000,
      "total_num_steps": 100000
    }
  },
  
  "zero_optimization": {
    "stage": 3,                    // ZeRO Stage 3:切分参数+梯度+优化器状态
    "offload_optimizer": {
      "device": "cpu",             // 优化器状态卸载到 CPU
      "pin_memory": true
    },
    "offload_param": {
      "device": "cpu"              // 参数卸载到 CPU(极大节省显存)
    },
    "overlap_comm": true,
    "contiguous_gradients": true,
    "reduce_bucket_size": 5e8,
    "stage3_prefetch_bucket_size": 5e8,
    "stage3_param_persistence_threshold": 1e6
  },
  
  "fp16": {
    "enabled": true,
    "loss_scale": 0,
    "loss_scale_window": 1000,
    "initial_scale_power": 16,
    "hysteresis": 2,
    "min_loss_scale": 1
  },
  
  "gradient_clipping": 1.0,
  "steps_per_print": 100,
  "wall_clock_breakdown": false
}
python
# 使用 DeepSpeed 训练
import deepspeed
import torch

model = MyLargeModel()
optimizer = torch.optim.AdamW(model.parameters())

# 初始化 DeepSpeed
model_engine, optimizer, _, _ = deepspeed.initialize(
    model=model,
    optimizer=optimizer,
    config="deepspeed_config.json"
)

# 训练循环
for batch in dataloader:
    inputs, labels = batch
    outputs = model_engine(inputs)
    loss = criterion(outputs, labels)
    
    model_engine.backward(loss)
    model_engine.step()

# 保存检查点
model_engine.save_checkpoint("./checkpoints", tag="step-1000")

ZeRO 显存优化

ZeRO(Zero Redundancy Optimizer)三个阶段:

Stage 1:切分优化器状态
  每 GPU 只保存 1/N 的优化器状态
  显存节省:4x(Adam 优化器)

Stage 2:切分梯度 + 优化器状态
  显存节省:8x

Stage 3:切分参数 + 梯度 + 优化器状态
  显存节省:N×(N = GPU 数量)
  代价:通信量增加

实际效果(以 GPT-2 1.5B 为例,8 GPU):
  无优化:每 GPU 需要 ~24GB
  ZeRO Stage 1:每 GPU ~12GB
  ZeRO Stage 2:每 GPU ~8GB
  ZeRO Stage 3:每 GPU ~3GB

训练监控

python
# 使用 TensorBoard 监控训练
from torch.utils.tensorboard import SummaryWriter

writer = SummaryWriter(log_dir="runs/bert-finetune-v1")

for step, batch in enumerate(train_loader):
    loss, accuracy = train_step(batch)
    
    # 记录标量
    writer.add_scalar("Loss/train", loss, step)
    writer.add_scalar("Accuracy/train", accuracy, step)
    
    # 记录学习率
    writer.add_scalar("LR", optimizer.param_groups[0]["lr"], step)
    
    # 记录梯度范数(检测梯度爆炸/消失)
    total_norm = 0
    for p in model.parameters():
        if p.grad is not None:
            total_norm += p.grad.data.norm(2).item() ** 2
    total_norm = total_norm ** 0.5
    writer.add_scalar("GradNorm", total_norm, step)
    
    # 每 1000 步记录权重分布
    if step % 1000 == 0:
        for name, param in model.named_parameters():
            writer.add_histogram(name, param, step)

writer.close()

# 启动 TensorBoard
# tensorboard --logdir=runs --host=0.0.0.0 --port=6006

检查点管理

python
import os
import torch

def save_checkpoint(model, optimizer, scheduler, epoch, step, loss, save_dir):
    """保存训练检查点"""
    os.makedirs(save_dir, exist_ok=True)
    
    checkpoint = {
        "epoch": epoch,
        "step": step,
        "model_state_dict": model.state_dict(),
        "optimizer_state_dict": optimizer.state_dict(),
        "scheduler_state_dict": scheduler.state_dict(),
        "loss": loss,
    }
    
    path = os.path.join(save_dir, f"checkpoint-step-{step}.pt")
    torch.save(checkpoint, path)
    
    # 只保留最近 3 个检查点
    checkpoints = sorted([f for f in os.listdir(save_dir) if f.endswith(".pt")])
    while len(checkpoints) > 3:
        os.remove(os.path.join(save_dir, checkpoints.pop(0)))
    
    print(f"检查点已保存: {path}")

def load_checkpoint(model, optimizer, scheduler, checkpoint_path):
    """恢复训练检查点"""
    checkpoint = torch.load(checkpoint_path, map_location="cpu")
    
    model.load_state_dict(checkpoint["model_state_dict"])
    optimizer.load_state_dict(checkpoint["optimizer_state_dict"])
    scheduler.load_state_dict(checkpoint["scheduler_state_dict"])
    
    return checkpoint["epoch"], checkpoint["step"], checkpoint["loss"]

褚成志的云与计算笔记