为什么流式这么重要
LLM 生成一个完整回答平均 3-10 秒。如果不流式,用户看到的是:
[用户发消息] ─── 8 秒白屏 ─── [完整答案刷出来]
流式后:
[用户发消息] ─ 300ms ─ [第一个字] ─── [逐字流出] ─── [完整]
↑ ↑
TTFT token/s
TTFT(Time To First Token)从 8 秒降到 300ms,感知速度提升一个数量级。LangGraph 比单纯 LLM 流式多一层——它还能流"节点进度"。
三种 stream_mode
# 通用签名 for chunk in app.stream( {"messages": [...]}, config=config, stream_mode="values", # or "updates" / "messages" / "debug" ): print(chunk)
① values:每步完成后,吐出完整 state
for chunk in app.stream(inp, config, stream_mode="values"): print(chunk["messages"][-1]) # 每跑一个节点就吐一次,chunk 是完整 state 快照 # 适合:前端只想看"最新消息",state 体积小
② updates:只吐增量
for chunk in app.stream(inp, config, stream_mode="updates"): print(chunk) # {"think": {"messages": [AIMessage(...)]}} # {"act": {"messages": [ToolMessage(...)]}} # chunk 是 {node_name: update_dict},省流量、知道是谁更新
③ messages:LLM token 级流式
for token, meta in app.stream(inp, config, stream_mode="messages"): print(token.content, end="", flush=True) # meta 包含 {"langgraph_node": "think", "langgraph_step": 1, ...}
组合模式
传 list 可同时拿多种:
传 list 可同时拿多种:
stream_mode=["updates", "messages"]。
chunk 是 (mode, payload) 元组,前端可按类型分发。
④ debug:全量事件流(调试用)
for evt in app.stream(inp, config, stream_mode="debug"): print(evt["type"], evt["payload"]) # type 有 task / task_result / checkpoint 等,生产别开,太吵
astream_events:最细粒度
想同时拿 LLM token + 工具调用 + 检索事件?用 astream_events:
async for event in app.astream_events(inp, config, version="v2"): kind = event["event"] name = event["name"] if kind == "on_chat_model_stream": print(event["data"]["chunk"].content, end="") elif kind == "on_tool_start": print(f"\n🔧 调用工具 {name} 参数: {event['data']['input']}") elif kind == "on_tool_end": print(f"\n✓ {name} 返回: {event['data']['output']}") elif kind == "on_chain_start" and event["metadata"].get("langgraph_node"): print(f"\n▶ 进入节点: {event['metadata']['langgraph_node']}")
常用事件类型
| event | 触发时机 | 典型用途 |
|---|---|---|
on_chat_model_stream | LLM 每个 token | 打字机效果 |
on_chat_model_end | LLM 一次调用结束 | 统计 tokens / latency |
on_tool_start | 工具开始跑 | 前端显示"正在查询..." |
on_tool_end | 工具跑完 | 把结果展示给用户 |
on_retriever_end | 检索完成 | 展示引用文档 |
on_chain_start/end | 子链/节点进出 | 打进度条 |
前端 SSE 对接
后端(FastAPI + SSE)
from fastapi import FastAPI from fastapi.responses import StreamingResponse import json app_web = FastAPI() @app_web.post("/chat") async def chat(req): async def gen(): cfg = {"configurable": {"thread_id": req.thread_id}} inp = {"messages": [("user", req.query)]} async for ev in app.astream_events(inp, cfg, version="v2"): kind = ev["event"] if kind == "on_chat_model_stream": yield f"data: {json.dumps({'type':'token','c':ev['data']['chunk'].content})}\n\n" elif kind == "on_tool_start": yield f"data: {json.dumps({'type':'tool_start','name':ev['name'],'input':ev['data']['input']})}\n\n" elif kind == "on_tool_end": yield f"data: {json.dumps({'type':'tool_end','name':ev['name'],'output':str(ev['data']['output'])})}\n\n" yield "data: [DONE]\n\n" return StreamingResponse(gen(), media_type="text/event-stream")
前端(原生 fetch + ReadableStream)
const res = await fetch("/chat", { method: "POST", body: JSON.stringify({query, thread_id}), }); const reader = res.body.pipeThrough(new TextDecoderStream()).getReader(); let buf = ""; while (true) { const {done, value} = await reader.read(); if (done) break; buf += value; const parts = buf.split("\n\n"); buf = parts.pop(); // 末段可能不完整,留下 for (const p of parts) { if (!p.startsWith("data: ")) continue; const body = p.slice(6); if (body === "[DONE]") return; const evt = JSON.parse(body); if (evt.type === "token") appendText(evt.c); else if (evt.type === "tool_start") showToolBadge(evt.name); else if (evt.type === "tool_end") removeToolBadge(evt.name); } }
三种 UX 形态
① ChatGPT 式:只流文本
用 stream_mode="messages" 直接打字机效果,工具调用用 loading 句式("正在查询订单...")掩盖掉。
② Perplexity 式:过程透明
把 on_tool_start、on_retriever_end 也显示出来:
🔍 正在搜索 "LangGraph 教程" ├ 召回 5 篇文档 🧠 正在生成回答 │ LangGraph 是一个用于构建…… ✓ 完成(1.3s)
③ Devin 式:展开可点
工具调用可折叠,用户点开看输入/输出。和 astream_events 的 start/end 配对完美。
流式 + 中断组合
如果图里有 interrupt(),流到中断点会自然停下。前端要识别这种"流意外结束":
async for ev in app.astream_events(inp, cfg, version="v2"): ... # 流结束后检查 state snap = await app.aget_state(cfg) if snap.tasks and any(t.interrupts for t in snap.tasks): # 需要人工,给前端推 { type: "needs_human", payload: ... } yield f"data: {json.dumps({'type':'needs_human','payload':snap.tasks[0].interrupts[0].value})}\n\n"
只流某个节点
有时你只想流"对外说话"的 LLM,不想让"内部 rewrite"的 token 也泄漏给用户。用 tags:
llm_public = ChatOpenAI(...).with_config(tags=["public"]) llm_internal = ChatOpenAI(...).with_config(tags=["internal"]) # 前端只订阅 public tag async for ev in app.astream_events(inp, cfg, version="v2", include_tags=["public"]): ...
性能实践:不要让流式阻塞
- 用异步 LLM 客户端:
ChatOpenAI默认就是,别降级到同步 - 工具也尽量 async:
async def+@tool都支持 - 流里别做重活:落盘、写日志异步 fire-and-forget
- SSE 保持连接:Nginx 要
proxy_buffering off;,否则会攒一堆一起推 - 心跳:长工具跑 30s+ 时定期
yield ": heartbeat\n\n"防断连
常见坑
| 坑 | 症状 | 解法 |
|---|---|---|
| 拿不到 token 流 | 只吐完整消息 | 用 stream_mode="messages" 或 astream_events |
| 前端一下子刷全部 | 看似不流式 | Nginx proxy_buffering off + 响应 header X-Accel-Buffering: no |
| SSE 断连无提示 | 用户以为卡住 | 加 heartbeat,前端 onerror 自动重连 |
| astream_events v1 已弃 | 警告刷屏 | 显式传 version="v2" |
| 中文被切半 | 出现 � | SSE 要 UTF-8,别随便 slice bytes |
本章小结
- 三种 stream_mode:
values(全量快照)/updates(增量)/messages(token) - 最细粒度用
astream_events(version="v2"),按 event 类型分发 - 前端 SSE 三件套:后端 StreamingResponse + 前端 ReadableStream + Nginx 关缓冲
- 用 tag 过滤"对外流"和"内部流",避免思考过程泄漏
- 流里遇到 interrupt 要识别,转成"需要人工"的推送