Model Serving

Model Lifecycle Management

Model lifecycle:
Development:
data preparation → model training → model evaluation → model registration
Deployment:
model registration → format conversion → deployment testing → canary release → full rollout
Operations:
performance monitoring → data drift detection (see the sketch below) → model update → A/B testing
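The operations loop above depends on catching data drift before accuracy degrades. As a concrete illustration (not part of the original pipeline), here is a minimal sketch of one common drift signal, the Population Stability Index (PSI), comparing a live feature distribution against a training-time reference; the function name and the 0.2 alert threshold are illustrative choices.

python
import numpy as np

def population_stability_index(reference, live, bins=10):
    """PSI between a reference (training) sample and a live sample of one feature."""
    # Bin edges from reference quantiles; extend the outer edges to cover all live values
    edges = np.quantile(reference, np.linspace(0, 1, bins + 1))
    edges[0], edges[-1] = -np.inf, np.inf
    ref_frac = np.histogram(reference, bins=edges)[0] / len(reference)
    live_frac = np.histogram(live, bins=edges)[0] / len(live)
    # Clip to avoid log(0) on empty bins
    ref_frac = np.clip(ref_frac, 1e-6, None)
    live_frac = np.clip(live_frac, 1e-6, None)
    return float(np.sum((live_frac - ref_frac) * np.log(live_frac / ref_frac)))

# Rule of thumb: PSI > 0.2 is often treated as drift worth investigating or retraining on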
Model Registry

python
import mlflow

# Register the model in the Model Registry
with mlflow.start_run():
    # ... training code produces `model` ...

    # Log the trained model and register it in one step
    mlflow.pytorch.log_model(
        model,
        "bert-sentiment",
        registered_model_name="bert-sentiment-classifier"
    )

# Model version management
client = mlflow.tracking.MlflowClient()

# Promote a model version to Staging
client.transition_model_version_stage(
    name="bert-sentiment-classifier",
    version=3,
    stage="Staging"
)

# After validation passes, promote it to Production
client.transition_model_version_stage(
    name="bert-sentiment-classifier",
    version=3,
    stage="Production"
)

# Fetch the current production version
production_model = client.get_latest_versions(
    "bert-sentiment-classifier",
    stages=["Production"]
)[0]
print(f"Production version: {production_model.version}")
print(f"Model source: {production_model.source}")
Model Format Conversion

PyTorch → ONNX → TensorRT
python
import torch
import onnx
from transformers import AutoModelForSequenceClassification

# Step 1: export the PyTorch model to ONNX
# (loaded via transformers here; adjust the path to wherever the registry stores the artifact)
model = AutoModelForSequenceClassification.from_pretrained("/models/bert-sentiment-v3")
model.eval()

# Example inputs that fix the shapes traced during export
dummy_input = {
    "input_ids": torch.randint(0, 30000, (1, 128)),
    "attention_mask": torch.ones(1, 128, dtype=torch.long)
}

torch.onnx.export(
    model,
    (dummy_input["input_ids"], dummy_input["attention_mask"]),
    "bert_sentiment.onnx",
    opset_version=17,
    input_names=["input_ids", "attention_mask"],
    output_names=["logits"],
    dynamic_axes={
        "input_ids": {0: "batch_size"},
        "attention_mask": {0: "batch_size"},
        "logits": {0: "batch_size"}
    }
)

# Step 2: validate the ONNX model
onnx_model = onnx.load("bert_sentiment.onnx")
onnx.checker.check_model(onnx_model)
print("ONNX model check passed")
# Step 3: convert to TensorRT (command line)
# trtexec \
#   --onnx=bert_sentiment.onnx \
#   --saveEngine=bert_sentiment_fp16.trt \
#   --fp16 \
#   --minShapes=input_ids:1x128,attention_mask:1x128 \
#   --optShapes=input_ids:16x128,attention_mask:16x128 \
#   --maxShapes=input_ids:64x128,attention_mask:64x128 \
#   --workspace=4096  # 4 GB workspace (deprecated in newer TensorRT in favor of --memPoolSize)
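Loading the resulting engine back in Python looks roughly as follows. This is a sketch of deserialization only; the input/output binding and execution API differs noticeably between TensorRT versions, so the inference loop itself is omitted.

python
import tensorrt as trt

TRT_LOGGER = trt.Logger(trt.Logger.WARNING)

# Deserialize the engine produced by the trtexec command above
with open("bert_sentiment_fp16.trt", "rb") as f:
    engine = trt.Runtime(TRT_LOGGER).deserialize_cuda_engine(f.read())

# Per-inference state (dynamic shapes, tensor bindings) lives in an execution context
context = engine.create_execution_context()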
Online Inference Service

FastAPI Inference Service
python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import torch
import time
from typing import List
import uvicorn

app = FastAPI(title="H3C AI Inference Service", version="1.0.0")

# Globals populated at startup
model = None
tokenizer = None

@app.on_event("startup")
async def load_model():
    global model, tokenizer
    from transformers import AutoTokenizer, AutoModelForSequenceClassification
    model_path = "/models/bert-sentiment-v3"
    tokenizer = AutoTokenizer.from_pretrained(model_path)
    model = AutoModelForSequenceClassification.from_pretrained(model_path)
    model.eval()
    model.cuda()
    print("Model loaded")

class InferenceRequest(BaseModel):
    texts: List[str]
    max_length: int = 128

class InferenceResponse(BaseModel):
    predictions: List[str]
    probabilities: List[List[float]]
    latency_ms: float

@app.post("/predict", response_model=InferenceResponse)
async def predict(request: InferenceRequest):
    start = time.time()
    try:
        # Tokenize the batch
        inputs = tokenizer(
            request.texts,
            max_length=request.max_length,
            padding=True,
            truncation=True,
            return_tensors="pt"
        ).to("cuda")

        # Inference
        with torch.no_grad():
            outputs = model(**inputs)
            probs = torch.softmax(outputs.logits, dim=-1)

        probs_np = probs.cpu().numpy().tolist()
        predictions = ["positive" if p[1] > 0.5 else "negative" for p in probs_np]
        latency = (time.time() - start) * 1000

        return InferenceResponse(
            predictions=predictions,
            probabilities=probs_np,
            latency_ms=round(latency, 2)
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")
async def health():
    return {"status": "healthy", "model_loaded": model is not None}

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8080, workers=1)
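A quick smoke test against a locally running instance (the host, port, and sample texts below are assumptions):

python
import requests

resp = requests.post(
    "http://localhost:8080/predict",
    json={"texts": ["Great product, works exactly as advertised", "Terrible support experience"]},
)
print(resp.json())  # {"predictions": [...], "probabilities": [...], "latency_ms": ...}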
Kubernetes Deployment

yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: bert-sentiment-service
  labels:
    app: bert-sentiment
spec:
  replicas: 3
  selector:
    matchLabels:
      app: bert-sentiment
  template:
    metadata:
      labels:
        app: bert-sentiment
    spec:
      containers:
      - name: inference
        image: registry.example.com/ai/bert-sentiment:v3
        ports:
        - containerPort: 8080
        resources:
          limits:
            nvidia.com/gpu: 1
            memory: "16Gi"
            cpu: "4"
          requests:
            nvidia.com/gpu: 1
            memory: "8Gi"
            cpu: "2"
        # Health checks
        livenessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 60
          periodSeconds: 30
        readinessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 30
          periodSeconds: 10
        # Model volume mount
        volumeMounts:
        - name: model-storage
          mountPath: /models
      volumes:
      - name: model-storage
        persistentVolumeClaim:
          claimName: model-pvc
---
apiVersion: v1
kind: Service
metadata:
  name: bert-sentiment-svc
spec:
  selector:
    app: bert-sentiment
  ports:
  - port: 80
    targetPort: 8080
  type: ClusterIP
---
# HPA: autoscaling on CPU utilization (scaling on GPU utilization requires
# custom metrics, e.g. a DCGM exporter plus the Prometheus Adapter)
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: bert-sentiment-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: bert-sentiment-service
  minReplicas: 2
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
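To scale on GPU utilization rather than CPU, a common pattern is to expose DCGM metrics through a Prometheus Adapter and reference them as a Pods metric. The snippet below is a sketch only: DCGM_FI_DEV_GPU_UTIL is the standard DCGM utilization field, but the exporter and adapter wiring are assumed to be in place.

yaml
# Replaces the `metrics:` section above; requires DCGM exporter + Prometheus Adapter
metrics:
- type: Pods
  pods:
    metric:
      name: DCGM_FI_DEV_GPU_UTIL
    target:
      type: AverageValue
      averageValue: "70"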
Model Monitoring

python
# Inference monitoring: request latency, throughput, error rate
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time

# Metric definitions
REQUEST_COUNT = Counter('inference_requests_total', 'Total inference requests', ['model', 'status'])
REQUEST_LATENCY = Histogram('inference_latency_seconds', 'Inference latency', ['model'],
                            buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 5.0])
GPU_MEMORY_USED = Gauge('gpu_memory_used_bytes', 'GPU memory in use', ['gpu_id'])

# Record metrics around each inference call
def predict_with_metrics(texts, model_name="bert-sentiment"):
    start = time.time()
    try:
        result = model.predict(texts)  # `model` stands in for your loaded model object
        REQUEST_COUNT.labels(model=model_name, status="success").inc()
        return result
    except Exception:
        REQUEST_COUNT.labels(model=model_name, status="error").inc()
        raise
    finally:
        latency = time.time() - start
        REQUEST_LATENCY.labels(model=model_name).observe(latency)

# Expose the Prometheus metrics endpoint
start_http_server(9090)
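The GPU_MEMORY_USED gauge above is declared but never written to. One way to feed it is to poll NVML from a background thread; a sketch assuming the nvidia-ml-py (pynvml) package is installed:

python
import threading
import time
import pynvml

def sample_gpu_memory(interval_s: float = 5.0):
    # Updates the GPU_MEMORY_USED gauge defined above for every visible GPU
    pynvml.nvmlInit()
    while True:
        for i in range(pynvml.nvmlDeviceGetCount()):
            handle = pynvml.nvmlDeviceGetHandleByIndex(i)
            mem = pynvml.nvmlDeviceGetMemoryInfo(handle)
            GPU_MEMORY_USED.labels(gpu_id=str(i)).set(mem.used)
        time.sleep(interval_s)

threading.Thread(target=sample_gpu_memory, daemon=True).start()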