一、为什么要流式?
LLM 生成一段 500 字的回答,端到端可能要 5 秒。如果你等它全部生成完再一次性返回——前端白屏 5 秒,用户以为崩了。
流式的价值是首字节时间(TTFB)从几秒降到亚秒:
用户看到"正在打字"的感觉立刻出现,即使总时长没变,体感延迟大幅下降。这也是 ChatGPT 之所以火的交互设计精髓。
二、run_stream 最小示例
import asyncio
from pydantic_ai import Agent
agent = Agent("openai:gpt-4o-mini")
async def main():
async with agent.run_stream("给我写一段 200 字关于唐诗的介绍") as response:
async for chunk in response.stream_text(delta=True):
print(chunk, end="", flush=True)
print()
print("用量:", response.usage())
asyncio.run(main())
几个关键点:
agent.run_stream(...)返回一个 async 上下文管理器,离开上下文时底层连接被关闭response.stream_text(delta=True)产出增量字符串——每次 yield 的是新增部分delta=False则是累积字符串——每次 yield 的是到目前为止全部内容- 流结束后
response.usage()才有完整统计
三、delta=True vs delta=False
| 模式 | 每次 yield 的内容 | 典型用法 |
|---|---|---|
delta=True | 自上次以来的新增字符串 | 前端 SSE / 控制台打字机效果 |
delta=False(默认) | 到目前为止的全部字符串 | 需要每次拿完整文本重渲染的场景 |
delta=True——让前端只 append,不要重建 DOM。而如果要每 chunk 都做一次 markdown 重渲染,用 delta=False 更省心。
四、结构化流:边出边校验
能流式的不只是文本。Pydantic AI 也能流式吐结构化输出——每一个 chunk 都试图被解析成 output_type 的"部分值":
from pydantic import BaseModel
class Recipe(BaseModel):
name: str
ingredients: list[str]
steps: list[str]
agent = Agent("openai:gpt-4o", output_type=Recipe)
async with agent.run_stream("给我一份麻婆豆腐的做法") as response:
async for partial in response.stream_structured(debounce_by=0.05):
print("名字:", partial.output.name)
print("已出 ingredients:", partial.output.ingredients)
print("---")
你会看到 name 先出现,然后 ingredients 一个一个被追加,最后 steps 填满。每个中间 partial 都是一个"到目前为止尽量校验过"的 Recipe——必填字段如果还没出,可能是空字符串或空列表。
debounce_by=0.05 是防抖:不管底层流多快,至少间隔 50ms 才 yield 一次,避免前端消息过多。
结构化流的实战价值
想象一个"AI 简历解析器"——用户上传一段自我介绍,你用结构化 Agent 拆成 Resume 模型。传统做法前端要等几秒才出完整表单;结构化流的做法是:姓名先填上,工作经历一条一条加上来——和 ChatGPT 用户体验完全对齐。
五、提前结束 / 取消流
用户点了"停止生成"按钮?直接 break:
async with agent.run_stream("...") as response:
async for chunk in response.stream_text(delta=True):
print(chunk, end="")
if user_clicked_stop:
break
# 离开 async with 后,底层 HTTP 连接自动关闭
已经消费的 token 照样记账——LLM 那边还是跑完了的,但你不再接收。这是 provider 的协议规定,所有流式框架一致。
六、流式 + FastAPI:标准 SSE 响应
把 Pydantic AI 的流接到 FastAPI 的 StreamingResponse:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
@app.post("/chat")
async def chat(question: str):
async def sse_gen():
async with agent.run_stream(question) as response:
async for chunk in response.stream_text(delta=True):
yield f"data: {chunk}\n\n"
yield "event: done\ndata: \n\n"
return StreamingResponse(sse_gen(), media_type="text/event-stream")
前端用 EventSource 消费:
const es = new EventSource('/chat?question=...');
es.onmessage = (e) => { output.innerText += e.data; };
es.addEventListener('done', () => es.close());
七、消息历史:多轮对话的核心
LLM 本身是无状态的——每次调用都是独立的。要做多轮对话,必须手动把"前几轮"传回去。Pydantic AI 的做法:
agent = Agent("openai:gpt-4o-mini", system_prompt="你是私人助手,记住用户的信息。")
# 第一轮
r1 = await agent.run("我叫小王,今年 28。")
print(r1.output)
# "你好小王,我记住啦。"
# 第二轮——把第一轮的消息传进去
r2 = await agent.run(
"我多大了?",
message_history=r1.all_messages(),
)
print(r2.output)
# "你 28 岁。"
# 第三轮
r3 = await agent.run(
"那明年呢?",
message_history=r2.all_messages(),
)
print(r3.output)
# "明年你就 29 岁。"
核心操作就一步:run(..., message_history=上一次的 all_messages())。
all_messages() vs new_messages()
八、持久化消息历史:Redis/DB 示例
Pydantic AI 提供了消息的 JSON 序列化帮手,可以直接落库:
from pydantic_ai.messages import ModelMessagesTypeAdapter
import json
# 序列化
msgs = r1.all_messages()
raw = ModelMessagesTypeAdapter.dump_json(msgs).decode()
await redis.set(f"conv:{session_id}", raw)
# 反序列化
raw = await redis.get(f"conv:{session_id}")
history = ModelMessagesTypeAdapter.validate_json(raw)
r2 = await agent.run("...", message_history=history)
这是聊天机器人最核心的状态管理模式——每个会话一个 Redis key,增量 append,超过 N 轮就摘要后压缩。
九、跨 Agent 共享历史
Agent A 和 Agent B 是两个不同 Agent(不同 prompt、不同工具),但都属于同一个对话,历史要延续——message_history 就是答案:
analyst = Agent("openai:gpt-4o", system_prompt="你是数据分析师")
writer = Agent("openai:gpt-4o", system_prompt="你是文案作家")
r1 = await analyst.run("分析一下这组数据...")
r2 = await writer.run("基于分析结果写段公众号文案", message_history=r1.new_messages())
new_messages() 而不是 all_messages()。
十、截断与窗口:历史长到爆炸怎么办
多轮对话 30 轮以后,消息列表可能几千个 token——又贵又慢。两种常见策略:
策略 A:固定窗口
只保留最近 N 条消息:
def trim(msgs, keep=20):
if len(msgs) <= keep:
return msgs
# 保留第一条(通常是 system)和最后 keep-1 条
return [msgs[0]] + msgs[-(keep - 1):]
trimmed = trim(history)
r = await agent.run("...", message_history=trimmed)
策略 B:摘要压缩
到达阈值时,让一个"摘要 Agent"把前若干轮压成一段 system prompt,历史从头清空:
summarizer = Agent(
"openai:gpt-4o-mini",
system_prompt="你是会议记录员,把对话浓缩成 100 字关键事实列表。",
output_type=str,
)
async def compact(history, threshold=30):
if len(history) < threshold:
return history
# 取前 80% 让摘要 Agent 压缩
old = history[:int(len(history) * 0.8)]
recent = history[int(len(history) * 0.8):]
s = await summarizer.run(
"对话历史:\n" + serialize(old),
)
# 构造一个虚拟 system 消息 + 保留近期
return [make_system_msg(f"之前的对话摘要: {s.output}")] + recent
十一、流式 + 历史组合:真正的聊天体验
这是最终形态——用 run_stream 吐流,结束后 all_messages() 存库,下一轮再拿出来:
async def chat_turn(session_id: str, user_msg: str):
history_raw = await redis.get(f"conv:{session_id}")
history = ModelMessagesTypeAdapter.validate_json(history_raw) if history_raw else []
async with agent.run_stream(user_msg, message_history=history) as response:
async for chunk in response.stream_text(delta=True):
yield chunk # 边传给前端边推
# 流结束后持久化新一轮 messages
new_history = response.all_messages()
await redis.set(
f"conv:{session_id}",
ModelMessagesTypeAdapter.dump_json(new_history),
ex=3600,
)
十二、八个常见坑
- run_stream 忘
async with:底层连接泄漏,高并发几十次就把 socket 耗尽。 - 流迭代中途抛异常:确保有
try/finally(或信任async with的上下文管理来关闭)。 - delta=True 但前端期望累积:两边语义对不上,显示错位。明确约定一方。
- 结构化流没做 debounce:partial 可能一秒几百次,前端卡死。
debounce_by=0.05起步。 - message_history 传了但 system prompt 也重算了:多段 system 互相干扰。同一 Agent 跨轮传 history 一般不会出问题(框架识别),但跨 Agent 时用
new_messages()更干净。 - 历史不截断:token 随轮数线性增长,成本飙升。一定要有 trim 或摘要策略。
- 把 history 塞进 system_prompt 字符串:自制拼接,丢失 tool call/tool return 的结构。用
message_history参数,框架会正确还原。 - 流式场景下校验失败重试:一次流被中断、重新开始——不是你记的字符数对不上。输出结构必须严的场景,建议非流式 + 前端伪装打字机。
十三、本章小结
①
run_stream 用 async with,stream_text(delta=True) 吐增量,stream_structured 吐部分结构化结果。
② 多轮对话 = 把上一次的
all_messages() 作为下一次 message_history。
③ 历史用
ModelMessagesTypeAdapter 序列化成 JSON 存 Redis/DB,下一次反序列化即可。
④ 长对话必须 trim 或摘要,token 线性增长是灾难。