项目架构设计
研究助手采用四 Agent 协作架构:入口 Agent 负责接收研究请求并协调,三个专家 Agent 分别负责搜索、分析和写作:
用户: "深度研究:2025年 AI Agent 技术趋势"
│
▼
┌──────────────────────────────────────────────────────┐
│ 研究协调 Agent(Orchestrator) │
│ 职责:解析研究主题,分配搜索任务,最终整合报告 │
└────────────┬──────────────────┬──────────────────────┘
│ Handoff │ Handoff
┌────────▼───────┐ ┌───────▼────────┐
│ 搜索 Agent │ │ 分析 Agent │
│ 职责:Web搜索 │ │ 职责:内容摘要 │
│ 工具:Tavily │ │ 工具:向量检索 │
└────────────────┘ └────────────────┘
│ │
└─────────┬─────────┘
│ Handoff
┌────────▼────────┐
│ 写作 Agent │
│ 职责:生成报告 │
│ 结构化输出 │
└─────────────────┘
项目结构
research-assistant/
├── app.py # FastAPI 主应用
├── agents/
│ ├── __init__.py
│ ├── orchestrator.py # 研究协调 Agent
│ ├── search.py # 搜索 Agent
│ ├── analyst.py # 分析 Agent
│ └── writer.py # 写作 Agent
├── tools/
│ ├── web_search.py # Tavily 搜索工具
│ ├── pdf_parser.py # PDF 解析工具
│ └── vector_search.py # 向量检索工具
├── models.py # Pydantic 数据模型
└── requirements.txt
工具实现
# tools/web_search.py
from agents import function_tool, RunContextWrapper
from dataclasses import dataclass, field
import httpx
@dataclass
class ResearchContext:
topic: str
search_results: list[dict] = field(default_factory=list)
analysis_notes: list[str] = field(default_factory=list)
sources: list[str] = field(default_factory=list)
@function_tool
async def tavily_search(
ctx: RunContextWrapper[ResearchContext],
query: str,
max_results: int = 5
) -> str:
"""使用 Tavily API 搜索最新信息。适合需要实时数据的研究任务。"""
import os
api_key = os.getenv("TAVILY_API_KEY")
async with httpx.AsyncClient() as client:
response = await client.post(
"https://api.tavily.com/search",
json={
"api_key": api_key,
"query": query,
"max_results": max_results,
"search_depth": "advanced",
"include_answer": True
},
timeout=30
)
data = response.json()
# 存储到 Context,供后续 Agent 使用
for result in data.get("results", []):
ctx.context.search_results.append(result)
ctx.context.sources.append(result.get("url", ""))
# 格式化返回给 LLM
formatted = f"搜索查询:{query}\n"
formatted += f"综合答案:{data.get('answer', '无')}\n\n"
formatted += "详细结果:\n"
for i, r in enumerate(data.get("results", [])[:max_results], 1):
formatted += f"{i}. [{r.get('title')}]({r.get('url')})\n{r.get('content', '')[:300]}...\n\n"
return formatted[:4000] # 限制长度
@function_tool
async def parse_pdf(pdf_url: str) -> str:
"""下载并解析 PDF 文档,提取文字内容。适合学术论文和报告。"""
import pdfplumber, io
async with httpx.AsyncClient() as client:
response = await client.get(pdf_url, timeout=30)
response.raise_for_status()
with pdfplumber.open(io.BytesIO(response.content)) as pdf:
text = ""
for page in pdf.pages[:10]: # 最多读 10 页
text += page.extract_text() or ""
return text[:5000] # 截取前 5000 字符
多 Agent 定义
# agents/__init__.py
from agents import Agent, handoff
from pydantic import BaseModel
from tools.web_search import tavily_search, parse_pdf
# ── 输出模型 ───────────────────────────────────────────────
class ResearchReport(BaseModel):
title: str
executive_summary: str
key_findings: list[str]
detailed_analysis: str
sources: list[str]
word_count: int
# ── 搜索 Agent ────────────────────────────────────────────
search_agent = Agent(
name="搜索专家",
instructions="""你是信息收集专家。
接到研究主题后,拆解为 3-5 个关键搜索查询,
使用 tavily_search 工具逐一搜索,
遇到 PDF 链接时使用 parse_pdf 工具提取内容。
完成搜索后,整理出结构化的原始资料摘要。
""",
model="gpt-4o-mini",
tools=[tavily_search, parse_pdf]
)
# ── 分析 Agent ────────────────────────────────────────────
analyst_agent = Agent(
name="分析专家",
instructions="""你是数据分析专家。
基于搜索专家收集的资料,进行深度分析:
1. 识别核心趋势和关键数据点
2. 发现不同来源之间的共识与分歧
3. 评估信息的可靠性和时效性
4. 提炼出 5-8 个关键发现
输出结构化的分析笔记,供写作专家使用。
""",
model="gpt-4o" # 分析任务用高质量模型
)
# ── 写作 Agent ────────────────────────────────────────────
writer_agent = Agent(
name="写作专家",
instructions="""你是专业研究报告写作专家。
基于分析专家的笔记,生成高质量研究报告:
- 执行摘要:200 字以内,点出核心价值
- 关键发现:5-8 条,每条一句话
- 详细分析:分节展开,使用数据支撑
- 引用来源:列出所有参考链接
报告风格:专业、客观、数据驱动
""",
model="gpt-4o",
output_type=ResearchReport
)
# ── 研究协调 Agent(入口)──────────────────────────────────
research_orchestrator = Agent(
name="研究协调员",
instructions="""你是研究项目协调员。收到研究主题后:
1. 首先转交给搜索专家收集信息
2. 搜索完成后,转交分析专家进行深度分析
3. 分析完成后,转交写作专家生成最终报告
确保每个阶段都完整完成后再进行下一步。
""",
model="gpt-4o-mini",
handoffs=[
handoff(search_agent),
handoff(analyst_agent),
handoff(writer_agent),
]
)
FastAPI WebSocket 流式接口
# app.py
from fastapi import FastAPI, WebSocket
from agents import Runner
from agents.stream_events import RunItemStreamEvent, AgentUpdatedStreamEvent
from agents_module import research_orchestrator, ResearchReport, ResearchContext
import json
app = FastAPI(title="AI 研究助手")
@app.websocket("/research/stream")
async def research_websocket(websocket: WebSocket):
await websocket.accept()
try:
# 接收研究主题
data = await websocket.receive_json()
topic = data.get("topic", "")
if not topic:
await websocket.send_json({"error": "请提供研究主题"})
return
# 初始化研究上下文
ctx = ResearchContext(topic=topic)
# 发送开始消息
await websocket.send_json({
"type": "start",
"message": f"开始研究:{topic}"
})
# 流式运行研究工作流
stream = Runner.run_streamed(
research_orchestrator,
f"请对以下主题进行深度研究并生成报告:{topic}",
context=ctx
)
async for event in stream.stream_events():
# Agent 切换:通知前端当前执行阶段
if isinstance(event, AgentUpdatedStreamEvent):
await websocket.send_json({
"type": "agent_switch",
"agent": event.new_agent.name,
"message": f"进入阶段:{event.new_agent.name}"
})
# 工具调用:通知前端正在搜索什么
elif isinstance(event, RunItemStreamEvent):
item = event.item
if item.type == "tool_call_item":
tool_input = item.raw_item.arguments if hasattr(item.raw_item, "arguments") else ""
await websocket.send_json({
"type": "tool_call",
"tool": item.raw_item.name,
"message": f"调用工具:{item.raw_item.name}"
})
# 发送最终报告
final_output = stream.final_output
if isinstance(final_output, ResearchReport):
await websocket.send_json({
"type": "complete",
"report": final_output.model_dump()
})
except Exception as e:
await websocket.send_json({"type": "error", "message": str(e)})
finally:
await websocket.close()
与 Ollama 集成:本地模型替代
# 使用 Ollama 运行本地模型(隐私数据场景)
# 前提:本地运行 ollama pull qwen2.5:7b
from openai import AsyncOpenAI
from agents import Agent, set_default_openai_client
# Ollama 兼容 OpenAI API 格式
ollama_client = AsyncOpenAI(
base_url="http://localhost:11434/v1",
api_key="ollama" # Ollama 不需要真实 API Key
)
# 可以为不同 Agent 指定不同的客户端/模型
# 搜索 Agent 用 OpenAI(需要强推理),分析 Agent 用本地模型(保护数据)
local_analyst = Agent(
name="本地分析专家",
instructions="分析搜索结果,提炼关键信息。",
model="qwen2.5:7b", # Ollama 本地模型名称
model_settings=ModelSettings(
openai_client=ollama_client # 指定使用 Ollama 客户端
)
)
# 混合使用:公开数据用 OpenAI,私有数据用本地
# 这个架构在金融、医疗等合规场景非常实用
部署注意事项
Agent 超时设置
研究任务可能耗时较长(30s-3分钟),需要在 Nginx/Caddy 和 FastAPI 两层都设置足够的超时。Nginx 建议 proxy_read_timeout 300s;FastAPI WebSocket 连接不需要 timeout,但需要处理客户端断开(ConnectionClosedError)。
并发限制
每个研究任务可能调用 5-10 次 LLM,同时进行多个研究任务会快速消耗 OpenAI Rate Limit。建议使用 Redis 实现任务队列,限制同时运行的研究任务数(如最多 5 个)。
结果缓存
相同研究主题的结果可以缓存(Redis TTL 1-24小时)。缓存策略:对 topic 做 hash 作为 key,缓存整个 ResearchReport JSON。注意:时效性强的主题(当日新闻)不适合长时间缓存。
成本控制
一次完整研究约消耗 5,000-20,000 Token(gpt-4o),成本约 $0.05-0.25。建议为每个用户设置每日研究次数限制,并在 UI 中显示预估成本,让用户知情。
未来展望:SDK 路线图与 MCP 融合
OpenAI Agents SDK 路线图(2025 展望)
─────────────────────────────────────────────────────────
已实现(v0.x):
✓ 核心 Agent/Tool/Handoff/Runner/Guardrails
✓ 流式输出、内置追踪
✓ 结构化输出(Pydantic)
规划中:
→ 原生 MCP 服务器支持
Agent 可以直接作为 MCP 工具暴露,供其他 AI 系统调用
→ 持久化 Runs(断点续传)
长任务中断后可以从上次位置继续,不需要重新开始
→ Agent 评估框架
内置 eval 套件,自动测量 Agent 的任务完成率和质量
→ 多模态 Agent
直接处理图像、音频输入,Computer Use 集成
与 MCP 协议的融合:
MCP(Model Context Protocol)是 Agent 工具集成的标准协议
openai-agents + MCP = 任何 MCP 工具服务都可以直接作为 Agent 工具
无需为每个平台写适配代码
已支持:Cursor MCP、Claude Desktop MCP 生态
─────────────────────────────────────────────────────────
课程总结
恭喜完成 OpenAI Agents SDK 全部 10 章学习!核心要点回顾:Agent + Tool + Handoff 构成多 Agent 系统的三角基础;Runner 的三种模式(run/run_sync/run_streamed)适应不同场景;Context 是跨工具状态共享的安全容器;Guardrails 是生产系统的必备安全层;追踪系统让 Agent 行为可观测、可调试;异步并发和结构化输出是生产化的关键。祝你构建出优秀的 Agent 系统!