调试三层策略
| 层 | 工具 | 场景 |
|---|---|---|
| 开发期 | get_state_history + Mermaid | 单测、本地复现 |
| 集成测试 | LangSmith Playground | 改 prompt 回归 |
| 生产 | LangSmith Trace + OTel | 线上 bug 复盘、成本监控 |
LangSmith:最简单的 Trace 方案
LangChain 官方 SaaS,一次设置全链路自动上报:
# 环境变量
export LANGCHAIN_TRACING_V2=true
export LANGCHAIN_API_KEY=ls__xxx
export LANGCHAIN_PROJECT=my-agent
export LANGCHAIN_ENDPOINT=https://api.smith.langchain.com
之后 app.invoke() 每次都会自动上报,包含:
- 每个节点的输入/输出/耗时
- 每次 LLM 调用的 prompt、output、tokens、成本
- 每次工具调用的参数、返回、异常
- State 在每一步的变化
给 run 打标签
result = app.invoke( inp, config={ "configurable": {"thread_id": tid}, "tags": ["prod", "v1.3", f"user:{uid}"], "metadata": {"feature": "refund", "plan": "pro"}, "run_name": "退款 Agent", }, )
在 LangSmith UI 里就能按 tag/metadata 筛选、聚合成本、做 A/B 对比。
没 LangSmith?用 callbacks
from langchain_core.callbacks import BaseCallbackHandler class PrintHandler(BaseCallbackHandler): def on_chain_start(self, serialized, inputs, **kw): name = serialized.get("name", "?") print(f"▶ {name}", inputs) def on_tool_end(self, output, **kw): print(f"✓ tool: {output}") def on_llm_end(self, response, **kw): gen = response.generations[0][0] print(f"🧠 {gen.text[:100]}...") app.invoke(inp, config={"callbacks": [PrintHandler()]})
接入 OpenTelemetry
公司有自建可观测体系(Jaeger / Tempo / Datadog)就用 OTel:
from opentelemetry import trace from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter trace.set_tracer_provider(TracerProvider()) trace.get_tracer_provider().add_span_processor( BatchSpanProcessor(OTLPSpanExporter(endpoint="http://tempo:4317")) ) tracer = trace.get_tracer("langgraph") def my_node(state): with tracer.start_as_current_span("my_node") as span: span.set_attribute("input.len", len(state["messages"])) result = do(state) span.set_attribute("output.tokens", result.usage.total_tokens) return result
错误处理:节点级 retry
from langgraph.pregel import RetryPolicy retry = RetryPolicy( max_attempts=3, initial_interval=1.0, backoff_factor=2.0, retry_on=[ConnectionError, TimeoutError], ) g.add_node("llm", llm_node, retry=retry) g.add_node("api", api_node, retry=retry)
哪些错该 retry
✅ 瞬态:网络超时、rate limit、502/504
❌ 不该:参数错误、权限错误、业务规则失败——retry 也没用,要让图跳到 fallback 节点
✅ 瞬态:网络超时、rate limit、502/504
❌ 不该:参数错误、权限错误、业务规则失败——retry 也没用,要让图跳到 fallback 节点
区分"重试"和"降级"
def search(s): try: hits = vector_db.query(s["q"]) return {"hits": hits, "source": "vector"} except VectorDbDown: hits = bm25.query(s["q"]) # 降级到 BM25 return {"hits": hits, "source": "bm25"}
本地调试:get_state_history 是神器
线上跑错了,只要有 thread_id 和 checkpointer,本地就能重现:
# 从生产 dump 出这个 thread 的所有 checkpoint(或直连生产 DB 只读) for snap in app.get_state_history(config): print("---", snap.metadata["step"], snap.next) print(snap.values["messages"][-1].content[:200]) # 找到出错那一步,改一下 state 再继续 app.update_state(bad_snap.config, {"retry_reason": "stale_cache"}) app.invoke(None, bad_snap.config) # 从这一步重跑
生产要监控的 7 个指标
- TTFT:第一个 token 响应时间,P50/P95/P99
- 总耗时:整条 invoke 时间
- 节点耗时热图:哪一步最慢,一眼看出
- tokens 消耗:per request + 日级聚合,算成本
- 工具成功率:每个工具的 ok / error 比
- 循环深度:
n_steps分布,有没有 Agent 疯狂循环 - HITL 等待时长:interrupt → resume 的间隔,优化审批 UX
打印节点级 trace 的小函数
def trace_run(app, inp, config): for chunk in app.stream(inp, config, stream_mode="updates"): for node_name, update in chunk.items(): print(f"\n━━ {node_name} ━━") if "messages" in update: for m in update["messages"]: print(f" [{m.type}] {m.content[:200]}") if getattr(m, "tool_calls", None): for tc in m.tool_calls: print(f" 🔧 {tc['name']}({tc['args']})") else: print(" ", update)
单元测试模板
import pytest from langgraph.checkpoint.memory import MemorySaver @pytest.fixture def agent(): return g.compile(checkpointer=MemorySaver()) def test_refund_happy_path(agent): cfg = {"configurable": {"thread_id": "t1"}} out = agent.invoke({"messages": [("user", "退 A1,质量问题")]}, cfg) assert "refund_result" in out assert out["refund_result"].startswith("退款单已创建") def test_refund_needs_approval(agent): cfg = {"configurable": {"thread_id": "t2"}} out = agent.invoke({"order_id": "A2", "amount": 5000}, cfg) assert out.get("__interrupt__") # 大额被 HITL 拦住 # 模拟人工 approve from langgraph.types import Command out2 = agent.invoke(Command(resume="approve"), cfg) assert out2["refund_result"] == "ok"
常见调试场景
| 症状 | 先查什么 | 常见原因 |
|---|---|---|
| Agent 回答不着调 | 最后一次 LLM 的 prompt | history 被其他 Agent 污染 |
| 工具从没调过 | LLM 输出的 tool_calls | docstring 不清晰 / bind_tools 忘了 |
| GraphRecursionError | get_state_history | 循环没终止条件 |
| resume 报 "no state" | thread_id 是否一致 | 前端生成了新 ID |
| 某步偶发超时 | 节点 trace 耗时 | 外部 API 抖动,加 retry |
| 成本异常高 | tokens per run | history 无限累积,缺摘要 |
本章小结
- 生产必须有 trace:LangSmith 最省事,OTel 最灵活,callbacks 最轻量
- 节点级 retry 防瞬态,业务错误走 fallback 节点
- 线上复盘靠 checkpointer:thread_id + get_state_history 能本地复现
- 监控 7 个指标:TTFT、总耗时、节点热图、tokens、工具成功率、循环深度、HITL 等待
- 每个 Agent 都该有 unit test,尤其覆盖 HITL 分支