三种运行模式
Runner.run()(异步模式)
推荐用于生产环境。在 async 函数中使用 await,可与其他异步操作并发执行。适合 FastAPI、aiohttp 等异步 Web 框架。
Runner.run_sync()(同步模式)
适合脚本、CLI 工具、测试代码等同步场景。内部自动创建事件循环,不需要 asyncio.run() 包装。注意:不能在已有事件循环中调用(如 Jupyter Notebook 需要特殊处理)。
Runner.run_streamed()(流式模式)
返回流式结果,允许在 Agent 完整执行前就开始处理部分输出。适合需要实时反馈的 UI 场景(聊天界面、进度展示)。
from agents import Agent, Runner
import asyncio
agent = Agent(
name="助手",
instructions="简洁地回答问题。",
model="gpt-4o-mini"
)
# ── 模式 1:异步(推荐)──────────────────────────────────
async def async_demo():
result = await Runner.run(agent, "什么是协程?")
print(result.final_output)
# ── 模式 2:同步(脚本)──────────────────────────────────
def sync_demo():
result = Runner.run_sync(agent, "什么是协程?")
print(result.final_output)
# ── 模式 3:流式(实时反馈)──────────────────────────────
async def stream_demo():
result = Runner.run_streamed(agent, "解释量子纠缠")
async for event in result.stream_events():
# 只打印文字 delta,忽略其他事件
if hasattr(event, "delta") and hasattr(event.delta, "text"):
print(event.delta.text, end="", flush=True)
print() # 换行
asyncio.run(stream_demo())
流式事件类型详解
流式模式下,stream_events() 会产生不同类型的事件,每种事件代表 Agent 执行过程中的一个阶段:
from agents import Runner
from agents.stream_events import (
RawResponsesStreamEvent, # 原始 LLM 流(文字 delta)
RunItemStreamEvent, # Run 中的离散事件(工具调用完成等)
AgentUpdatedStreamEvent, # Agent 切换事件(Handoff 后触发)
)
async def detailed_stream_demo(agent, user_input: str):
result = Runner.run_streamed(agent, user_input)
async for event in result.stream_events():
# 1️⃣ 原始 LLM 响应流(每个文字 token)
if isinstance(event, RawResponsesStreamEvent):
data = event.data
if hasattr(data, "delta") and hasattr(data.delta, "text"):
print(data.delta.text, end="", flush=True)
# 2️⃣ Run 项事件(工具调用开始/完成、消息创建等)
elif isinstance(event, RunItemStreamEvent):
item = event.item
if item.type == "tool_call_item":
print(f"\n[调用工具: {item.raw_item.name}]")
elif item.type == "tool_call_output_item":
print(f[工具返回: {str(item.output)[:50]}...]")
# 3️⃣ Agent 切换事件(发生 Handoff 时)
elif isinstance(event, AgentUpdatedStreamEvent):
print(f"\n[切换到 Agent: {event.new_agent.name}]")
# 流完成后,仍然可以访问完整结果
print(f"\n\n最终回复:{result.final_output}")
多轮对话:维护消息历史
默认情况下,每次 Runner.run() 都是独立的对话。要实现多轮对话,需要手动维护消息历史:
from agents import Agent, Runner
import asyncio
agent = Agent(
name="对话助手",
instructions="你是一个有记忆的对话助手,记住对话历史并保持上下文一致性。",
model="gpt-4o-mini"
)
async def multi_turn_chat():
# 初始化消息历史(空列表 = 新对话)
message_history = []
questions = [
"我叫 Alice,我是一名 Python 开发者。",
"你还记得我的名字吗?我从事什么职业?",
"给我推荐一个适合我的 Python 库。"
]
for question in questions:
print(f"用户:{question}")
# 将当前消息历史作为输入(list 格式)
# 第一轮:message_history = [],直接传字符串
# 后续轮:message_history 包含之前的对话
if not message_history:
input_data = question
else:
# to_input_list() 将上次结果转为标准消息列表
# 然后追加新的用户消息
input_data = message_history + [{
"role": "user",
"content": question
}]
result = await Runner.run(agent, input_data)
print(f"助手:{result.final_output}\n")
# 更新消息历史:包含本次的问答
message_history = result.to_input_list()
asyncio.run(multi_turn_chat())
max_turns:安全阀机制
from agents import Runner, RunConfig
from agents.exceptions import MaxTurnsExceeded
async def safe_run(agent, user_input: str):
try:
result = await Runner.run(
agent,
user_input,
run_config=RunConfig(
max_turns=10, # 最多 10 轮 LLM 调用,防止无限循环
)
)
return result.final_output
except MaxTurnsExceeded:
return "抱歉,任务处理超时(超过最大轮次限制),请简化您的请求。"
实战:FastAPI SSE 流式聊天接口
将流式 Agent 集成到 FastAPI,实现 Server-Sent Events(SSE)实时推送:
# app.py — FastAPI + Agents SDK 流式聊天接口
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from agents import Agent, Runner
from agents.stream_events import RawResponsesStreamEvent
from pydantic import BaseModel
import json, asyncio
app = FastAPI()
# 创建 Agent(全局单例,复用连接池)
chat_agent = Agent(
name="聊天助手",
instructions="你是友善的 AI 助手,使用 Markdown 格式输出。",
model="gpt-4o-mini"
)
class ChatRequest(BaseModel):
message: str
history: list[dict] = [] # 消息历史
@app.post("/chat/stream")
async def chat_stream(req: ChatRequest):
async def generate():
# 构建输入(历史 + 新消息)
if req.history:
input_data = req.history + [{"role": "user", "content": req.message}]
else:
input_data = req.message
# 流式运行 Agent
stream_result = Runner.run_streamed(chat_agent, input_data)
async for event in stream_result.stream_events():
if isinstance(event, RawResponsesStreamEvent):
data = event.data
if hasattr(data, "delta") and hasattr(data.delta, "text"):
# SSE 格式:data: {json}\n\n
chunk = json.dumps({"delta": data.delta.text}, ensure_ascii=False)
yield f"data: {chunk}\n\n"
# 流结束信号
final = json.dumps({"done": True, "full": stream_result.final_output})
yield f"data: {final}\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
)
# 前端 JavaScript 消费 SSE:
# const es = new EventSource('/chat/stream', {method: 'POST', body: JSON.stringify({...})});
# es.onmessage = (e) => { const data = JSON.parse(e.data); ... };
生产部署提示
在 Nginx/Caddy 前代理 SSE 时,需要禁用代理缓冲(proxy_buffering off)。Gunicorn 建议使用 uvicorn workers(uvicorn.workers.UvicornWorker)运行 FastAPI,充分利用 asyncio 的并发能力。