Chapter 10

生产部署

LangServe 快速部署、流式 SSE 输出、错误处理、限流与成本监控

LangServe:一键部署为 API

LangServe 是 LangChain 官方的部署工具,能将任意 Runnable(链、Agent)快速暴露为 REST API, 自动生成 /invoke/stream/batch 端点,以及内置的 Playground UI。

# pip install langserve[all] fastapi uvicorn

from fastapi import FastAPI
from langserve import add_routes
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# 定义链
llm = ChatOpenAI(model="gpt-4o-mini")
prompt = ChatPromptTemplate.from_messages([
    ("system", "你是一位专业的技术翻译,将输入翻译为{language}"),
    ("human", "{text}")
])
chain = prompt | llm | StrOutputParser()

# 创建 FastAPI 应用
app = FastAPI(title="翻译 API", version="1.0")

# 添加 LangServe 路由(一行代码!)
add_routes(
    app,
    chain,
    path="/translate",
    # 以下字段对 Playground UI 可见
    input_type=dict,
    config_keys=["configurable"],
)

# 启动:uvicorn app:app --reload --port 8000
#
# 自动生成的端点:
# POST /translate/invoke  → 单次调用
# POST /translate/stream  → 流式输出(SSE)
# POST /translate/batch   → 批量调用
# GET  /translate/playground  → 可视化 UI

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

流式输出(Streaming)

LLM 生成速度慢,流式输出可以大幅改善用户体验——让用户看到"正在输出"而非等待几秒后看到完整回复。

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import json

@app.post("/chat/stream")
async def stream_chat(request: ChatRequest):
    async def generate():
        # astream() 返回异步生成器,逐 token 产出
        async for chunk in chain.astream({
            "question": request.question
        }):
            # SSE 格式:data: {json}


            yield f"data: {json.dumps({'content': chunk, 'done': False}, ensure_ascii=False)}\n\n"
        # 发送结束信号
        yield f"data: {json.dumps({'content': '', 'done': True})}\n\n"
    
    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        }
    )

生产级错误处理与限流

from fastapi import HTTPException, Request
from slowapi import Limiter
from slowapi.util import get_remote_address
import anthropic

# 限流:每个 IP 每分钟最多 20 次请求
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter

@app.post("/chat")
@limiter.limit("20/minute")
async def chat(request: Request, body: ChatRequest):
    try:
        result = await chain.ainvoke({"question": body.question})
        return {"answer": result}
    
    except anthropic.RateLimitError:
        # API 限流:让客户端稍后重试
        raise HTTPException(status_code=429, detail="AI 服务繁忙,请稍后重试")
    
    except anthropic.APIConnectionError:
        # 网络问题
        raise HTTPException(status_code=503, detail="AI 服务暂时不可用")
    
    except Exception as e:
        # 记录未预期的错误(不要把内部错误暴露给用户)
        logger.error(f"Chat error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="内部错误,请联系支持")

成本控制最佳实践

降低 Token 消耗

  • 使用 gpt-4o-mini 而非 gpt-4o(成本 15-30 倍差距)
  • 对 RAG 检索结果做摘要,不把原文全塞入 Prompt
  • 对话历史截断:只保留最近 N 轮,不保留全部历史
  • 缓存相同问题的回复(Redis 缓存)
  • 在 LangSmith 监控每次调用的实际 Token 用量

监控告警

  • 设置每日/每月费用预算,超限发告警邮件
  • 监控 LangSmith 中的错误率和 p99 延迟
  • 对 Token 用量超高的用户设限
  • 使用 LangSmith 的异常检测找出回答质量下降的时间段

全课程总结