Chapter 10

实战项目——AI 研究助手

综合运用本教程所有知识,构建一个多 Agent 协作的深度研究助手,支持 Web 搜索、PDF 解析、流式进度推送和本地模型集成。

项目架构设计

研究助手采用四 Agent 协作架构:入口 Agent 负责接收研究请求并协调,三个专家 Agent 分别负责搜索、分析和写作:

用户: "深度研究:2025年 AI Agent 技术趋势" │ ▼ ┌──────────────────────────────────────────────────────┐ │ 研究协调 Agent(Orchestrator) │ │ 职责:解析研究主题,分配搜索任务,最终整合报告 │ └────────────┬──────────────────┬──────────────────────┘ │ Handoff │ Handoff ┌────────▼───────┐ ┌───────▼────────┐ │ 搜索 Agent │ │ 分析 Agent │ │ 职责:Web搜索 │ │ 职责:内容摘要 │ │ 工具:Tavily │ │ 工具:向量检索 │ └────────────────┘ └────────────────┘ │ │ └─────────┬─────────┘ │ Handoff ┌────────▼────────┐ │ 写作 Agent │ │ 职责:生成报告 │ │ 结构化输出 │ └─────────────────┘

项目结构

research-assistant/
├── app.py              # FastAPI 主应用
├── agents/
│   ├── __init__.py
│   ├── orchestrator.py  # 研究协调 Agent
│   ├── search.py        # 搜索 Agent
│   ├── analyst.py       # 分析 Agent
│   └── writer.py        # 写作 Agent
├── tools/
│   ├── web_search.py    # Tavily 搜索工具
│   ├── pdf_parser.py    # PDF 解析工具
│   └── vector_search.py # 向量检索工具
├── models.py            # Pydantic 数据模型
└── requirements.txt

工具实现

# tools/web_search.py
from agents import function_tool, RunContextWrapper
from dataclasses import dataclass, field
import httpx

@dataclass
class ResearchContext:
    topic: str
    search_results: list[dict] = field(default_factory=list)
    analysis_notes: list[str] = field(default_factory=list)
    sources: list[str] = field(default_factory=list)

@function_tool
async def tavily_search(
    ctx: RunContextWrapper[ResearchContext],
    query: str,
    max_results: int = 5
) -> str:
    """使用 Tavily API 搜索最新信息。适合需要实时数据的研究任务。"""
    import os
    api_key = os.getenv("TAVILY_API_KEY")

    async with httpx.AsyncClient() as client:
        response = await client.post(
            "https://api.tavily.com/search",
            json={
                "api_key": api_key,
                "query": query,
                "max_results": max_results,
                "search_depth": "advanced",
                "include_answer": True
            },
            timeout=30
        )

    data = response.json()

    # 存储到 Context,供后续 Agent 使用
    for result in data.get("results", []):
        ctx.context.search_results.append(result)
        ctx.context.sources.append(result.get("url", ""))

    # 格式化返回给 LLM
    formatted = f"搜索查询:{query}\n"
    formatted += f"综合答案:{data.get('answer', '无')}\n\n"
    formatted += "详细结果:\n"
    for i, r in enumerate(data.get("results", [])[:max_results], 1):
        formatted += f"{i}. [{r.get('title')}]({r.get('url')})\n{r.get('content', '')[:300]}...\n\n"

    return formatted[:4000]  # 限制长度

@function_tool
async def parse_pdf(pdf_url: str) -> str:
    """下载并解析 PDF 文档,提取文字内容。适合学术论文和报告。"""
    import pdfplumber, io
    async with httpx.AsyncClient() as client:
        response = await client.get(pdf_url, timeout=30)
        response.raise_for_status()

    with pdfplumber.open(io.BytesIO(response.content)) as pdf:
        text = ""
        for page in pdf.pages[:10]:  # 最多读 10 页
            text += page.extract_text() or ""

    return text[:5000]  # 截取前 5000 字符

多 Agent 定义

# agents/__init__.py
from agents import Agent, handoff
from pydantic import BaseModel
from tools.web_search import tavily_search, parse_pdf

# ── 输出模型 ───────────────────────────────────────────────
class ResearchReport(BaseModel):
    title: str
    executive_summary: str
    key_findings: list[str]
    detailed_analysis: str
    sources: list[str]
    word_count: int

# ── 搜索 Agent ────────────────────────────────────────────
search_agent = Agent(
    name="搜索专家",
    instructions="""你是信息收集专家。
    接到研究主题后,拆解为 3-5 个关键搜索查询,
    使用 tavily_search 工具逐一搜索,
    遇到 PDF 链接时使用 parse_pdf 工具提取内容。
    完成搜索后,整理出结构化的原始资料摘要。
    """,
    model="gpt-4o-mini",
    tools=[tavily_search, parse_pdf]
)

# ── 分析 Agent ────────────────────────────────────────────
analyst_agent = Agent(
    name="分析专家",
    instructions="""你是数据分析专家。
    基于搜索专家收集的资料,进行深度分析:
    1. 识别核心趋势和关键数据点
    2. 发现不同来源之间的共识与分歧
    3. 评估信息的可靠性和时效性
    4. 提炼出 5-8 个关键发现
    输出结构化的分析笔记,供写作专家使用。
    """,
    model="gpt-4o"   # 分析任务用高质量模型
)

# ── 写作 Agent ────────────────────────────────────────────
writer_agent = Agent(
    name="写作专家",
    instructions="""你是专业研究报告写作专家。
    基于分析专家的笔记,生成高质量研究报告:
    - 执行摘要:200 字以内,点出核心价值
    - 关键发现:5-8 条,每条一句话
    - 详细分析:分节展开,使用数据支撑
    - 引用来源:列出所有参考链接

    报告风格:专业、客观、数据驱动
    """,
    model="gpt-4o",
    output_type=ResearchReport
)

# ── 研究协调 Agent(入口)──────────────────────────────────
research_orchestrator = Agent(
    name="研究协调员",
    instructions="""你是研究项目协调员。收到研究主题后:
    1. 首先转交给搜索专家收集信息
    2. 搜索完成后,转交分析专家进行深度分析
    3. 分析完成后,转交写作专家生成最终报告

    确保每个阶段都完整完成后再进行下一步。
    """,
    model="gpt-4o-mini",
    handoffs=[
        handoff(search_agent),
        handoff(analyst_agent),
        handoff(writer_agent),
    ]
)

FastAPI WebSocket 流式接口

# app.py
from fastapi import FastAPI, WebSocket
from agents import Runner
from agents.stream_events import RunItemStreamEvent, AgentUpdatedStreamEvent
from agents_module import research_orchestrator, ResearchReport, ResearchContext
import json

app = FastAPI(title="AI 研究助手")

@app.websocket("/research/stream")
async def research_websocket(websocket: WebSocket):
    await websocket.accept()

    try:
        # 接收研究主题
        data = await websocket.receive_json()
        topic = data.get("topic", "")

        if not topic:
            await websocket.send_json({"error": "请提供研究主题"})
            return

        # 初始化研究上下文
        ctx = ResearchContext(topic=topic)

        # 发送开始消息
        await websocket.send_json({
            "type": "start",
            "message": f"开始研究:{topic}"
        })

        # 流式运行研究工作流
        stream = Runner.run_streamed(
            research_orchestrator,
            f"请对以下主题进行深度研究并生成报告:{topic}",
            context=ctx
        )

        async for event in stream.stream_events():

            # Agent 切换:通知前端当前执行阶段
            if isinstance(event, AgentUpdatedStreamEvent):
                await websocket.send_json({
                    "type": "agent_switch",
                    "agent": event.new_agent.name,
                    "message": f"进入阶段:{event.new_agent.name}"
                })

            # 工具调用:通知前端正在搜索什么
            elif isinstance(event, RunItemStreamEvent):
                item = event.item
                if item.type == "tool_call_item":
                    tool_input = item.raw_item.arguments if hasattr(item.raw_item, "arguments") else ""
                    await websocket.send_json({
                        "type": "tool_call",
                        "tool": item.raw_item.name,
                        "message": f"调用工具:{item.raw_item.name}"
                    })

        # 发送最终报告
        final_output = stream.final_output
        if isinstance(final_output, ResearchReport):
            await websocket.send_json({
                "type": "complete",
                "report": final_output.model_dump()
            })

    except Exception as e:
        await websocket.send_json({"type": "error", "message": str(e)})
    finally:
        await websocket.close()

与 Ollama 集成:本地模型替代

# 使用 Ollama 运行本地模型(隐私数据场景)
# 前提:本地运行 ollama pull qwen2.5:7b
from openai import AsyncOpenAI
from agents import Agent, set_default_openai_client

# Ollama 兼容 OpenAI API 格式
ollama_client = AsyncOpenAI(
    base_url="http://localhost:11434/v1",
    api_key="ollama"  # Ollama 不需要真实 API Key
)

# 可以为不同 Agent 指定不同的客户端/模型
# 搜索 Agent 用 OpenAI(需要强推理),分析 Agent 用本地模型(保护数据)
local_analyst = Agent(
    name="本地分析专家",
    instructions="分析搜索结果,提炼关键信息。",
    model="qwen2.5:7b",  # Ollama 本地模型名称
    model_settings=ModelSettings(
        openai_client=ollama_client  # 指定使用 Ollama 客户端
    )
)

# 混合使用:公开数据用 OpenAI,私有数据用本地
# 这个架构在金融、医疗等合规场景非常实用

部署注意事项

Agent 超时设置
研究任务可能耗时较长(30s-3分钟),需要在 Nginx/Caddy 和 FastAPI 两层都设置足够的超时。Nginx 建议 proxy_read_timeout 300s;FastAPI WebSocket 连接不需要 timeout,但需要处理客户端断开(ConnectionClosedError)。
并发限制
每个研究任务可能调用 5-10 次 LLM,同时进行多个研究任务会快速消耗 OpenAI Rate Limit。建议使用 Redis 实现任务队列,限制同时运行的研究任务数(如最多 5 个)。
结果缓存
相同研究主题的结果可以缓存(Redis TTL 1-24小时)。缓存策略:对 topic 做 hash 作为 key,缓存整个 ResearchReport JSON。注意:时效性强的主题(当日新闻)不适合长时间缓存。
成本控制
一次完整研究约消耗 5,000-20,000 Token(gpt-4o),成本约 $0.05-0.25。建议为每个用户设置每日研究次数限制,并在 UI 中显示预估成本,让用户知情。

未来展望:SDK 路线图与 MCP 融合

OpenAI Agents SDK 路线图(2025 展望) ───────────────────────────────────────────────────────── 已实现(v0.x): ✓ 核心 Agent/Tool/Handoff/Runner/Guardrails ✓ 流式输出、内置追踪 ✓ 结构化输出(Pydantic) 规划中: → 原生 MCP 服务器支持 Agent 可以直接作为 MCP 工具暴露,供其他 AI 系统调用 → 持久化 Runs(断点续传) 长任务中断后可以从上次位置继续,不需要重新开始 → Agent 评估框架 内置 eval 套件,自动测量 Agent 的任务完成率和质量 → 多模态 Agent 直接处理图像、音频输入,Computer Use 集成 与 MCP 协议的融合: MCP(Model Context Protocol)是 Agent 工具集成的标准协议 openai-agents + MCP = 任何 MCP 工具服务都可以直接作为 Agent 工具 无需为每个平台写适配代码 已支持:Cursor MCP、Claude Desktop MCP 生态 ─────────────────────────────────────────────────────────
课程总结 恭喜完成 OpenAI Agents SDK 全部 10 章学习!核心要点回顾:Agent + Tool + Handoff 构成多 Agent 系统的三角基础;Runner 的三种模式(run/run_sync/run_streamed)适应不同场景;Context 是跨工具状态共享的安全容器;Guardrails 是生产系统的必备安全层;追踪系统让 Agent 行为可观测、可调试;异步并发和结构化输出是生产化的关键。祝你构建出优秀的 Agent 系统!