Chapter 06

Human-in-the-loop:给 Agent 装一个停止键

真正的生产 Agent 不是"全自动",而是"半自动带审批"。退款前人工复核、发邮件前人工过目、合同落章前经理 sign-off——这一章让你的图"等人"。

为什么需要 HITL

LLM 再聪明也会幻觉。三类场景必须有人类在环:

LangGraph 的 interrupt 让图在指定位置暂停、等外部输入、再 resume。

方式 A:interrupt_before 静态中断

最简单——在 compile 时声明"某个节点跑之前先停":

from langgraph.checkpoint.memory import MemorySaver

app = g.compile(
    checkpointer=MemorySaver(),
    interrupt_before=["send_email", "refund"],
)

config = {"configurable": {"thread_id": "t1"}}

# 第一次跑,到 send_email 前自动停
for chunk in app.stream({"messages": [...]}, config=config):
    print(chunk)

# 查状态 — next 字段告诉我们卡在哪
snap = app.get_state(config)
print(snap.next)   # ("send_email",)

# 人工审批后,resume (输入 None = 不加新输入)
for chunk in app.stream(None, config=config):
    print(chunk)

也可以 interrupt_after——节点跑完再停(用于"展示结果等确认")。

方式 B:interrupt 动态中断(推荐)

LangGraph 0.2+ 提供的 interrupt() 原语更灵活:在节点代码里"抛出"中断,把需要人看的内容传出去,等 Command(resume=...) 恢复:

from langgraph.types import interrupt, Command

def approve_refund(state):
    order_id = state["order_id"]
    amount = state["amount"]

    # 抛出中断,把决策材料传给外部
    decision = interrupt({
        "action": "refund",
        "order_id": order_id,
        "amount": amount,
        "reason": state["reason"],
    })
    # 人工返回后,decision 就是 Command(resume=...) 里的值

    if decision == "approve":
        result = api_refund(order_id, amount)
        return {"refund_result": result}
    else:
        return {"refund_result": "rejected"}

触发 + 恢复

app = g.compile(checkpointer=MemorySaver())
config = {"configurable": {"thread_id": "t1"}}

# 第一次 — 到 interrupt 自动停
result = app.invoke({"order_id": "A1", "amount": 200, "reason": "质量问题"}, config=config)
print(result["__interrupt__"])
# [{"value": {"action": "refund", "order_id": "A1", ...}, ...}]

# 把中断内容展示给人,等人类决定
human_says = "approve"

# 恢复:Command(resume=...) 把值传回 interrupt()
result = app.invoke(Command(resume=human_says), config=config)
print(result["refund_result"])
interrupt 和 interrupt_before 选哪个
静态 interrupt_before 简单但固定——不管数据内容,进节点就停。
动态 interrupt() 能按 state 决定停不停(小金额自动过、大金额才审批),还能把"要展示给人的内容"作为 payload 返回。生产推荐 interrupt()

三种人工介入模式

① 审批(Approve / Reject)

def approval_node(state):
    decision = interrupt({"type": "approve", "detail": state["draft"]})
    if decision == "approve": return {"status": "approved"}
    return {"status": "rejected"}

② 编辑(Human edits draft)

def review_reply(state):
    draft = state["reply"]
    edited = interrupt({"type": "edit", "current": draft})
    # edited 是人工改后的文本
    return {"reply": edited}

③ 补充信息(Ask for missing info)

def ask_missing(state):
    if not state.get("phone"):
        phone = interrupt({"type": "ask", "field": "phone"})
        return {"phone": phone}
    return {}

HITL 的 UI 对接模式

┌──────────────┐                         ┌─────────────┐
│   Frontend   │                         │   Backend   │
└──────┬───────┘                         └──────┬──────┘
       │ POST /start (query)                    │
       │───────────────────────────────────────>│ app.invoke(..)
       │                                        │  → 触发 interrupt
       │ { __interrupt__: {action:"refund"}}    │
       │<───────────────────────────────────────│
       │                                        │
       │ 展示审批 UI, 等用户点 approve           │
       │                                        │
       │ POST /resume (thread_id, "approve")    │
       │───────────────────────────────────────>│ app.invoke(
       │                                        │   Command(resume="approve"),
       │                                        │   config={thread_id})
       │ { result: "refund created" }           │
       │<───────────────────────────────────────│

后端参考实现

from fastapi import FastAPI
api = FastAPI()

@api.post("/start")
async def start(req):
    cfg = {"configurable": {"thread_id": req.thread_id}}
    result = await app.ainvoke({"query": req.query}, config=cfg)
    if result.get("__interrupt__"):
        return {"needs_human": True, "payload": result["__interrupt__"][0].value}
    return {"needs_human": False, "result": result}

@api.post("/resume")
async def resume(req):
    cfg = {"configurable": {"thread_id": req.thread_id}}
    result = await app.ainvoke(Command(resume=req.decision), config=cfg)
    return {"result": result}

update_state:让人类改历史

除了 interrupt,也可以 不依赖 interrupt,让用户直接改某个历史 checkpoint:

# 场景:用户说"上一条 reply 不对,我改一下"
app.update_state(
    config,
    {"messages": [AIMessage(content="修正后的回答", id=old_msg_id)]},
    as_node="reply_node",
)
# 接下来 invoke 会从新 state 继续
app.invoke(None, config=config)

审批的 7 个实战 UI 场景

  1. 工单退款前审批(金额 > 1000 才卡)
  2. 邮件发送前预览(支持编辑)
  3. 代码 PR 修改前人工过目
  4. 数据库写操作前 dry-run 展示 SQL
  5. 浏览器自动化:关键页面截图让人确认
  6. AI 画的设计稿让设计师签字
  7. 医疗/法务 AI 建议让专家复核

常见坑

症状解法
没有 checkpointerinterrupt 抛异常HITL 必须配 checkpointer
resume 时传了新输入重复生成resume 用 Command(resume=...),不要传 {"messages":...}
interrupt 里访问了可变对象resume 后值变了传 dict/str 这类可序列化原语
前端等待期间 checkpoint 被清resume 报 "no state"保留窗口期 > 用户审批时长
把 interrupt 放在循环里每轮都弹循环外面设一次审批节点

本章小结