Chapter 04

Runner 与 Run 生命周期

Runner 是驱动 Agent 执行的核心引擎。掌握三种运行模式、流式事件处理、多轮对话管理,构建响应式 AI 接口。

三种运行模式

Runner.run()(异步模式)
推荐用于生产环境。在 async 函数中使用 await,可与其他异步操作并发执行。适合 FastAPI、aiohttp 等异步 Web 框架。
Runner.run_sync()(同步模式)
适合脚本、CLI 工具、测试代码等同步场景。内部自动创建事件循环,不需要 asyncio.run() 包装。注意:不能在已有事件循环中调用(如 Jupyter Notebook 需要特殊处理)。
Runner.run_streamed()(流式模式)
返回流式结果,允许在 Agent 完整执行前就开始处理部分输出。适合需要实时反馈的 UI 场景(聊天界面、进度展示)。
from agents import Agent, Runner
import asyncio

agent = Agent(
    name="助手",
    instructions="简洁地回答问题。",
    model="gpt-4o-mini"
)

# ── 模式 1:异步(推荐)──────────────────────────────────
async def async_demo():
    result = await Runner.run(agent, "什么是协程?")
    print(result.final_output)

# ── 模式 2:同步(脚本)──────────────────────────────────
def sync_demo():
    result = Runner.run_sync(agent, "什么是协程?")
    print(result.final_output)

# ── 模式 3:流式(实时反馈)──────────────────────────────
async def stream_demo():
    result = Runner.run_streamed(agent, "解释量子纠缠")
    async for event in result.stream_events():
        # 只打印文字 delta,忽略其他事件
        if hasattr(event, "delta") and hasattr(event.delta, "text"):
            print(event.delta.text, end="", flush=True)
    print()  # 换行

asyncio.run(stream_demo())

流式事件类型详解

流式模式下,stream_events() 会产生不同类型的事件,每种事件代表 Agent 执行过程中的一个阶段:

from agents import Runner
from agents.stream_events import (
    RawResponsesStreamEvent,    # 原始 LLM 流(文字 delta)
    RunItemStreamEvent,         # Run 中的离散事件(工具调用完成等)
    AgentUpdatedStreamEvent,    # Agent 切换事件(Handoff 后触发)
)

async def detailed_stream_demo(agent, user_input: str):
    result = Runner.run_streamed(agent, user_input)

    async for event in result.stream_events():

        # 1️⃣ 原始 LLM 响应流(每个文字 token)
        if isinstance(event, RawResponsesStreamEvent):
            data = event.data
            if hasattr(data, "delta") and hasattr(data.delta, "text"):
                print(data.delta.text, end="", flush=True)

        # 2️⃣ Run 项事件(工具调用开始/完成、消息创建等)
        elif isinstance(event, RunItemStreamEvent):
            item = event.item
            if item.type == "tool_call_item":
                print(f"\n[调用工具: {item.raw_item.name}]")
            elif item.type == "tool_call_output_item":
                print(f[工具返回: {str(item.output)[:50]}...]")

        # 3️⃣ Agent 切换事件(发生 Handoff 时)
        elif isinstance(event, AgentUpdatedStreamEvent):
            print(f"\n[切换到 Agent: {event.new_agent.name}]")

    # 流完成后,仍然可以访问完整结果
    print(f"\n\n最终回复:{result.final_output}")

多轮对话:维护消息历史

默认情况下,每次 Runner.run() 都是独立的对话。要实现多轮对话,需要手动维护消息历史:

from agents import Agent, Runner
import asyncio

agent = Agent(
    name="对话助手",
    instructions="你是一个有记忆的对话助手,记住对话历史并保持上下文一致性。",
    model="gpt-4o-mini"
)

async def multi_turn_chat():
    # 初始化消息历史(空列表 = 新对话)
    message_history = []

    questions = [
        "我叫 Alice,我是一名 Python 开发者。",
        "你还记得我的名字吗?我从事什么职业?",
        "给我推荐一个适合我的 Python 库。"
    ]

    for question in questions:
        print(f"用户:{question}")

        # 将当前消息历史作为输入(list 格式)
        # 第一轮:message_history = [],直接传字符串
        # 后续轮:message_history 包含之前的对话
        if not message_history:
            input_data = question
        else:
            # to_input_list() 将上次结果转为标准消息列表
            # 然后追加新的用户消息
            input_data = message_history + [{
                "role": "user",
                "content": question
            }]

        result = await Runner.run(agent, input_data)
        print(f"助手:{result.final_output}\n")

        # 更新消息历史:包含本次的问答
        message_history = result.to_input_list()

asyncio.run(multi_turn_chat())

max_turns:安全阀机制

from agents import Runner, RunConfig
from agents.exceptions import MaxTurnsExceeded

async def safe_run(agent, user_input: str):
    try:
        result = await Runner.run(
            agent,
            user_input,
            run_config=RunConfig(
                max_turns=10,   # 最多 10 轮 LLM 调用,防止无限循环
            )
        )
        return result.final_output

    except MaxTurnsExceeded:
        return "抱歉,任务处理超时(超过最大轮次限制),请简化您的请求。"

实战:FastAPI SSE 流式聊天接口

将流式 Agent 集成到 FastAPI,实现 Server-Sent Events(SSE)实时推送:

# app.py — FastAPI + Agents SDK 流式聊天接口
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from agents import Agent, Runner
from agents.stream_events import RawResponsesStreamEvent
from pydantic import BaseModel
import json, asyncio

app = FastAPI()

# 创建 Agent(全局单例,复用连接池)
chat_agent = Agent(
    name="聊天助手",
    instructions="你是友善的 AI 助手,使用 Markdown 格式输出。",
    model="gpt-4o-mini"
)

class ChatRequest(BaseModel):
    message: str
    history: list[dict] = []  # 消息历史

@app.post("/chat/stream")
async def chat_stream(req: ChatRequest):
    async def generate():
        # 构建输入(历史 + 新消息)
        if req.history:
            input_data = req.history + [{"role": "user", "content": req.message}]
        else:
            input_data = req.message

        # 流式运行 Agent
        stream_result = Runner.run_streamed(chat_agent, input_data)

        async for event in stream_result.stream_events():
            if isinstance(event, RawResponsesStreamEvent):
                data = event.data
                if hasattr(data, "delta") and hasattr(data.delta, "text"):
                    # SSE 格式:data: {json}\n\n
                    chunk = json.dumps({"delta": data.delta.text}, ensure_ascii=False)
                    yield f"data: {chunk}\n\n"

        # 流结束信号
        final = json.dumps({"done": True, "full": stream_result.final_output})
        yield f"data: {final}\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
    )

# 前端 JavaScript 消费 SSE:
# const es = new EventSource('/chat/stream', {method: 'POST', body: JSON.stringify({...})});
# es.onmessage = (e) => { const data = JSON.parse(e.data); ... };
生产部署提示 在 Nginx/Caddy 前代理 SSE 时,需要禁用代理缓冲(proxy_buffering off)。Gunicorn 建议使用 uvicorn workers(uvicorn.workers.UvicornWorker)运行 FastAPI,充分利用 asyncio 的并发能力。