Chapter 12

生产部署 · FastAPI/llama_deploy/缓存/上线清单

jupyter 跑通的 RAG 还离生产差十万八千里。这章是全书的"最后一公里"——怎么把 LlamaIndex 做成一个稳定的 service:持久化策略、异步 API、多层缓存、成本控制、容器化、上线 checklist。做完这章你有一套能上线的骨架。

一、生产 RAG 的核心考量

从 notebook 到 prod,思路上要换 6 件事:

维度NotebookProduction
索引每次重建持久化 + 增量更新
调用同步 query异步 aquery + streaming
延迟不关心p95 是 SLA
成本不关心缓存 + 路由 + budget
失败报错就停重试/降级/fallback
部署本机Docker + K8s + 监控

二、持久化策略:索引不能每次重建

LlamaIndex 三个需要持久化的组件:docstore / index_store / vector_store。生产上通常各走各的存储:

from llama_index.core import StorageContext, VectorStoreIndex
from llama_index.vector_stores.qdrant import QdrantVectorStore
from llama_index.storage.docstore.redis import RedisDocumentStore
from llama_index.storage.index_store.redis import RedisIndexStore

# 生产级组合:Qdrant 存向量、Redis 存 doc/index 元数据
storage = StorageContext.from_defaults(
    docstore=RedisDocumentStore.from_host_and_port("redis", 6379, namespace="rag"),
    index_store=RedisIndexStore.from_host_and_port("redis", 6379, namespace="rag"),
    vector_store=QdrantVectorStore(client=qdrant_client, collection_name="docs_v1"),
)

# 首次:建索引
# index = VectorStoreIndex.from_documents(docs, storage_context=storage)

# 生产启动:从已有存储加载,不重建
from llama_index.core import load_index_from_storage
index = load_index_from_storage(storage)
索引版本化:每次换 embedding 模型 / 换 chunk 策略 → 新 collection 名(docs_v2),不要原地覆盖。服务代码里加 INDEX_VERSION 环境变量,切换 = 改 env + 滚动重启,2 分钟回滚也没压力。

三、FastAPI 生产模板

from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from contextlib import asynccontextmanager
import asyncio

# ---- 生命周期:启动时加载,关闭时清理 ----
@asynccontextmanager
async def lifespan(app: FastAPI):
    app.state.index = load_index_from_storage(storage)
    app.state.qe = app.state.index.as_query_engine(
        similarity_top_k=20,
        node_postprocessors=[rerank, LongContextReorder()],
        streaming=True,
        use_async=True,
    )
    yield
    # 关闭时:flush cache 等

app = FastAPI(lifespan=lifespan)

class QueryRequest(BaseModel):
    query: str
    user_id: str
    top_k: int | None = 5

@app.post("/v1/rag/query")
async def query(req: QueryRequest):
    try:
        resp = await asyncio.wait_for(app.state.qe.aquery(req.query), timeout=15)
        return {
            "answer": str(resp),
            "sources": [{"id": n.node.node_id, "text": n.node.text[:200]} for n in resp.source_nodes],
        }
    except asyncio.TimeoutError:
        raise HTTPException(504, "RAG timeout")
    except Exception as e:
        logger.exception("query failed")
        raise HTTPException(500, "internal")

# ---- 流式 SSE ----
@app.post("/v1/rag/stream")
async def stream(req: QueryRequest):
    resp = await app.state.qe.aquery(req.query)

    async def gen():
        async for token in resp.async_response_gen():
            yield f"data: {token}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(gen(), media_type="text/event-stream")

健康检查 & ready probe

@app.get("/healthz")
async def health():
    return {"status": "ok"}

@app.get("/readyz")
async def ready():
    # 关键依赖探活
    try:
        await qdrant_client.get_collections()
        await redis.ping()
        return {"status": "ready"}
    except Exception:
        raise HTTPException(503)

K8s 的 readinessProbe 打 /readyz,livenessProbe 打 /healthz——别两个合一。

四、llama_deploy:微服务化编排

如果你有多个 workflow / agent / RAG pipeline,想要独立扩缩容,用 llama_deploy:

from llama_deploy import (
    deploy_core, deploy_workflow, ControlPlaneConfig,
    WorkflowServiceConfig,
)
from llama_deploy.message_queues import RedisMessageQueueConfig

# 1. 启动 control plane(集群大脑)
async def main():
    await deploy_core(
        ControlPlaneConfig(),
        RedisMessageQueueConfig(host="redis"),
    )

# 2. 发布一个 workflow 为独立服务
from my_app.workflows import RAGWorkflow
await deploy_workflow(
    workflow=RAGWorkflow(),
    workflow_config=WorkflowServiceConfig(
        service_name="rag_service",
        host="0.0.0.0", port=8002,
    ),
)

# 3. 客户端调用
from llama_deploy import LlamaDeployClient
client = LlamaDeployClient(ControlPlaneConfig())
session = client.create_session()
result = session.run("rag_service", query="...")

什么时候用:有 3+ 个独立 workflow、要求热更新/灰度、多团队共用一个 RAG core——llama_deploy 比自己攒 RabbitMQ + worker 省事很多。简单单体直接 FastAPI 就够。

五、多层缓存

生产级 RAG 的缓存要分 3 层——每一层节省的成本/延迟不同:

缓存什么命中率省什么
响应缓存query → final answer5-15%全链路延迟 + 成本
embedding 缓存text → vector40-60%(文档层面)embedding API 调用
检索缓存query → nodes10-20%vector store 查询

响应缓存(语义 + 精确双模式)

import hashlib, json
from redis.asyncio import Redis

redis = Redis.from_url("redis://redis:6379")

async def cached_query(q: str, ttl=3600):
    # 精确匹配
    k = "rag:resp:" + hashlib.md5(q.encode()).hexdigest()
    if hit := await redis.get(k):
        return json.loads(hit)

    resp = await qe.aquery(q)
    payload = {"answer": str(resp), "nodes": [...]}
    await redis.setex(k, ttl, json.dumps(payload))
    return payload

想要语义级缓存("怎么退款""退款流程是什么" 命中同一个)——上 GPTCache 或自己写:embed(query) → 在向量库查 top_1 → 余弦相似度 > 0.95 命中。

Embedding 缓存

from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.storage.kvstore import RedisKVStore
from llama_index.core.ingestion import IngestionCache

cache = IngestionCache(
    cache=RedisKVStore.from_host_and_port("redis", 6379),
    collection="embed_cache",
)

pipeline = IngestionPipeline(
    transformations=[SentenceSplitter(), OpenAIEmbedding()],
    cache=cache,
    docstore=docstore,
)

大头:文档再次被切块时,如果 chunk 文本没变,直接用缓存的 embedding。重跑索引从几十刀降到几毛钱。

六、成本控制

  1. 模型路由:简单问题用 gpt-4o-mini / Haiku,复杂的用 Sonnet / gpt-4o。路由逻辑:query 长度 < 30 字且没有"分析/对比"关键词 → 走小模型
  2. 重排替代大 top_k:top_k=50 给 LLM 合成 vs top_k=50 + rerank top_n=5——后者准且便宜一半
  3. Prompt caching(Anthropic/OpenAI)——system prompt + few-shot 缓存后,下次请求便宜 90%
  4. Batch API:非实时场景(后台总结、报告生成)用 Batch API,50% 折扣
  5. budget guardrail:每个 user_id / 每天封顶,超限拒绝
# 粗路由的骨架
async def smart_query(q: str, user_id: str):
    if await over_budget(user_id):
        raise HTTPException(429, "daily quota exceeded")

    complexity = judge_complexity(q)    # 自己写个简单规则或小 LLM
    qe = qe_mini if complexity < 0.5 else qe_pro
    resp = await qe.aquery(q)
    await record_usage(user_id, resp.metadata.get("token_usage", {}))
    return resp

七、限流 + 并发控制

from slowapi import Limiter
from slowapi.util import get_remote_address

limiter = Limiter(key_func=lambda r: r.state.user_id)

@app.post("/v1/rag/query")
@limiter.limit("20/minute")
async def query(req: QueryRequest): ...

# 进程内并发 semaphore(保护 LLM API 不被打爆)
sem = asyncio.Semaphore(50)

async def safe_query(q):
    async with sem:
        return await qe.aquery(q)

八、降级与 fallback

from tenacity import retry, stop_after_attempt, wait_exponential

@retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=10))
async def call_llm_with_retry(prompt):
    return await primary_llm.acomplete(prompt)

async def robust_query(q):
    try:
        return await asyncio.wait_for(qe_pro.aquery(q), timeout=10)
    except (asyncio.TimeoutError, ProviderError):
        # 主 LLM 超时 → 切小模型
        try:
            return await qe_mini.aquery(q)
        except Exception:
            # 全挂 → 返回"系统繁忙,请稍后"+检索片段
            nodes = retriever.retrieve(q)
            return Response(response="系统繁忙,以下是相关资料:\n" + "\n---\n".join(n.text for n in nodes[:3]))

九、索引增量更新

生产数据每天都在变——文档新增、修改、删除。靠 IngestionPipeline + DocstoreStrategy.UPSERTS_AND_DELETE:

from llama_index.core.ingestion import IngestionPipeline, DocstoreStrategy

pipeline = IngestionPipeline(
    transformations=[splitter, embed_model],
    docstore=redis_docstore,
    vector_store=qdrant_vs,
    docstore_strategy=DocstoreStrategy.UPSERTS_AND_DELETE,   # 删掉不再存在的
)

# 定时任务:每小时拉最新文档,增量跑
async def sync_job():
    docs = await fetch_updated_docs(since=last_sync_time)
    await pipeline.arun(documents=docs, num_workers=4)

关键点:Document 的 doc_id 要稳定(用源系统 ID,不是随机 UUID),不然 UPSERTS 识别不出来。

十、Docker 化

FROM python:3.12-slim

WORKDIR /app

# 系统依赖:如果用 BGE 本地 rerank 需要 C++ toolchain
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential git curl && rm -rf /var/lib/apt/lists/*

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 预下载 embedding/rerank 模型到镜像(避免冷启动拉几 GB)
RUN python -c "from sentence_transformers import CrossEncoder; CrossEncoder('BAAI/bge-reranker-v2-m3')"

COPY . .

# gunicorn + uvicorn 混合,多 worker
CMD ["gunicorn", "app:app", \
     "-k", "uvicorn.workers.UvicornWorker", \
     "-w", "4", \
     "-b", "0.0.0.0:8000", \
     "--timeout", "60"]

docker-compose 本地一键

services:
  app:
    build: .
    ports: ["8000:8000"]
    env_file: .env
    depends_on: [redis, qdrant]
  redis:
    image: redis:7-alpine
    volumes: ["redis_data:/data"]
  qdrant:
    image: qdrant/qdrant:latest
    ports: ["6333:6333"]
    volumes: ["qdrant_data:/qdrant/storage"]
volumes:
  redis_data:
  qdrant_data:

十一、Kubernetes 部署要点

十二、CI/CD 中的 eval regression

# .github/workflows/eval.yml
name: RAG regression
on: [pull_request]
jobs:
  eval:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with: {python-version: "3.12"}
      - run: pip install -r requirements.txt
      - name: Run eval on golden set
        env:
          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
        run: python scripts/run_eval.py --out eval_result.json
      - name: Compare with baseline
        run: python scripts/compare.py baseline.json eval_result.json --fail-if-worse-than 0.03

每个 PR 跑 50 条黄金集,任何指标比 baseline 下滑超过 3% 就阻断合并——RAG 回归防线就从这里来

十三、监控大盘

至少 4 个 dashboard:

  1. Traffic:QPS、p50/p95/p99 延迟、错误率(5xx)
  2. Cost:每日 token 消耗、每 query 成本趋势、缓存命中率
  3. Quality:用户 👍/👎 率、refusal 率(LLM 说"我不知道"的比例)
  4. Infra:pod CPU/memory、Qdrant p99、Redis 内存、LLM provider 错误率

十四、上线前 20 条 checklist

代码/架构
1. 所有 LLM 调用走 async(aquery/acomplete)
2. 全局 timeout(每个 aquery 外包一层 asyncio.wait_for)
3. 重试 + 指数退避(tenacity)
4. 主 LLM 挂掉有 fallback(小模型/返回检索片段)
5. Prompt/model 名/top_k 走 config,不硬编码

数据
6. 索引版本化(v1/v2 独立 collection)
7. 增量更新 job(每日/每小时) + 死文档清理
8. doc_id 稳定(源系统主键)
9. 敏感信息过滤(PII,上线前扫一遍)

性能
10. Embedding cache 开
11. 响应 cache 开(TTL 1 小时起)
12. Rerank top_n=5 而非 top_k 塞满
13. HNSW + scalar quantization 调过
14. prompt caching(Anthropic/OpenAI)开

可观测
15. Phoenix / Langfuse / OTel trace 接入
16. 结构化 log(query / latency / token / variant)
17. 用户 👍/👎 埋点

安全/合规
18. API 鉴权(API key / JWT)
19. 每用户限流 + 日预算
20. Prompt injection 防护(system prompt 锁定身份 + 输出审计)

十五、全书小结

这就是 LlamaIndex:
① 它是 "LLM 时代的数据框架"——把文档变成 LLM 能用的 Node,把检索变成可精细控制的 pipeline。
② Ch1-Ch6 是"数据与索引"——怎么切块、怎么选 index、怎么挑 vector store。
③ Ch7-Ch8 是"检索精度"——retriever 三层结构 + query transform。RAG 的 90% 质量都在这两章。
④ Ch9-Ch10 是"编排"——Agent(LLM 自主决策) + Workflow(代码编排)。简单 RAG 不需要,复杂应用绕不开。
⑤ Ch11-Ch12 是"工程化"——评估、观测、部署,让 RAG 从 demo 到 service。

记住四句话:召回是天花板,Rerank 是性价比之王,Eval 不能靠感觉,部署要有 checklist。剩下的,就靠你把手弄脏,边做边调。