Chapter 09

工程化部署

从 notebook 到生产的鸿沟:推理要服务化、索引要流水线化、冷热数据要分层、更新要平滑。本章是部署 ColPali 的最后一公里手册。

架构蓝图

┌──────────────┐ ┌─────────────┐ │ Upload API │──→ │ Redis Queue │ └──────────────┘ └─────┬───────┘ ▼ ┌──────────────┐ │ Worker pool │ (GPU 索引节点) │ PDF → img │ │ img → 向量 │ └──────┬───────┘ ▼ ┌──────────────┐ │ Qdrant │ (热:内存 binary) │ + NVMe │ (冷:full precision) └──────┬───────┘ ▲ │ ┌──────────────┐ ┌─────┴───────┐ │ Query API │──→ │ Retriever │ (CPU/GPU 共用) └──────────────┘ └─────┬───────┘ ▼ ┌──────────────┐ │ VLM 生成 │ (Claude/GPT) └──────────────┘

推理服务化选项

自写 FastAPI + transformers
最简单,写一个 /embed 端点,内部 batch 调用模型。适合中小流量。
HuggingFace TEI(Text Embeddings Inference)
Rust 写的推理容器,支持多向量输出。启动:docker run --gpus all -p 8080:80 ghcr.io/huggingface/text-embeddings-inference --model-id vidore/colpali-v1.2。比 Python 快 30%。
vLLM(实验性)
2025 底 vLLM 开始支持视觉 embedding 模型,PagedAttention 让大 batch 吞吐 3-5 倍。
Ray Serve
需要自动扩缩容、多模型共存时上。

FastAPI 示例

from fastapi import FastAPI, UploadFile
from pydantic import BaseModel
from PIL import Image
from io import BytesIO
from transformers import ColPaliForRetrieval, ColPaliProcessor
import torch, asyncio

app = FastAPI()
model = ColPaliForRetrieval.from_pretrained("vidore/colpali-v1.2",
    torch_dtype=torch.bfloat16, device_map="cuda").eval()
processor = ColPaliProcessor.from_pretrained("vidore/colpali-v1.2")

class Query(BaseModel):
    text: str

@app.post("/embed/image")
async def embed_image(file: UploadFile):
    img = Image.open(BytesIO(await file.read()))
    batch = processor.process_images([img]).to("cuda")
    with torch.no_grad():
        vecs = model(**batch).embeddings[0]
    return {"vec": vecs.cpu().float().tolist()}

@app.post("/embed/query")
async def embed_query(q: Query):
    batch = processor.process_queries([q.text]).to("cuda")
    with torch.no_grad():
        vecs = model(**batch).embeddings[0]
    return {"vec": vecs.cpu().float().tolist()}

动态批处理

查询 QPS 高时,多个独立请求可以在 GPU 里攒一批处理,延迟不增、吞吐翻倍。TEI 自带,自写版需手动:

from asyncio import Queue, gather, create_task

queue: Queue = Queue()

async def batcher():
    while True:
        batch = []
        batch.append(await queue.get())
        try:
            while len(batch) < 16:
                batch.append(queue.get_nowait())
        except:
            pass
        queries = [b["q"] for b in batch]
        results = run_model(queries)
        for b, r in zip(batch, results):
            b["fut"].set_result(r)

批量索引流水线

1. Ingestion
文档上传到 S3 → 触发 SQS/Redis 消息 → worker 拉取
2. 预处理
PDF→图,失败重试 3 次,损坏文件进死信队列人工 review
3. 向量化
GPU worker 按 batch 处理,每页记录 embedding + 预览图 URL
4. 写入 Qdrant
attribute metadata(doc_id、tags、permission)便于过滤查询
5. ACK
一个文档所有页面都写成功后才 ack 消息,失败回滚不留半成品

冷启动优化

# 模型在 worker 进程启动时预加载
@app.on_event("startup")
async def warmup():
    # 冷启动跑一次,让 cudnn 选算法
    dummy_img = Image.new("RGB", (448, 448))
    batch = processor.process_images([dummy_img]).to("cuda")
    with torch.no_grad():
        _ = model(**batch)
Serverless 不适合
ColPali 模型 6GB+,每次冷启动要加载 20 秒。AWS Lambda、Cloud Run 这种按需启动的平台不合适——要么常驻,要么 Kubernetes + scale-to-zero(至少留 1 副本避免首请求超时)。

热更新与模型版本

A/B 切流
新模型部署成 /v2/embed,先 1% 流量镜像评估,确认 nDCG 无回归再切。
Embedding 版本兼容性
新旧模型的 embedding 空间不可混算。更换模型等于全量重做索引。规划停机窗口或双写索引。
Shadow indexing
切换模型前 2 天开始用新模型并行建一个 shadow 索引,完毕再原子切换。

监控指标

灾备与多副本

本章小结