Chapter 09

生产化与性能优化

从实验到生产,掌握异步并发、结构化输出、Token 优化与成本控制,让 Agent 系统在高负载下稳定运行。

异步并发:同时运行多个 Agent

asyncio.gather 可以并发运行多个 Agent,大幅提升吞吐量:

from agents import Agent, Runner
import asyncio

# ── 并发处理多个用户请求 ──────────────────────────────────
agent = Agent(
    name="助手",
    instructions="简洁地回答问题。",
    model="gpt-4o-mini"
)

async def process_batch(questions: list[str]) -> list[str]:
    """并发处理多个问题"""
    # asyncio.gather 同时发出所有请求,而非串行等待
    tasks = [Runner.run(agent, q) for q in questions]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    answers = []
    for q, r in zip(questions, results):
        if isinstance(r, Exception):
            answers.append(f"[错误] {type(r).__name__}: {r}")
        else:
            answers.append(r.final_output)
    return answers

async def main():
    questions = [
        "Python 的 asyncio 是什么?",
        "什么是 Docker?",
        "解释一下 REST API。",
        "什么是微服务架构?",
    ]

    # 串行:约 4 * 2s = 8s
    # 并发:约 2s(受速率限制)
    answers = await process_batch(questions)
    for q, a in zip(questions, answers):
        print(f"Q: {q}\nA: {a}\n")

asyncio.run(main())

并发限制:Semaphore 控制速率

import asyncio
from agents import Agent, Runner

# OpenAI 有 TPM(每分钟 Token)和 RPM(每分钟请求)限制
# 使用 Semaphore 控制并发数,避免触发 Rate Limit
MAX_CONCURRENT = 10  # 最多同时 10 个 Agent 请求
semaphore = asyncio.Semaphore(MAX_CONCURRENT)

async def rate_limited_run(agent, question: str) -> str:
    async with semaphore:  # 获取信号量,限制并发
        result = await Runner.run(agent, question)
        return result.final_output

async def batch_with_rate_limit(agent, questions: list[str]) -> list[str]:
    tasks = [rate_limited_run(agent, q) for q in questions]
    return await asyncio.gather(*tasks, return_exceptions=True)

结构化输出:强制返回 Pydantic 模型

from pydantic import BaseModel, Field
from agents import Agent, Runner
from typing import Literal
import asyncio

# ── 定义结构化输出模型 ─────────────────────────────────────
class SentimentAnalysis(BaseModel):
    sentiment: Literal["positive", "negative", "neutral"]
    confidence: float = Field(ge=0.0, le=1.0, description="置信度 0-1")
    key_phrases: list[str] = Field(description="关键短语列表(最多5个)")
    reason: str = Field(description="判断理由(一句话)")

class ExtractedEntity(BaseModel):
    name: str
    entity_type: Literal["person", "organization", "location", "date"]
    context: str

class DocumentAnalysis(BaseModel):
    summary: str
    sentiment: SentimentAnalysis
    entities: list[ExtractedEntity]
    main_topics: list[str]
    word_count_estimate: int

# ── Agent 设置结构化输出 ───────────────────────────────────
analysis_agent = Agent(
    name="文档分析师",
    instructions="""分析用户提供的文本,提取情感、实体和主题。
    严格按照要求的 JSON 格式输出,不要添加额外说明。
    实体列表最多 10 个,主题最多 5 个。
    """,
    model="gpt-4o",
    output_type=DocumentAnalysis  # 强制结构化输出
)

async def analyze(text: str) -> DocumentAnalysis:
    result = await Runner.run(analysis_agent, text)
    # final_output_as() 返回 Pydantic 模型实例,带类型检查
    analysis = result.final_output_as(DocumentAnalysis)
    return analysis

async def main():
    text = "OpenAI 于 2025 年 3 月在旧金山发布了全新的 Agents SDK,获得了开发者社区的热烈欢迎。"
    result = await analyze(text)
    print(f"情感:{result.sentiment.sentiment}({result.sentiment.confidence:.0%})")
    print(f"实体:{[e.name for e in result.entities]}")
    print(f"摘要:{result.summary}")

asyncio.run(main())

错误重试:指数退避

import asyncio
from openai import RateLimitError, APITimeoutError, APIConnectionError
from agents import Runner

async def run_with_retry(
    agent,
    user_input: str,
    max_retries: int = 3,
    base_delay: float = 1.0
) -> str:
    """带指数退避的重试机制"""
    retryable_errors = (RateLimitError, APITimeoutError, APIConnectionError)

    for attempt in range(max_retries + 1):
        try:
            result = await Runner.run(agent, user_input)
            return result.final_output

        except retryable_errors as e:
            if attempt == max_retries:
                raise  # 超过重试次数,向上抛出

            # 指数退避:1s → 2s → 4s → 8s(最大 60s)
            delay = min(base_delay * (2 ** attempt), 60)
            print(f"请求失败({type(e).__name__}),{delay}s 后重试...")
            await asyncio.sleep(delay)

        except Exception:
            raise  # 不可重试的错误,直接抛出

Token 优化技巧

Instructions 优化

  • 删除不必要的重复说明
  • 用列表代替长段落(更少 Token)
  • 不要在 instructions 里放示例对话
  • 仅列出 Agent 真正需要的工具
  • 使用简洁英文代替中文(英文 Token 更少)
  • 定期 review instructions,删掉失效规则

对话历史管理

  • 不要传入完整的历史(超过 20 轮时摘要)
  • 只传入与当前任务相关的历史消息
  • 工具返回值截断(最多 2000 字符)
  • 多轮对话中间结果存储到 Context,不放消息历史
  • 使用 Prompt Caching(重复前缀免费)
  • 批量处理时共享系统提示词缓存

成本估算与预算控制

# Token 成本估算(2025 年参考价格,实际以 OpenAI 账单为准)
COST_PER_1K_TOKENS = {
    "gpt-4o":       {"input": 0.0025,  "output": 0.010},   # $
    "gpt-4o-mini":  {"input": 0.00015, "output": 0.0006},
    "o3-mini":      {"input": 0.00110, "output": 0.0044},
}

def estimate_cost(
    model: str,
    input_tokens: int,
    output_tokens: int
) -> float:
    """估算单次 Run 的美元成本"""
    pricing = COST_PER_1K_TOKENS.get(model, {})
    cost = (
        input_tokens / 1000 * pricing.get("input", 0) +
        output_tokens / 1000 * pricing.get("output", 0)
    )
    return cost

# 典型场景成本估算:
# 简单问答(gpt-4o-mini):输入 500 + 输出 200 tokens ≈ $0.0002
# 代码审查(gpt-4o):输入 2000 + 输出 1000 tokens ≈ $0.015
# 研究报告(多轮, gpt-4o):输入 10000 + 输出 3000 tokens ≈ $0.055
# 每天 1000 次简单问答:$0.20 / 天

# 预算保护:设置最大 Token 输出
from agents.models import ModelSettings

budget_conscious_agent = Agent(
    name="节省成本助手",
    instructions="用最少的 Token 清晰地回答问题。回答不超过3句话。",
    model="gpt-4o-mini",
    model_settings=ModelSettings(
        max_tokens=500  # 硬性输出上限,防止长篇大论
    )
)
生产化检查清单 上线前确认:max_turns 已设置(防循环)、工具返回值有长度限制、已配置重试和超时、已设置护栏(输入+输出)、已配置追踪(至少结构级别)、已写异常处理(捕获 MaxTurnsExceeded 等)、已做负载测试、已估算每日成本并设置告警阈值。