Chapter 10

端到端实战:智能客服工单 Agent

把前九章串起来——用一个真实业务场景:电商客服工单处理。覆盖意图分流、订单查询、小额自动退款、大额审批、流式输出、生产部署。

需求分析

产品需求:

整体架构

┌─── Frontend (React) ───┐
│   聊天窗 + 审批弹窗    │
└──────────┬─────────────┘
           │ SSE
┌──────────▼─────────────┐
│  FastAPI (uvicorn)     │
│  /chat /resume /state  │
└──────────┬─────────────┘
           │
┌──────────▼─────────────┐      ┌─────────────┐
│  LangGraph App         │◀────▶│  Postgres   │
│  classify → dispatch   │ ckpt │ checkpoints │
│  → [search|refund|…]   │      └─────────────┘
│  HITL interrupt        │
└──────────┬─────────────┘
           │
┌──────────▼─────────────┐
│  LangSmith Trace       │
└────────────────────────┘

Step 1:定义 State

from typing import TypedDict, Annotated, Literal
from langgraph.graph.message import add_messages

class AgentState(TypedDict):
    """Shared graph state for the customer-support agent."""
    # add_messages appends/merges message lists across node updates
    # instead of overwriting the whole list.
    messages: Annotated[list, add_messages]
    # End-user identifier, supplied by the API layer on every turn.
    user_id: str
    # Intent label filled in by the classify node; drives routing.
    intent: Literal["order", "shipping", "refund", "human", "other"]
    # Extracted by the classifier; None when the user did not mention one.
    order_id: str | None
    # Requested refund amount; None when the classifier could not extract it.
    refund_amount: float | None
    # Approval outcome ("approve" or a rejection) set by auto_refund / approve_refund.
    decision: str | None

Step 2:工具层

from langchain_core.tools import tool

@tool
def get_order(order_id: str) -> str:
    """查订单基本信息,返回 markdown。"""
    # NOTE: the docstring doubles as the tool description shown to the LLM,
    # so it is kept verbatim.
    order = db.orders.get(order_id)
    if not order:
        return f"未找到订单 {order_id}"
    fields = [
        f"订单 {order.id}",
        f"金额: ¥{order.amount}",
        f"状态: {order.status}",
        f"下单: {order.created_at}",
    ]
    return "\n".join(fields)

@tool
def get_shipping(order_id: str) -> str:
    """查物流进度。"""
    # One line per tracking event: "<time>  <city>  <message>".
    events = logistics.query(order_id)
    lines = []
    for ev in events:
        lines.append(f"{ev.time}  {ev.city}  {ev.msg}")
    return "\n".join(lines)

@tool
def create_refund(order_id: str, amount: float, reason: str) -> str:
    """创建退款单。"""
    # The idempotency key ties retries of the same (order, amount) pair to a
    # single refund, so RetryPolicy-driven re-execution cannot double-refund.
    idempotency = f"{order_id}:{amount}"
    try:
        refund_id = refund_api.create(order_id, amount, reason, idem_key=idempotency)
    except ApiError as err:
        return f"创建失败: {err}"
    return f"退款单 {refund_id} 已创建"

Step 3:节点

from langchain_openai import ChatOpenAI
from langgraph.types import interrupt, Command

# Deterministic (temperature=0) small model shared by the classify node.
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

# —— 1. 意图分类 ——
def classify(s):
    """Classification node: fill intent / order_id / refund_amount in state.

    Uses structured output so the model's answer is parsed into IntentSchema
    fields rather than free text.
    """
    prompt = [
        ("system", "判断用户意图: order|shipping|refund|human|other,并提取 order_id/refund_amount"),
        *s["messages"],
    ]
    parsed = llm.with_structured_output(IntentSchema).invoke(prompt)
    return {
        "intent": parsed.intent,
        "order_id": parsed.order_id,
        "refund_amount": parsed.refund_amount,
    }

# —— 2. 路由 ——
def route(s) -> Literal["qa_agent", "refund_flow", "handoff"]:
    if s["intent"] == "refund": return "refund_flow"
    if s["intent"] == "human":  return "handoff"
    return "qa_agent"

# —— 3. QA agent(查订单/物流)——
from langgraph.prebuilt import create_react_agent
# Prebuilt ReAct loop: the model decides which of the read-only lookup tools
# to call, then composes a reply. Handles both "order" and "shipping" intents.
qa_agent = create_react_agent(
    model="openai:gpt-4o-mini",
    tools=[get_order, get_shipping],
    prompt="你是客服,根据用户问题调用工具后简洁回复。",
)

# —— 4. 退款流程 ——
def check_amount(s) -> Literal["auto_refund", "approve_refund"]:
    """Route refunds by amount: <= 500 is auto-approved, otherwise HITL.

    Fix: the state schema declares refund_amount as float | None, and the
    classifier may fail to extract an amount — the original `None <= 500`
    comparison raises TypeError. Treat a missing amount conservatively as
    requiring human approval.
    """
    amount = s.get("refund_amount")
    if amount is None or amount > 500:
        return "approve_refund"
    return "auto_refund"

def approve_refund(s):
    """HITL gate: pause the graph until a human resumes with a decision.

    interrupt() suspends this thread at the checkpointer; the returned value
    is whatever Command(resume=...) later supplies.
    """
    # The payload carries everything a reviewer needs to decide.
    payload = {
        "type": "approve_refund",
        "order_id": s["order_id"],
        "amount": s["refund_amount"],
        "user_id": s["user_id"],
    }
    return {"decision": interrupt(payload)}

def auto_refund(s):
    """Small-amount fast path: approve without human review."""
    decision = "approve"
    return {"decision": decision}

def do_refund(s):
    """Execute the refund when approved; otherwise report the rejection.

    Fix: AIMessage was referenced but never imported anywhere in this module,
    so this node raised NameError at runtime — import it locally to keep the
    node self-contained.
    """
    from langchain_core.messages import AIMessage

    if s["decision"] != "approve":
        return {"messages": [AIMessage(content="退款申请已驳回。")]}
    # Call through the tool wrapper (.invoke) for argument validation and the
    # tool's uniform error string on ApiError.
    result = create_refund.invoke({
        "order_id": s["order_id"],
        "amount": s["refund_amount"],
        "reason": "客服审批通过",
    })
    return {"messages": [AIMessage(content=result)]}

# —— 5. 转人工 ——
def handoff(s):
    ticket = ticket_sys.create(user_id=s["user_id"], history=s["messages"])
    return {"messages": [AIMessage(content=f"已为您转接人工,工单号 {ticket.id}")]}

Step 4:拼图

from langgraph.graph import StateGraph, START, END
from langgraph.pregel import RetryPolicy

g = StateGraph(AgentState)
g.add_node("classify", classify)
g.add_node("qa_agent", qa_agent)
g.add_node("refund_flow", lambda s: {})     # no-op hub: single entry point for refund routing
g.add_node("auto_refund", auto_refund)
g.add_node("approve_refund", approve_refund)
# Refund execution calls an external API — retry transient failures up to 3 times.
g.add_node("do_refund", do_refund, retry=RetryPolicy(max_attempts=3))
g.add_node("handoff", handoff)

# Every turn starts with intent classification.
g.add_edge(START, "classify")
# route() returns one of: qa_agent | refund_flow | handoff.
g.add_conditional_edges("classify", route)

# Refund branch: amount decides auto vs human approval; both paths converge on do_refund.
g.add_conditional_edges("refund_flow", check_amount)
g.add_edge("auto_refund", "do_refund")
g.add_edge("approve_refund", "do_refund")
g.add_edge("do_refund", END)

g.add_edge("qa_agent", END)
g.add_edge("handoff", END)

Step 5:加 Checkpointer + 编译

from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver

import os

# Fix: docker-compose sets DB_URL, but the code hard-coded the connection
# string (with credentials) — read the environment first, keep the literal
# only as a local-dev fallback.
DB = os.environ.get("DB_URL", "postgresql://app:pwd@db/agent")

# Keep a module-level reference to the saver's context manager: entering it
# via __aenter__() and dropping it means __aexit__ can never run, leaking the
# connection pool. Holding it allows explicit cleanup on shutdown.
_ckpt_cm = None

async def build_app():
    """Create the Postgres checkpointer and compile the graph with it.

    Returns the compiled LangGraph app. setup() creates the checkpoint tables
    on first run (idempotent afterwards).
    """
    global _ckpt_cm
    _ckpt_cm = AsyncPostgresSaver.from_conn_string(DB)
    ckpt = await _ckpt_cm.__aenter__()
    await ckpt.setup()
    return g.compile(checkpointer=ckpt)

Step 6:FastAPI 对外

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import json

api = FastAPI()
# Compiled LangGraph app; None until the startup hook populates it.
APP = None

@api.on_event("startup")  # NOTE(review): on_event is deprecated in newer FastAPI — consider the lifespan API
async def startup():
    # Build the graph once per worker process and share it across requests.
    global APP
    APP = await build_app()

class ChatReq(BaseModel):
    """Request body for POST /chat."""
    # Conversation/checkpoint id: reusing a thread_id resumes that thread's state.
    thread_id: str
    # End-user id; stored in state and attached to traces as metadata.
    user_id: str
    # The user's new message for this turn.
    query: str

@api.post("/chat")
async def chat(req: ChatReq):
    """Stream one conversational turn over SSE.

    Emits `token` / `tool_start` / `tool_end` events while the graph runs,
    then a `needs_human` event if the run stopped at an interrupt, and
    finally a `[DONE]` sentinel.
    """
    # tags/metadata propagate to LangSmith traces for per-user attribution.
    cfg = {"configurable": {"thread_id": req.thread_id},
           "tags": ["prod"], "metadata": {"user_id": req.user_id}}
    inp = {"messages": [("user", req.query)], "user_id": req.user_id}

    async def gen():
        async for ev in APP.astream_events(inp, cfg, version="v2"):
            t = ev["event"]
            if t == "on_chat_model_stream":
                # Incremental LLM token. NOTE(review): chunk.content may be a
                # list for some providers — confirm before assuming str.
                yield sse("token", {"c": ev["data"]["chunk"].content})
            elif t == "on_tool_start":
                yield sse("tool_start", {"name": ev["name"]})
            elif t == "on_tool_end":
                # Truncate tool output so a huge result can't bloat the stream.
                yield sse("tool_end", {"name": ev["name"], "out": str(ev["data"]["output"])[:300]})

        # After streaming ends, check whether the graph paused at interrupt()
        # (large-refund approval) so the frontend can show the approval dialog.
        snap = await APP.aget_state(cfg)
        if snap.tasks and any(t.interrupts for t in snap.tasks):
            payload = snap.tasks[0].interrupts[0].value
            yield sse("needs_human", payload)

        yield "data: [DONE]\n\n"

    # X-Accel-Buffering: no — tells nginx not to buffer the event stream.
    return StreamingResponse(gen(), media_type="text/event-stream",
                             headers={"X-Accel-Buffering": "no"})

def sse(t, data):
    """Format one SSE `data:` frame carrying a JSON object tagged with event type *t*.

    Fix: json.dumps defaults to ensure_ascii=True, which \\u-escapes every CJK
    character in the payload and inflates the stream; the response is already
    UTF-8, so emit raw characters instead.
    """
    return f"data: {json.dumps({'type': t, **data}, ensure_ascii=False)}\n\n"

@api.post("/resume")
async def resume(thread_id: str, decision: str):
    """Resume a thread paused at an approval interrupt with the human's decision.

    Command(resume=...) feeds *decision* back as the return value of the
    interrupt() call inside approve_refund.
    """
    from langgraph.types import Command

    config = {"configurable": {"thread_id": thread_id}}
    outcome = await APP.ainvoke(Command(resume=decision), config)
    return {"result": outcome}

Step 7:部署

Dockerfile

# Slim base image keeps the final image small.
FROM python:3.11-slim
WORKDIR /app
# Copy requirements first so the pip layer is cached across code-only changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
# Enable LangSmith tracing; the API key itself is injected at runtime.
ENV LANGCHAIN_TRACING_V2=true
# 4 uvicorn workers; each process builds its own graph + checkpointer on startup.
CMD ["uvicorn", "main:api", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]

docker-compose.yml

services:
  app:
    build: .
    ports: ["8000:8000"]
    environment:
      # Secrets come from the host environment / .env file, not the compose file.
      OPENAI_API_KEY: ${OPENAI_API_KEY}
      LANGCHAIN_API_KEY: ${LANGCHAIN_API_KEY}
      LANGCHAIN_PROJECT: customer-support-prod
      # NOTE(review): verify the app actually reads DB_URL — the code sample
      # hard-codes its connection string, which would make this setting dead.
      DB_URL: postgresql://app:pwd@db:5432/agent
    depends_on: [db]
  db:
    image: postgres:16
    environment:
      POSTGRES_USER: app
      POSTGRES_PASSWORD: pwd
      POSTGRES_DB: agent
    # Named volume so checkpoints survive container recreation.
    volumes: ["pgdata:/var/lib/postgresql/data"]
volumes:
  pgdata:

Nginx 反代(关键:关缓冲)

location /chat {
    proxy_pass http://app:8000;
    # SSE: disable buffering/caching, or tokens arrive in one burst at the end.
    proxy_buffering off;
    proxy_cache off;
    # Long-lived streams (HITL approval waits); the 60s default would cut them off.
    proxy_read_timeout 10m;
    chunked_transfer_encoding on;
}

Step 8:验收测试

def test_small_refund_auto():
    # Amount <= 500: auto_refund path completes in one invoke, no interrupt.
    inp = {"messages": [("user", "退 A1 订单,质量问题,200 块")],
           "user_id": "u1"}
    out = app.invoke(inp, cfg("t1"))
    assert "退款单" in out["messages"][-1].content
    assert not out.get("__interrupt__")

def test_large_refund_needs_approval():
    # Amount > 500: first invoke pauses at the approve_refund interrupt...
    inp = {"messages": [("user", "退 A2,5000 块")], "user_id": "u2"}
    out = app.invoke(inp, cfg("t2"))
    assert out["__interrupt__"][0].value["type"] == "approve_refund"

    # ...and resuming the same thread with "approve" completes the refund.
    out2 = app.invoke(Command(resume="approve"), cfg("t2"))
    assert "退款单" in out2["messages"][-1].content

def test_qa_flow():
    # A shipping question routes to qa_agent; the reply should mention the
    # order or its logistics (LLM output, so only a loose content check).
    out = app.invoke({"messages": [("user", "A1 物流到哪了?")], "user_id": "u3"},
                      cfg("t3"))
    assert "订单" in out["messages"][-1].content or "物流" in out["messages"][-1].content

上线清单

  1. ✅ Checkpointer 用 Postgres,不要 MemorySaver
  2. ✅ 关键节点(do_refund、外部 API)都加 retry
  3. ✅ 工具全部幂等(带 idempotency key)
  4. ✅ HITL 审批节点用 interrupt(),payload 含决策所需全部信息
  5. ✅ 前端 SSE 关 nginx 缓冲,加心跳
  6. ✅ LangSmith 打 tag/metadata,成本可追溯
  7. ✅ 单测覆盖 happy path + HITL 分支 + 转人工 + 退款驳回
  8. ✅ 定期清理 90 天前 checkpoint
  9. ✅ 成本告警:单会话 token > 10k 报警
  10. ✅ 降级预案:LLM 宕机直接转人工

扩展思路

全书总结

最后一句话
LangGraph 本身不难——难的是把 LLM 的不确定性塞进一张确定的图里。 记住:图是骨,LLM 是肉,工具是手,人是头。每次 Agent 出问题,先问这四个里哪个没管好。