异步并发:同时运行多个 Agent
asyncio.gather 可以并发运行多个 Agent,大幅提升吞吐量:
from agents import Agent, Runner
import asyncio
# ── 并发处理多个用户请求 ──────────────────────────────────
agent = Agent(
name="助手",
instructions="简洁地回答问题。",
model="gpt-4o-mini"
)
async def process_batch(questions: list[str]) -> list[str]:
"""并发处理多个问题"""
# asyncio.gather 同时发出所有请求,而非串行等待
tasks = [Runner.run(agent, q) for q in questions]
results = await asyncio.gather(*tasks, return_exceptions=True)
answers = []
for q, r in zip(questions, results):
if isinstance(r, Exception):
answers.append(f"[错误] {type(r).__name__}: {r}")
else:
answers.append(r.final_output)
return answers
async def main():
questions = [
"Python 的 asyncio 是什么?",
"什么是 Docker?",
"解释一下 REST API。",
"什么是微服务架构?",
]
# 串行:约 4 * 2s = 8s
# 并发:约 2s(受速率限制)
answers = await process_batch(questions)
for q, a in zip(questions, answers):
print(f"Q: {q}\nA: {a}\n")
asyncio.run(main())
并发限制:Semaphore 控制速率
import asyncio
from agents import Agent, Runner
# OpenAI 有 TPM(每分钟 Token)和 RPM(每分钟请求)限制
# 使用 Semaphore 控制并发数,避免触发 Rate Limit
MAX_CONCURRENT = 10 # 最多同时 10 个 Agent 请求
semaphore = asyncio.Semaphore(MAX_CONCURRENT)
async def rate_limited_run(agent, question: str) -> str:
async with semaphore: # 获取信号量,限制并发
result = await Runner.run(agent, question)
return result.final_output
async def batch_with_rate_limit(agent, questions: list[str]) -> list[str]:
tasks = [rate_limited_run(agent, q) for q in questions]
return await asyncio.gather(*tasks, return_exceptions=True)
结构化输出:强制返回 Pydantic 模型
from pydantic import BaseModel, Field
from agents import Agent, Runner
from typing import Literal
import asyncio
# ── 定义结构化输出模型 ─────────────────────────────────────
class SentimentAnalysis(BaseModel):
sentiment: Literal["positive", "negative", "neutral"]
confidence: float = Field(ge=0.0, le=1.0, description="置信度 0-1")
key_phrases: list[str] = Field(description="关键短语列表(最多5个)")
reason: str = Field(description="判断理由(一句话)")
class ExtractedEntity(BaseModel):
name: str
entity_type: Literal["person", "organization", "location", "date"]
context: str
class DocumentAnalysis(BaseModel):
summary: str
sentiment: SentimentAnalysis
entities: list[ExtractedEntity]
main_topics: list[str]
word_count_estimate: int
# ── Agent 设置结构化输出 ───────────────────────────────────
analysis_agent = Agent(
name="文档分析师",
instructions="""分析用户提供的文本,提取情感、实体和主题。
严格按照要求的 JSON 格式输出,不要添加额外说明。
实体列表最多 10 个,主题最多 5 个。
""",
model="gpt-4o",
output_type=DocumentAnalysis # 强制结构化输出
)
async def analyze(text: str) -> DocumentAnalysis:
result = await Runner.run(analysis_agent, text)
# final_output_as() 返回 Pydantic 模型实例,带类型检查
analysis = result.final_output_as(DocumentAnalysis)
return analysis
async def main():
text = "OpenAI 于 2025 年 3 月在旧金山发布了全新的 Agents SDK,获得了开发者社区的热烈欢迎。"
result = await analyze(text)
print(f"情感:{result.sentiment.sentiment}({result.sentiment.confidence:.0%})")
print(f"实体:{[e.name for e in result.entities]}")
print(f"摘要:{result.summary}")
asyncio.run(main())
错误重试:指数退避
import asyncio
from openai import RateLimitError, APITimeoutError, APIConnectionError
from agents import Runner
async def run_with_retry(
agent,
user_input: str,
max_retries: int = 3,
base_delay: float = 1.0
) -> str:
"""带指数退避的重试机制"""
retryable_errors = (RateLimitError, APITimeoutError, APIConnectionError)
for attempt in range(max_retries + 1):
try:
result = await Runner.run(agent, user_input)
return result.final_output
except retryable_errors as e:
if attempt == max_retries:
raise # 超过重试次数,向上抛出
# 指数退避:1s → 2s → 4s → 8s(最大 60s)
delay = min(base_delay * (2 ** attempt), 60)
print(f"请求失败({type(e).__name__}),{delay}s 后重试...")
await asyncio.sleep(delay)
except Exception:
raise # 不可重试的错误,直接抛出
Token 优化技巧
Instructions 优化
- 删除不必要的重复说明
- 用列表代替长段落(更少 Token)
- 不要在 instructions 里放示例对话
- 仅列出 Agent 真正需要的工具
- 使用简洁英文代替中文(英文 Token 更少)
- 定期 review instructions,删掉失效规则
对话历史管理
- 不要传入完整的历史(超过 20 轮时摘要)
- 只传入与当前任务相关的历史消息
- 工具返回值截断(最多 2000 字符)
- 多轮对话中间结果存储到 Context,不放消息历史
- 使用 Prompt Caching(重复前缀免费)
- 批量处理时共享系统提示词缓存
成本估算与预算控制
# Token 成本估算(2025 年参考价格,实际以 OpenAI 账单为准)
COST_PER_1K_TOKENS = {
"gpt-4o": {"input": 0.0025, "output": 0.010}, # $
"gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
"o3-mini": {"input": 0.00110, "output": 0.0044},
}
def estimate_cost(
model: str,
input_tokens: int,
output_tokens: int
) -> float:
"""估算单次 Run 的美元成本"""
pricing = COST_PER_1K_TOKENS.get(model, {})
cost = (
input_tokens / 1000 * pricing.get("input", 0) +
output_tokens / 1000 * pricing.get("output", 0)
)
return cost
# 典型场景成本估算:
# 简单问答(gpt-4o-mini):输入 500 + 输出 200 tokens ≈ $0.0002
# 代码审查(gpt-4o):输入 2000 + 输出 1000 tokens ≈ $0.015
# 研究报告(多轮, gpt-4o):输入 10000 + 输出 3000 tokens ≈ $0.055
# 每天 1000 次简单问答:$0.20 / 天
# 预算保护:设置最大 Token 输出
from agents.models import ModelSettings
budget_conscious_agent = Agent(
name="节省成本助手",
instructions="用最少的 Token 清晰地回答问题。回答不超过3句话。",
model="gpt-4o-mini",
model_settings=ModelSettings(
max_tokens=500 # 硬性输出上限,防止长篇大论
)
)
生产化检查清单
上线前确认:max_turns 已设置(防循环)、工具返回值有长度限制、已配置重试和超时、已设置护栏(输入+输出)、已配置追踪(至少结构级别)、已写异常处理(捕获 MaxTurnsExceeded 等)、已做负载测试、已估算每日成本并设置告警阈值。