一、什么时候才需要 Graph?
老实讲,绝大多数场景用不到 Graph。一个 Agent + 几个工具 + 一个 output_type 就能解决。Graph 只在以下场景才有优势:
- 多步工作流:比如"提取关键词 → 查询数据库 → 总结报告",步骤间有明确的顺序、输入输出关系
- 条件分支 / 循环:"如果 LLM 判断信息不足,回到上一步重问";固定流程里夹杂判断
- 持久化 / 断点续跑:流程可能几十分钟,中途失败要从断点续
- 人在环路(HITL):流程跑到一半,等人工审核后再继续
- 多 Agent 编排:多个专门 Agent 交替接力,中间有显式数据交接
判别思路:如果你能把事情画成一张"有命名节点 + 带方向箭头"的图,并且图不止"线性一条龙",那就是 Graph 的场景。否则单 Agent 就够。
二、pydantic_graph 的三个核心概念
BaseNode
每个节点是一个继承
BaseNode 的类——它的 run 方法是这个节点的逻辑,返回值是下一个节点(或 End)。节点即类型,边即"返回了谁"。End
表示图的终止,携带最终结果。
End[OutputType] 的类型参数就是图的返回类型。Graph
整个图对象。构造时传入所有节点类,运行时
graph.run(start_node, state=..., deps=...)。三、Hello Graph:一个两节点的小图
from __future__ import annotations
from dataclasses import dataclass, field
from pydantic_graph import BaseNode, End, Graph, GraphRunContext
@dataclass
class GraphState:
count: int = 0
@dataclass
class Increment(BaseNode[GraphState]):
step: int = 1
async def run(self, ctx: GraphRunContext[GraphState]) -> Increment | End[int]:
ctx.state.count += self.step
if ctx.state.count >= 5:
return End(ctx.state.count)
return Increment(step=self.step)
graph = Graph(nodes=[Increment])
result = graph.run_sync(Increment(step=1), state=GraphState())
print(result.output) # 5
print(result.state) # GraphState(count=5)
理解这段代码:
GraphState是整个图共享的可变状态——类似全局变量,但类型化、显式Increment是一个节点。它的run返回Increment(下一步还是我)或End[int](结束,返回 int)- 返回什么就走到什么——"边"就是 return 值的类型
Graph(nodes=[Increment])注册图的所有可能节点graph.run_sync(起始节点实例, state=...)启动
四、多节点 + 分支:带判断的图
把一个 Agent 流程画成图——模型先判断"用户想干什么",再路由到不同处理节点:
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Literal
from pydantic import BaseModel
from pydantic_ai import Agent
from pydantic_graph import BaseNode, End, Graph, GraphRunContext
# ─── 状态与依赖 ───
@dataclass
class State:
user_input: str = ""
intent: str = ""
answer: str = ""
# ─── 分类器 Agent ───
class IntentResult(BaseModel):
intent: Literal["chat", "search", "book"]
classifier = Agent("openai:gpt-4o-mini", output_type=IntentResult,
system_prompt="判断用户意图:闲聊=chat、查资料=search、预订=book。")
# ─── 节点 ───
@dataclass
class Classify(BaseNode[State]):
async def run(self, ctx: GraphRunContext[State]) -> Chat | Search | Book:
r = await classifier.run(ctx.state.user_input)
ctx.state.intent = r.output.intent
if r.output.intent == "chat":
return Chat()
if r.output.intent == "search":
return Search()
return Book()
@dataclass
class Chat(BaseNode[State]):
async def run(self, ctx: GraphRunContext[State]) -> End[str]:
a = await chatter.run(ctx.state.user_input)
ctx.state.answer = a.output
return End(a.output)
@dataclass
class Search(BaseNode[State]):
async def run(self, ctx: GraphRunContext[State]) -> End[str]:
a = await searcher.run(ctx.state.user_input)
ctx.state.answer = a.output
return End(a.output)
@dataclass
class Book(BaseNode[State]):
async def run(self, ctx: GraphRunContext[State]) -> End[str]:
a = await booker.run(ctx.state.user_input)
ctx.state.answer = a.output
return End(a.output)
# ─── 组图 ───
graph = Graph(nodes=[Classify, Chat, Search, Book])
async def handle(question: str):
state = State(user_input=question)
result = await graph.run(Classify(), state=state)
print("意图:", state.intent, "答案:", result.output)
流程:Classify 节点调用分类 Agent,根据意图返回 Chat / Search / Book 三个节点之一;这三个节点再各自调用专属 Agent 得到答案,最后 End(answer)。
为什么这比直接嵌套 if/await 好?
- 节点是类:每个"步骤"是独立类,可以单独测试
- State 显式:跨节点的共享数据一目了然
- 类型即流程图:
-> Chat | Search | Book的 return 签名就是图的边,IDE 能检查 - 可视化:
graph.mermaid_code()直接生成 Mermaid 流程图 - 持久化:图的执行过程能被 checkpoint
五、可视化:一键画图
print(graph.mermaid_code(start_node=Classify))
stateDiagram-v2
[*] --> Classify
Classify --> Chat
Classify --> Search
Classify --> Book
Chat --> [*]
Search --> [*]
Book --> [*]
把这段贴到 mermaid.live 就能看到图。评审代码时把流程图塞进 PR 描述——架构讨论变成"图"而不是"猜"。
六、State 和 Deps 的分工
Graph 有两个运行时"容器",别混了:
| 容器 | 性质 | 谁读谁写 | 放什么 |
|---|---|---|---|
state | 图生命周期共享,可变 | 所有节点读写 | 流程中累积的数据(意图、中间结果、步数) |
deps | 图运行时依赖,不可变引用 | 所有节点只读 | 外部依赖(DB、HTTP client、配置) |
@dataclass
class State:
count: int = 0
@dataclass
class Deps:
db: object
http: object
@dataclass
class MyNode(BaseNode[State, Deps]): # 泛型两个参数:State, Deps
async def run(self, ctx: GraphRunContext[State, Deps]) -> End[int]:
rows = await ctx.deps.db.fetch("...")
ctx.state.count += len(rows)
return End(ctx.state.count)
graph = Graph(nodes=[MyNode])
result = await graph.run(MyNode(), state=State(), deps=Deps(db=real_db, http=real_http))
七、持久化:长任务的断点续跑
Graph 的一大杀手锏:BaseStatePersistence。它在每次节点运行前后做 snapshot,失败后可以从中断处恢复。
from pydantic_graph.persistence.file import FileStatePersistence
from pathlib import Path
persistence = FileStatePersistence(Path(f"/tmp/graph/{run_id}.json"))
async with graph.iter(Classify(), state=state, persistence=persistence) as run:
async for node in run:
print("跑完节点:", node) # 每个节点执行完都 snapshot 到磁盘
result = run.result
进程挂了?新进程用相同 run_id 恢复:
async with graph.iter_from_persistence(persistence) as run:
async for node in run:
print("续跑节点:", node)
自带的实现:
FileStatePersistence:写本地 JSON,开发调试用SimpleStatePersistence:内存字典,测试用- 自定义:继承
BaseStatePersistence,接 Redis / Postgres / S3 自由发挥
八、人在环路(Human-in-the-Loop)
流程跑到某节点需要人工批准才继续?思路是:节点返回一个特殊的 End,把 state 存住;人工批完后从下一个节点接着跑。
@dataclass
class AwaitApproval(BaseNode[State]):
draft: str
async def run(self, ctx):
ctx.state.draft = self.draft
return End({"status": "needs_approval", "draft": self.draft})
@dataclass
class Publish(BaseNode[State]):
async def run(self, ctx):
# 发布 ctx.state.draft
return End({"status": "published"})
graph = Graph(nodes=[GenerateDraft, AwaitApproval, Publish])
# 第一次运行到 AwaitApproval 结束
result1 = await graph.run(GenerateDraft(), state=state, persistence=persistence)
# 返回给前端:{"status": "needs_approval", "draft": "..."}
# 人工审完点批准,新请求继续:
result2 = await graph.run(Publish(), state=state, persistence=persistence)
九、和 LangGraph 的同异
LangGraph
- 节点是函数,边是字符串或 lambda
- 基于
StateGraph加 edge/conditional_edge 配置 - 生态更大,内建 checkpoint/HITL 能力多
- 和 LangChain 深度绑定
pydantic_graph
- 节点是类,边是"下个节点的类型"
- 没有单独 edge/cond_edge——全在 return 类型里
- 代码更薄,学习曲线更短,mypy 检查更严
- 独立子包,可以不跟 Pydantic AI 一起用
选型建议:
- 重度依赖 LangChain 生态(各种 retriever、memory、callback)→ LangGraph
- Python 类型驱动、轻量、和 Pydantic AI 无缝 → pydantic_graph
十、实战:客服分流 Agent
一个中等复杂度的真实场景——客服 bot 按流程处理:
- 识别用户诉求(售前/售后/投诉)
- 售前/售后走对应 Agent
- 投诉升级到人工队列,持久化等人工接手
from __future__ import annotations
from dataclasses import dataclass
from typing import Literal
from pydantic import BaseModel
from pydantic_ai import Agent
from pydantic_graph import BaseNode, End, Graph, GraphRunContext
@dataclass
class CsState:
user_msg: str = ""
category: str = ""
handled_by: str = ""
reply: str = ""
class Category(BaseModel):
kind: Literal["presale", "aftersale", "complaint"]
classifier = Agent("openai:gpt-4o-mini", output_type=Category,
system_prompt="判断消息类别:售前咨询=presale、售后问题=aftersale、投诉=complaint。")
presale = Agent("openai:gpt-4o-mini", system_prompt="你是售前助手,推销产品特性、解答购买疑问。")
after = Agent("openai:gpt-4o-mini", system_prompt="你是售后助手,处理退货换货、物流查询。")
@dataclass
class Route(BaseNode[CsState]):
async def run(self, ctx) -> HandlePresale | HandleAfter | EscalateHuman:
r = await classifier.run(ctx.state.user_msg)
ctx.state.category = r.output.kind
if r.output.kind == "presale": return HandlePresale()
if r.output.kind == "aftersale": return HandleAfter()
return EscalateHuman()
@dataclass
class HandlePresale(BaseNode[CsState]):
async def run(self, ctx) -> End[str]:
r = await presale.run(ctx.state.user_msg)
ctx.state.reply = r.output
ctx.state.handled_by = "presale-agent"
return End(r.output)
@dataclass
class HandleAfter(BaseNode[CsState]):
async def run(self, ctx) -> End[str]:
r = await after.run(ctx.state.user_msg)
ctx.state.reply = r.output
ctx.state.handled_by = "after-agent"
return End(r.output)
@dataclass
class EscalateHuman(BaseNode[CsState]):
async def run(self, ctx) -> End[str]:
ctx.state.handled_by = "human-queue"
ctx.state.reply = "已升级到人工,请稍候,工号 12345 将尽快回复。"
return End(ctx.state.reply)
graph = Graph(nodes=[Route, HandlePresale, HandleAfter, EscalateHuman])
之后每次用户发消息,跑一次 graph.run(Route(), state=CsState(user_msg=...))。图把"路由 + 处理"解耦,逻辑一清二白。
十一、八个常见坑
- 忘记
from __future__ import annotations:类互相引用(Chat | Search)会引发前向声明问题。Python 3.11 以下必须加。 - State 写太大:每次 checkpoint 都要序列化 State——里面放几 MB 的数据会慢哭。大数据存外部,State 里只放引用/ID。
- 用 mutable default 值:
field(default_factory=list),不要= []。 - 节点直接 return Agent 的结果对象:结果对象不是
BaseNode或End——框架会报错。要返回节点或 End 包装。 - 忘记把所有节点注册进 Graph(nodes=[...]):运行时抛"未知节点"。
- 循环无终止条件:节点总是 return 自己,无限循环。默认 Graph 有最大深度保护,建议自己加显式计数。
- deps 里放 Agent:循环引用,序列化/打印时可能栈溢出。Agent 应是模块级全局变量,不进 deps。
- 直接在节点
run里print长日志:checkpoint 的每一步被阻塞。用 logfire.span 做结构化 trace。
十二、本章小结
三条心法:
① Graph 适合多节点 + 条件分支 + 持久化 / HITL 的场景,简单 Agent 用不着。
② 节点是类,返回什么就走到什么——边就是 return 类型签名。
③ State(可变共享状态)+ Deps(不可变外部依赖),职责明确分开。
① Graph 适合多节点 + 条件分支 + 持久化 / HITL 的场景,简单 Agent 用不着。
② 节点是类,返回什么就走到什么——边就是 return 类型签名。
③ State(可变共享状态)+ Deps(不可变外部依赖),职责明确分开。