一、生产 RAG 的核心考量
从 notebook 到 prod,思路上要换 6 件事:
| 维度 | Notebook | Production |
|---|---|---|
| 索引 | 每次重建 | 持久化 + 增量更新 |
| 调用 | 同步 query | 异步 aquery + streaming |
| 延迟 | 不关心 | p95 是 SLA |
| 成本 | 不关心 | 缓存 + 路由 + budget |
| 失败 | 报错就停 | 重试/降级/fallback |
| 部署 | 本机 | Docker + K8s + 监控 |
二、持久化策略:索引不能每次重建
LlamaIndex 三个需要持久化的组件:docstore / index_store / vector_store。生产上通常各走各的存储:
from llama_index.core import StorageContext, VectorStoreIndex
from llama_index.vector_stores.qdrant import QdrantVectorStore
from llama_index.storage.docstore.redis import RedisDocumentStore
from llama_index.storage.index_store.redis import RedisIndexStore
# 生产级组合:Qdrant 存向量、Redis 存 doc/index 元数据
storage = StorageContext.from_defaults(
docstore=RedisDocumentStore.from_host_and_port("redis", 6379, namespace="rag"),
index_store=RedisIndexStore.from_host_and_port("redis", 6379, namespace="rag"),
vector_store=QdrantVectorStore(client=qdrant_client, collection_name="docs_v1"),
)
# 首次:建索引
# index = VectorStoreIndex.from_documents(docs, storage_context=storage)
# 生产启动:从已有存储加载,不重建
from llama_index.core import load_index_from_storage
index = load_index_from_storage(storage)
docs_v2),不要原地覆盖。服务代码里加 INDEX_VERSION 环境变量,切换 = 改 env + 滚动重启,2 分钟回滚也没压力。
三、FastAPI 生产模板
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from contextlib import asynccontextmanager
import asyncio
# ---- 生命周期:启动时加载,关闭时清理 ----
@asynccontextmanager
async def lifespan(app: FastAPI):
app.state.index = load_index_from_storage(storage)
app.state.qe = app.state.index.as_query_engine(
similarity_top_k=20,
node_postprocessors=[rerank, LongContextReorder()],
streaming=True,
use_async=True,
)
yield
# 关闭时:flush cache 等
app = FastAPI(lifespan=lifespan)
class QueryRequest(BaseModel):
query: str
user_id: str
top_k: int | None = 5
@app.post("/v1/rag/query")
async def query(req: QueryRequest):
try:
resp = await asyncio.wait_for(app.state.qe.aquery(req.query), timeout=15)
return {
"answer": str(resp),
"sources": [{"id": n.node.node_id, "text": n.node.text[:200]} for n in resp.source_nodes],
}
except asyncio.TimeoutError:
raise HTTPException(504, "RAG timeout")
except Exception as e:
logger.exception("query failed")
raise HTTPException(500, "internal")
# ---- 流式 SSE ----
@app.post("/v1/rag/stream")
async def stream(req: QueryRequest):
resp = await app.state.qe.aquery(req.query)
async def gen():
async for token in resp.async_response_gen():
yield f"data: {token}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(gen(), media_type="text/event-stream")
健康检查 & ready probe
@app.get("/healthz")
async def health():
return {"status": "ok"}
@app.get("/readyz")
async def ready():
# 关键依赖探活
try:
await qdrant_client.get_collections()
await redis.ping()
return {"status": "ready"}
except Exception:
raise HTTPException(503)
K8s 的 readinessProbe 打 /readyz,livenessProbe 打 /healthz——别两个合一。
四、llama_deploy:微服务化编排
如果你有多个 workflow / agent / RAG pipeline,想要独立扩缩容,用 llama_deploy:
from llama_deploy import (
deploy_core, deploy_workflow, ControlPlaneConfig,
WorkflowServiceConfig,
)
from llama_deploy.message_queues import RedisMessageQueueConfig
# 1. 启动 control plane(集群大脑)
async def main():
await deploy_core(
ControlPlaneConfig(),
RedisMessageQueueConfig(host="redis"),
)
# 2. 发布一个 workflow 为独立服务
from my_app.workflows import RAGWorkflow
await deploy_workflow(
workflow=RAGWorkflow(),
workflow_config=WorkflowServiceConfig(
service_name="rag_service",
host="0.0.0.0", port=8002,
),
)
# 3. 客户端调用
from llama_deploy import LlamaDeployClient
client = LlamaDeployClient(ControlPlaneConfig())
session = client.create_session()
result = session.run("rag_service", query="...")
什么时候用:有 3+ 个独立 workflow、要求热更新/灰度、多团队共用一个 RAG core——llama_deploy 比自己攒 RabbitMQ + worker 省事很多。简单单体直接 FastAPI 就够。
五、多层缓存
生产级 RAG 的缓存要分 3 层——每一层节省的成本/延迟不同:
| 层 | 缓存什么 | 命中率 | 省什么 |
|---|---|---|---|
| 响应缓存 | query → final answer | 5-15% | 全链路延迟 + 成本 |
| embedding 缓存 | text → vector | 40-60%(文档层面) | embedding API 调用 |
| 检索缓存 | query → nodes | 10-20% | vector store 查询 |
响应缓存(语义 + 精确双模式)
import hashlib, json
from redis.asyncio import Redis
redis = Redis.from_url("redis://redis:6379")
async def cached_query(q: str, ttl=3600):
# 精确匹配
k = "rag:resp:" + hashlib.md5(q.encode()).hexdigest()
if hit := await redis.get(k):
return json.loads(hit)
resp = await qe.aquery(q)
payload = {"answer": str(resp), "nodes": [...]}
await redis.setex(k, ttl, json.dumps(payload))
return payload
想要语义级缓存("怎么退款" 和 "退款流程是什么" 命中同一个)——上 GPTCache 或自己写:embed(query) → 在向量库查 top_1 → 余弦相似度 > 0.95 命中。
Embedding 缓存
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.storage.kvstore import RedisKVStore
from llama_index.core.ingestion import IngestionCache
cache = IngestionCache(
cache=RedisKVStore.from_host_and_port("redis", 6379),
collection="embed_cache",
)
pipeline = IngestionPipeline(
transformations=[SentenceSplitter(), OpenAIEmbedding()],
cache=cache,
docstore=docstore,
)
大头:文档再次被切块时,如果 chunk 文本没变,直接用缓存的 embedding。重跑索引从几十刀降到几毛钱。
六、成本控制
- 模型路由:简单问题用 gpt-4o-mini / Haiku,复杂的用 Sonnet / gpt-4o。路由逻辑:query 长度 < 30 字且没有"分析/对比"关键词 → 走小模型
- 重排替代大 top_k:top_k=50 给 LLM 合成 vs top_k=50 + rerank top_n=5——后者准且便宜一半
- Prompt caching(Anthropic/OpenAI)——system prompt + few-shot 缓存后,下次请求便宜 90%
- Batch API:非实时场景(后台总结、报告生成)用 Batch API,50% 折扣
- budget guardrail:每个 user_id / 每天封顶,超限拒绝
# 粗路由的骨架
async def smart_query(q: str, user_id: str):
if await over_budget(user_id):
raise HTTPException(429, "daily quota exceeded")
complexity = judge_complexity(q) # 自己写个简单规则或小 LLM
qe = qe_mini if complexity < 0.5 else qe_pro
resp = await qe.aquery(q)
await record_usage(user_id, resp.metadata.get("token_usage", {}))
return resp
七、限流 + 并发控制
from slowapi import Limiter
from slowapi.util import get_remote_address
limiter = Limiter(key_func=lambda r: r.state.user_id)
@app.post("/v1/rag/query")
@limiter.limit("20/minute")
async def query(req: QueryRequest): ...
# 进程内并发 semaphore(保护 LLM API 不被打爆)
sem = asyncio.Semaphore(50)
async def safe_query(q):
async with sem:
return await qe.aquery(q)
八、降级与 fallback
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=10))
async def call_llm_with_retry(prompt):
return await primary_llm.acomplete(prompt)
async def robust_query(q):
try:
return await asyncio.wait_for(qe_pro.aquery(q), timeout=10)
except (asyncio.TimeoutError, ProviderError):
# 主 LLM 超时 → 切小模型
try:
return await qe_mini.aquery(q)
except Exception:
# 全挂 → 返回"系统繁忙,请稍后"+检索片段
nodes = retriever.retrieve(q)
return Response(response="系统繁忙,以下是相关资料:\n" + "\n---\n".join(n.text for n in nodes[:3]))
九、索引增量更新
生产数据每天都在变——文档新增、修改、删除。靠 IngestionPipeline + DocstoreStrategy.UPSERTS_AND_DELETE:
from llama_index.core.ingestion import IngestionPipeline, DocstoreStrategy
pipeline = IngestionPipeline(
transformations=[splitter, embed_model],
docstore=redis_docstore,
vector_store=qdrant_vs,
docstore_strategy=DocstoreStrategy.UPSERTS_AND_DELETE, # 删掉不再存在的
)
# 定时任务:每小时拉最新文档,增量跑
async def sync_job():
docs = await fetch_updated_docs(since=last_sync_time)
await pipeline.arun(documents=docs, num_workers=4)
关键点:Document 的 doc_id 要稳定(用源系统 ID,不是随机 UUID),不然 UPSERTS 识别不出来。
十、Docker 化
FROM python:3.12-slim
WORKDIR /app
# 系统依赖:如果用 BGE 本地 rerank 需要 C++ toolchain
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential git curl && rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 预下载 embedding/rerank 模型到镜像(避免冷启动拉几 GB)
RUN python -c "from sentence_transformers import CrossEncoder; CrossEncoder('BAAI/bge-reranker-v2-m3')"
COPY . .
# gunicorn + uvicorn 混合,多 worker
CMD ["gunicorn", "app:app", \
"-k", "uvicorn.workers.UvicornWorker", \
"-w", "4", \
"-b", "0.0.0.0:8000", \
"--timeout", "60"]
docker-compose 本地一键
services:
app:
build: .
ports: ["8000:8000"]
env_file: .env
depends_on: [redis, qdrant]
redis:
image: redis:7-alpine
volumes: ["redis_data:/data"]
qdrant:
image: qdrant/qdrant:latest
ports: ["6333:6333"]
volumes: ["qdrant_data:/qdrant/storage"]
volumes:
redis_data:
qdrant_data:
十一、Kubernetes 部署要点
- 资源声明:app pod 建议
requests: {cpu: 1, memory: 2Gi}、limits 拉高一倍——embedding/rerank 模型吃内存 - HPA:基于 p95 延迟 + QPS 扩,不是单纯 CPU——RAG 的瓶颈常常是外部 LLM 而非本地 CPU
- PDB(PodDisruptionBudget):发布时保留至少 1 个 pod 可用
- 配置:model 名/top_k/温度走 ConfigMap,API key 走 Secret(配合 ExternalSecrets Operator)
- Persistent Volume:Qdrant/Redis 用 StatefulSet + PVC,不要用 emptyDir
- Sidecar:OTel collector 跟 app pod 同 pod,app 本地端口打 trace
十二、CI/CD 中的 eval regression
# .github/workflows/eval.yml
name: RAG regression
on: [pull_request]
jobs:
eval:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with: {python-version: "3.12"}
- run: pip install -r requirements.txt
- name: Run eval on golden set
env:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
run: python scripts/run_eval.py --out eval_result.json
- name: Compare with baseline
run: python scripts/compare.py baseline.json eval_result.json --fail-if-worse-than 0.03
每个 PR 跑 50 条黄金集,任何指标比 baseline 下滑超过 3% 就阻断合并——RAG 回归防线就从这里来。
十三、监控大盘
至少 4 个 dashboard:
- Traffic:QPS、p50/p95/p99 延迟、错误率(5xx)
- Cost:每日 token 消耗、每 query 成本趋势、缓存命中率
- Quality:用户 👍/👎 率、refusal 率(LLM 说"我不知道"的比例)
- Infra:pod CPU/memory、Qdrant p99、Redis 内存、LLM provider 错误率
十四、上线前 20 条 checklist
1. 所有 LLM 调用走 async(
aquery/acomplete)
2. 全局 timeout(每个 aquery 外包一层
asyncio.wait_for)
3. 重试 + 指数退避(tenacity)
4. 主 LLM 挂掉有 fallback(小模型/返回检索片段)
5. Prompt/model 名/top_k 走 config,不硬编码
数据
6. 索引版本化(v1/v2 独立 collection)
7. 增量更新 job(每日/每小时) + 死文档清理
8. doc_id 稳定(源系统主键)
9. 敏感信息过滤(PII,上线前扫一遍)
性能
10. Embedding cache 开
11. 响应 cache 开(TTL 1 小时起)
12. Rerank top_n=5 而非 top_k 塞满
13. HNSW + scalar quantization 调过
14. prompt caching(Anthropic/OpenAI)开
可观测
15. Phoenix / Langfuse / OTel trace 接入
16. 结构化 log(query / latency / token / variant)
17. 用户 👍/👎 埋点
安全/合规
18. API 鉴权(API key / JWT)
19. 每用户限流 + 日预算
20. Prompt injection 防护(system prompt 锁定身份 + 输出审计)
十五、全书小结
① 它是 "LLM 时代的数据框架"——把文档变成 LLM 能用的 Node,把检索变成可精细控制的 pipeline。
② Ch1-Ch6 是"数据与索引"——怎么切块、怎么选 index、怎么挑 vector store。
③ Ch7-Ch8 是"检索精度"——retriever 三层结构 + query transform。RAG 的 90% 质量都在这两章。
④ Ch9-Ch10 是"编排"——Agent(LLM 自主决策) + Workflow(代码编排)。简单 RAG 不需要,复杂应用绕不开。
⑤ Ch11-Ch12 是"工程化"——评估、观测、部署,让 RAG 从 demo 到 service。
记住四句话:召回是天花板,Rerank 是性价比之王,Eval 不能靠感觉,部署要有 checklist。剩下的,就靠你把手弄脏,边做边调。