Chapter 05

持久化 Checkpointer:会话恢复与时光机

Agent 每跑一步自动落盘。断线能接、历史可回放、状态可分支。这一章是"玩具 Demo"和"生产 Agent"之间的分水岭。

为什么必须要 Checkpointer

没有 Checkpointer 的 Agent 是一次性的:invoke 一次、state 活在内存、结束即毁。真实产品里会遇到:

这些都是 Checkpointer 提供的能力。

三件套:Checkpointer + thread_id + config

from langgraph.checkpoint.memory import MemorySaver

checkpointer = MemorySaver()    # 内存版(进程重启就没)
app = g.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "user-42-session-1"}}

# 第 1 次调用
app.invoke({"messages": [("user", "我叫小明")]}, config=config)

# 第 2 次调用 — 同一个 thread_id,自动接着聊
app.invoke({"messages": [("user", "我叫什么?")]}, config=config)
# → "你叫小明" ✓
thread_id 就是"会话 ID" 同一个 thread_id 共享 state。不同用户、不同会话用不同 thread_id 隔离。可以是 UUID、也可以是业务 ID 如 "u42:s7"。

三种 Checkpointer 对比

实现特点适用
MemorySaver进程内 dict,零依赖单测、开发、短会话
SqliteSaver本地 SQLite 文件个人工具、CLI、小规模
PostgresSaver生产级,支持高并发线上服务、多副本

SQLite 版

from langgraph.checkpoint.sqlite import SqliteSaver

with SqliteSaver.from_conn_string("checkpoints.sqlite") as checkpointer:
    app = g.compile(checkpointer=checkpointer)
    app.invoke(...)

Postgres 版(异步推荐)

from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver

DB_URL = "postgresql://user:pwd@localhost:5432/langgraph"

async with AsyncPostgresSaver.from_conn_string(DB_URL) as checkpointer:
    await checkpointer.setup()    # 首次建表
    app = g.compile(checkpointer=checkpointer)
    result = await app.ainvoke({"messages": [...]}, config=config)

查状态:get_state / get_state_history

# 当前状态
snap = app.get_state(config)
print(snap.values)        # {"messages": [...]}
print(snap.next)           # 下一步要跑的节点(中断时有值)
print(snap.config)         # 带 checkpoint_id

# 历史快照(最新在前)
for snap in app.get_state_history(config):
    print(snap.metadata["step"], snap.values["messages"][-1].content[:50])

回溯:从某个 checkpoint 重跑

每个 snapshot 有唯一的 checkpoint_id。用它做"时光倒流":

# 找到想回的那一步
history = list(app.get_state_history(config))
target = history[2]           # 比如倒数第 3 步

# 用它的 config 继续跑 — 走新的分支
new_config = target.config
app.invoke(None, config=new_config)   # None = 不加输入,从快照继续

手动改 state:update_state

可以在不跑任何节点的情况下修改 state。适合"用户说上一条回答错了,我帮你改":

app.update_state(
    config,
    {"messages": [AIMessage(content="改过的回答", id="m7")]},
    as_node="llm",           # 装作这个更新是 llm 节点产的
)
# 下一次 invoke 会从这个新 state 继续

多租户:thread_id 的命名策略

生产里 thread_id 是会话隔离的唯一手段。常见策略:

# 简单: 用户 ID + 会话自增
thread_id = f"{user_id}:{session_n}"

# 多项目: 加上应用标识,不同应用共享 DB
thread_id = f"{app_name}:{user_id}:{session_id}"

# 业务语义: 挂在具体对象上
thread_id = f"ticket:{ticket_id}"
thread_id 一定要唯一且稳定 同一个会话在 invoke 多次之间必须用同一个 ID,否则会"失忆"。 不同会话别共用 ID,否则互相污染。

checkpoint 的生命周期

Checkpointer 不会自己清理历史。生产中要定期清理过期 thread:

# 清掉一整个 thread 的所有 checkpoint
app.checkpointer.delete_thread(thread_id="user-42-session-1")

# 或者直接 DB 侧按时间清理(Postgres)
# DELETE FROM checkpoints WHERE created_at < NOW() - INTERVAL '30 days'

跨节点共享:store(长期记忆)

Checkpoint = 单会话的状态;store = 跨会话的长期记忆(用户档案、偏好):

from langgraph.store.memory import InMemoryStore

store = InMemoryStore()
app = g.compile(checkpointer=checkpointer, store=store)

def save_profile(state, config, *, store):
    user_id = config["configurable"]["user_id"]
    store.put(("profiles", user_id), "last_q", {"q": state["query"]})

def load_profile(state, config, *, store):
    user_id = config["configurable"]["user_id"]
    prof = store.get(("profiles", user_id), "last_q")
    return {"prior_q": prof.value["q"] if prof else None}

实战:对话历史太长怎么办

长会话里 messages 可能涨到成百上千条,既贵又慢。两种策略:

① 滑动窗口 + 摘要

def truncate(s):
    msgs = s["messages"]
    if len(msgs) > 20:
        # 摘要前半段,保留后 10 条
        summary = summarize(msgs[:-10])
        return {"messages": [RemoveMessage(id=m.id) for m in msgs[:-10]] +
                [SystemMessage(content=f"之前对话摘要: {summary}")]}
    return {}

② RemoveMessage:add_messages 的删除支持

from langchain_core.messages import RemoveMessage

def clean(s):
    return {"messages": [RemoveMessage(id=m.id) for m in s["messages"][:-5]]}

常见坑

症状解法
忘传 config"thread_id required"每次 invoke 都带 config
换了 thread_id 却期望记住Agent 失忆同会话保持 ID 一致
MemorySaver 上线重启就丢生产用 Postgres/SQLite
checkpoint 爆表DB 占用飞涨定期 delete_thread / 按时间 GC
async checkpointer 被同步调用协程未 awaitapp 用 ainvoke/astream

本章小结