Chapter 09

多步任务规划与执行框架

让 Agent 像人类一样先思考再行动,从容应对复杂的长链任务

任务分解策略

为什么需要任务分解?

直接把复杂任务交给 Computer Use Agent,往往会遭遇以下困境:Claude 不知道从哪里开始、中途遗忘了最终目标、某一步失败后不知道如何恢复、Token 消耗在无效的截图和尝试上。任务分解(Task Decomposition)是解决这些问题的核心方法。

层级分解(Hierarchical Decomposition)
将大任务分解为子任务,子任务再分解为操作步骤。例如"月度报表自动化"→"数据采集"+"数据处理"+"报表生成"+"邮件分发"→每个子任务拆解为具体操作序列。
里程碑式分解(Milestone-based)
设定若干检查点(Milestone),每完成一个检查点就截图确认状态,决定是否继续。适合有风险的操作序列,每个检查点都是可以暂停和人工干预的时机。
并行分解(Parallel Decomposition)
识别可以并行执行的子任务,分配给多个 Agent 实例同时处理,最后汇总结果。适合批量处理场景(如同时处理 100 张订单)。
依赖图(Dependency Graph)
识别任务间的依赖关系,构建 DAG(有向无环图),按拓扑顺序执行。避免因为顺序错误导致任务失败。

任务分解 ASCII 示意图

原始任务:处理月度财务报表

层级分解:
月度报表自动化
├── 阶段1:数据采集
│   ├── 步骤1.1 打开 ERP 系统
│   ├── 步骤1.2 导出销售数据 (CSV)
│   └── 步骤1.3 导出成本数据 (CSV)
│   [里程碑1: 确认两个 CSV 文件已下载]
│
├── 阶段2:数据处理
│   ├── 步骤2.1 打开 Excel,导入 CSV
│   ├── 步骤2.2 执行数据清洗宏
│   └── 步骤2.3 计算利润率
│   [里程碑2: 确认数据表格完整]
│
├── 阶段3:报表生成
│   ├── 步骤3.1 套用报表模板
│   ├── 步骤3.2 生成图表
│   └── 步骤3.3 导出 PDF
│   [里程碑3: 确认 PDF 生成无误]
│
└── 阶段4:分发
    ├── 步骤4.1 打开邮件客户端
    ├── 步骤4.2 添加收件人
    └── 步骤4.3 发送
    [里程碑4: 确认邮件已发送]

思维链规划(先规划再执行)

两阶段执行模式

让 Claude 在执行操作之前先输出一个明确的计划,是提升复杂任务成功率的最有效手段之一。实现方式是在系统提示词中明确要求分两个阶段:

PLANNING_SYSTEM_PROMPT = """你是一个精确执行计算机任务的 AI 助手。

在开始执行任何任务之前,你必须:
1. 先输出一个详细的执行计划,格式如下:
   ## 执行计划
   - 目标:[用一句话描述最终目标]
   - 前提条件:[执行前需要确认的条件]
   - 步骤:
     1. [步骤1] - 预计结果:[...]
     2. [步骤2] - 预计结果:[...]
     ...
   - 风险点:[可能失败的环节]
   - 回滚方案:[如果失败如何恢复]

2. 计划输出完成后,用如下标记表示开始执行:
   ## 开始执行

3. 执行过程中,每完成一个主要步骤,输出:
   ✓ [步骤N] 完成,截图确认...

4. 遇到意外情况,立即停止并说明:
   ⚠️ 异常:[描述问题],等待指示

注意:不要跳过规划阶段直接开始操作。"""


async def run_planned_agent(
    client: anthropic.Anthropic,
    task: str,
    tools: list,
    computer_controller,
    require_plan_approval: bool = True
) -> str:
    """带规划阶段的 Agent 执行器"""
    messages = [{"role": "user", "content": task}]

    # 第一轮:获取计划
    plan_response = client.beta.messages.create(
        model="claude-opus-4-5",
        max_tokens=2048,
        system=PLANNING_SYSTEM_PROMPT,
        tools=tools,
        messages=messages,
        betas=["computer-use-2024-10-22"]
    )

    # 提取计划文本
    plan_text = "".join(
        b.text for b in plan_response.content if b.type == "text"
    )

    print("=== 执行计划 ===")
    print(plan_text)

    if require_plan_approval:
        approval = input("\n确认执行此计划?(yes/no): ")
        if approval.lower() != "yes":
            return "计划被用户取消"

    # 继续执行阶段
    messages.append({"role": "assistant", "content": plan_response.content})
    messages.append({
        "role": "user",
        "content": "计划已确认,请按计划开始执行。"
    })

    # 进入正常执行循环
    return await run_execution_loop(client, messages, tools, computer_controller)

里程碑检查点

带检查点的任务状态机

from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime
import json


class MilestoneStatus(Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
    SKIPPED = "skipped"


@dataclass
class Milestone:
    id: str
    name: str
    description: str
    status: MilestoneStatus = MilestoneStatus.PENDING
    started_at: Optional[str] = None
    completed_at: Optional[str] = None
    screenshot_b64: Optional[str] = None  # 里程碑截图证据
    notes: list[str] = field(default_factory=list)


class MilestoneTracker:
    """任务里程碑追踪器"""

    def __init__(self, task_id: str, milestones: list[Milestone]):
        self.task_id = task_id
        self.milestones = {m.id: m for m in milestones}
        self._order = [m.id for m in milestones]

    def start(self, milestone_id: str):
        m = self.milestones[milestone_id]
        m.status = MilestoneStatus.IN_PROGRESS
        m.started_at = datetime.utcnow().isoformat()

    def complete(self, milestone_id: str, screenshot_b64: str = None, notes: str = None):
        m = self.milestones[milestone_id]
        m.status = MilestoneStatus.COMPLETED
        m.completed_at = datetime.utcnow().isoformat()
        if screenshot_b64:
            m.screenshot_b64 = screenshot_b64
        if notes:
            m.notes.append(notes)

    def fail(self, milestone_id: str, reason: str):
        m = self.milestones[milestone_id]
        m.status = MilestoneStatus.FAILED
        m.notes.append(f"失败原因: {reason}")

    def current_milestone(self) -> Optional[Milestone]:
        for mid in self._order:
            m = self.milestones[mid]
            if m.status in (MilestoneStatus.PENDING, MilestoneStatus.IN_PROGRESS):
                return m
        return None

    def progress_summary(self) -> str:
        completed = sum(1 for m in self.milestones.values()
                        if m.status == MilestoneStatus.COMPLETED)
        total = len(self.milestones)
        lines = [f"任务进度: {completed}/{total}"]
        for mid in self._order:
            m = self.milestones[mid]
            icon = {
                MilestoneStatus.PENDING: "○",
                MilestoneStatus.IN_PROGRESS: "●",
                MilestoneStatus.COMPLETED: "✓",
                MilestoneStatus.FAILED: "✗",
                MilestoneStatus.SKIPPED: "—",
            }[m.status]
            lines.append(f"  {icon} {m.name}")
        return "\n".join(lines)

    def save_state(self, path: str):
        """保存任务状态到文件(用于断点续传)"""
        state = {
            "task_id": self.task_id,
            "milestones": [
                {
                    "id": m.id, "name": m.name,
                    "status": m.status.value,
                    "completed_at": m.completed_at,
                    "notes": m.notes
                }
                for m in self.milestones.values()
            ]
        }
        with open(path, "w") as f:
            json.dump(state, f, ensure_ascii=False, indent=2)

失败恢复机制

Retry / Fallback / Ask Human 三级策略

from enum import Enum


class RecoveryStrategy(Enum):
    RETRY = "retry"           # 重试当前步骤
    RETRY_DIFFERENT = "retry_different"  # 换一种方式重试
    FALLBACK = "fallback"     # 用备用方案
    ASK_HUMAN = "ask_human"   # 请求人工介入
    ABORT = "abort"           # 中止任务


class FailureRecoveryEngine:
    """任务失败恢复引擎"""

    # 不同错误类型对应的恢复策略
    ERROR_STRATEGIES = {
        "screenshot_failed": (RecoveryStrategy.RETRY, 3),
        "element_not_found": (RecoveryStrategy.RETRY_DIFFERENT, 2),
        "page_load_timeout": (RecoveryStrategy.RETRY, 2),
        "captcha_detected": (RecoveryStrategy.ASK_HUMAN, 1),
        "permission_denied": (RecoveryStrategy.ASK_HUMAN, 1),
        "data_validation_failed": (RecoveryStrategy.ASK_HUMAN, 1),
        "network_error": (RecoveryStrategy.RETRY, 3),
    }

    def __init__(self):
        self._attempt_counts: dict[str, int] = {}

    async def handle_failure(
        self,
        error_type: str,
        context: dict,
        human_callback=None
    ) -> dict:
        """处理失败并返回恢复指令"""
        strategy, max_attempts = self.ERROR_STRATEGIES.get(
            error_type,
            (RecoveryStrategy.ASK_HUMAN, 1)
        )
        key = f"{error_type}:{context.get('step', 'unknown')}"
        self._attempt_counts[key] = self._attempt_counts.get(key, 0) + 1

        if self._attempt_counts[key] > max_attempts:
            # 超过重试次数,升级到人工介入
            strategy = RecoveryStrategy.ASK_HUMAN

        if strategy == RecoveryStrategy.RETRY:
            import asyncio
            await asyncio.sleep(1.0 * self._attempt_counts[key])
            return {"action": "retry", "message": "请重试上一个步骤"}

        elif strategy == RecoveryStrategy.RETRY_DIFFERENT:
            return {
                "action": "retry_different",
                "message": f"上一步失败({error_type}),请尝试其他方法完成相同目标。考虑使用不同的选择器、坐标或操作方式。"
            }

        elif strategy == RecoveryStrategy.ASK_HUMAN:
            if human_callback:
                guidance = await human_callback(error_type, context)
                return {"action": "continue_with_guidance", "guidance": guidance}
            return {"action": "pause", "message": f"任务暂停:{error_type},需要人工处理"}

        else:
            return {"action": "abort", "message": "任务已中止"}

Messages 历史管理

避免 Context 溢出的策略

长时间运行的 Agent 会积累大量消息历史,最终超出 Claude 的上下文窗口限制(200K tokens)。以下是几种常见的历史管理策略:

import anthropic
from typing import Literal


class MessageHistoryManager:
    """Messages 历史管理器,防止 Context 溢出"""

    def __init__(
        self,
        max_messages: int = 50,
        strategy: Literal["sliding_window", "summarize", "milestone_reset"] = "sliding_window"
    ):
        self.max_messages = max_messages
        self.strategy = strategy
        self._messages: list[dict] = []
        self._milestone_summaries: list[str] = []

    def add(self, message: dict):
        self._messages.append(message)
        if len(self._messages) > self.max_messages:
            self._trim()

    def _trim(self):
        if self.strategy == "sliding_window":
            # 保留第1条(任务描述)和最近 N 条
            keep = self.max_messages // 2
            self._messages = [self._messages[0]] + self._messages[-keep:]

        elif self.strategy == "milestone_reset":
            # 达到里程碑时压缩历史为摘要
            recent_content = "".join(
                msg.get("content", "") if isinstance(msg.get("content"), str) else ""
                for msg in self._messages[1:]
            )
            summary = f"[已完成步骤摘要] 已成功执行 {len(self._messages)-1} 步操作。最后状态: {recent_content[-200:]}"
            self._milestone_summaries.append(summary)
            # 保留任务描述 + 摘要 + 最近几条
            self._messages = [
                self._messages[0],
                {"role": "user", "content": "\n".join(self._milestone_summaries)},
            ] + self._messages[-10:]

    @property
    def messages(self) -> list[dict]:
        return self._messages

    def estimate_tokens(self) -> int:
        """粗略估算消息历史的 token 数量"""
        total_chars = sum(
            len(str(msg.get("content", "")))
            for msg in self._messages
        )
        return total_chars // 4  # 粗略估算:4字符≈1token

    def should_trim(self, threshold_tokens: int = 150000) -> bool:
        return self.estimate_tokens() > threshold_tokens

状态跟踪

持久化 Agent 执行状态

from dataclasses import dataclass, field, asdict
import json
import os


@dataclass
class AgentState:
    """Agent 运行时状态,支持持久化和恢复"""
    task_id: str
    original_task: str
    current_step: int = 0
    completed_steps: list[str] = field(default_factory=list)
    failed_steps: list[dict] = field(default_factory=list)
    context_vars: dict = field(default_factory=dict)  # 步骤间传递的变量
    status: str = "running"  # running/completed/failed/paused
    created_at: str = ""
    updated_at: str = ""
    token_used: int = 0


class AgentStateManager:
    """Agent 状态持久化管理器"""

    def __init__(self, state_dir: str = "/tmp/agent_states"):
        self.state_dir = state_dir
        os.makedirs(state_dir, exist_ok=True)

    def save(self, state: AgentState):
        state.updated_at = datetime.utcnow().isoformat()
        path = os.path.join(self.state_dir, f"{state.task_id}.json")
        with open(path, "w") as f:
            json.dump(asdict(state), f, ensure_ascii=False, indent=2)

    def load(self, task_id: str) -> Optional[AgentState]:
        path = os.path.join(self.state_dir, f"{task_id}.json")
        if not os.path.exists(path):
            return None
        with open(path) as f:
            data = json.load(f)
        return AgentState(**data)

    def set_context_var(self, state: AgentState, key: str, value):
        """在步骤间传递变量(如上一步获取的文件路径)"""
        state.context_vars[key] = value
        self.save(state)

并行 Agent 协作

多 Agent 分工并行执行

import asyncio
from dataclasses import dataclass


@dataclass
class AgentTask:
    agent_id: str
    task: str
    tools: list[dict]
    dependencies: list[str] = field(default_factory=list)  # 依赖的 agent_id


class MultiAgentOrchestrator:
    """多 Agent 协调器,支持并行和依赖管理"""

    def __init__(self, client: anthropic.Anthropic):
        self.client = client
        self._results: dict[str, str] = {}

    async def run_agent(self, agent_task: AgentTask) -> str:
        """执行单个 Agent 任务,将上游依赖结果注入 context"""
        # 等待所有依赖完成
        dep_results = {}
        for dep_id in agent_task.dependencies:
            while dep_id not in self._results:
                await asyncio.sleep(0.1)
            dep_results[dep_id] = self._results[dep_id]

        # 构建包含依赖结果的任务
        task_with_context = agent_task.task
        if dep_results:
            context_str = "\n".join(
                f"[来自 Agent {k} 的结果]\n{v}"
                for k, v in dep_results.items()
            )
            task_with_context = f"{task_with_context}\n\n上游结果:\n{context_str}"

        result = await run_computer_use_agent(
            self.client, agent_task.tools, task_with_context
        )
        self._results[agent_task.agent_id] = result
        return result

    async def run_all(self, tasks: list[AgentTask]) -> dict[str, str]:
        """并行执行所有任务,自动处理依赖顺序"""
        coroutines = [self.run_agent(t) for t in tasks]
        await asyncio.gather(*coroutines)
        return self._results


# 使用示例:并行处理多个用户报表
async def batch_report_generation(user_ids: list[str]):
    orchestrator = MultiAgentOrchestrator(client)
    tasks = [
        AgentTask(
            agent_id=f"user_{uid}",
            task=f"生成用户 {uid} 的月度报表并保存到 /output/{uid}_report.pdf",
            tools=COMPUTER_USE_TOOLS
        )
        for uid in user_ids
    ]
    results = await orchestrator.run_all(tasks)
    return results
并行 Agent 的资源竞争

多个 Agent 并行运行时,需要注意资源竞争问题:如果多个 Agent 都操作同一个浏览器实例,会相互干扰。建议为每个并行 Agent 分配独立的 Docker 容器或虚拟桌面实例,确保环境完全隔离。成本方面,N 个并行 Agent 意味着 N 倍的 API 调用费用,需要权衡。

规划与执行分离的价值

将"规划"(Planner)和"执行"(Executor)分离为两个独立的 Claude 调用,是一种高级架构模式。Planner 使用完整的上下文和工具列表生成执行计划(可以是 JSON 格式的步骤列表);Executor 专注执行单个步骤,只拿到当前步骤的描述和前置结果。这样的好处是:单步执行更专注、更少受历史干扰,整体任务成功率更高,且每步的 Token 消耗更低。

任务分解的深层原理

为什么 Agent 需要外部框架来维持状态?

理解这一点需要先理解 LLM 的无状态本质:每次 API 调用都是独立的,Claude 只能看到当前 messages 数组中的内容。没有外部状态管理,长链任务会面临以下根本性困难:

上下文窗口有限(Context Window Limit)
claude-opus-4-5 的上下文窗口为 200K tokens,看似很大,但每次截图消耗 600-1500 tokens,一个 20 步的任务如果每步截 2 张图,就已消耗约 20,000-60,000 tokens,再加上消息历史,极易溢出。外部状态管理(如 MilestoneTracker)允许我们在 messages 中只保留当前阶段必要的上下文,旧的已完成步骤摘要化为文本,大幅节约 token。
目标漂移(Goal Drift)
LLM 在长对话中有"目标漂移"倾向——随着对话越来越长,最初的目标逐渐被近期内容"稀释",Claude 可能开始关注最后几条消息而忘记原始任务。解决方案是在每轮请求的 system prompt 或最新 user 消息中重复注入当前里程碑目标,保持目标的"新鲜度"。
失败传播(Failure Propagation)
没有检查点时,步骤3的失败可能到步骤8才暴露(因为后续步骤依赖步骤3的输出但未验证)。此时已无法回溯到步骤3重试,整个任务必须从头开始。里程碑检查点在每个关键步骤后立即验证状态,将失败的检测时机提前,减少浪费的后续工作。
并行副作用(Parallel Side Effects)
多个 Agent 并行时,如果操作同一共享资源(同一个文件、同一个数据库行、同一个 UI 窗口),会产生竞争条件(Race Condition)。必须通过任务级别的互斥锁(Mutex)或资源隔离(每个 Agent 独立容器)来避免。不要假设"并行就一定更快"——资源争用可能让并行 Agent 比串行更慢。

消息历史管理的关键技术

长任务中管理 messages 数组的增长是工程难点。以下是三种常用策略及其原理:

from dataclasses import dataclass
from typing import List, Dict, Any
import anthropic


class MessageHistoryManager:
    """
    管理 Computer Use Agent 的 messages 历史。

    策略一:滑动窗口 —— 只保留最近 N 轮完整对话
    策略二:摘要压缩 —— 将旧轮次的工具调用压缩为文字摘要
    策略三:里程碑快照 —— 完成里程碑后清空历史,注入里程碑摘要
    """

    def __init__(
        self,
        max_messages: int = 40,     # 滑动窗口大小(消息条数)
        compress_after: int = 30,   # 超过多少条时触发摘要压缩
    ):
        self.max_messages = max_messages
        self.compress_after = compress_after
        self.messages: List[Dict] = []
        self.milestone_summaries: List[str] = []

    def add(self, role: str, content: Any):
        """添加消息,超出限制时自动触发压缩"""
        self.messages.append({"role": role, "content": content})
        if len(self.messages) > self.compress_after:
            self._compress_old_messages()

    def _compress_old_messages(self):
        """
        将前半部分消息压缩为一条摘要文本消息。

        压缩前: [msg1, msg2, ..., msg30, msg31, ..., msg40]
        压缩后: [summary_msg, msg21, ..., msg40]  (保留最新20条)
        """
        keep_recent = self.max_messages // 2  # 保留最新的一半
        old_messages = self.messages[:-keep_recent]
        recent_messages = self.messages[-keep_recent:]

        # 将旧消息提炼为摘要(实际中可用另一个 LLM 调用生成摘要)
        summary_parts = []
        for msg in old_messages:
            if isinstance(msg["content"], list):
                # 处理包含 tool_use / tool_result 的复杂消息
                for block in msg["content"]:
                    if isinstance(block, dict) and block.get("type") == "tool_use":
                        # 提取工具调用的关键信息
                        tool_name = block.get("name", "unknown")
                        inp = block.get("input", {})
                        summary_parts.append(f"[已执行] {tool_name}: {str(inp)[:100]}")
            elif isinstance(msg["content"], str):
                # 纯文本消息直接截断保留
                summary_parts.append(msg["content"][:200])

        summary_text = "[历史操作摘要]\n" + "\n".join(summary_parts[:20])
        summary_msg = {"role": "user", "content": summary_text}

        # 重置 messages:摘要 + 最新的消息
        self.messages = [summary_msg] + recent_messages

    def snapshot_milestone(self, milestone_name: str, result_summary: str):
        """
        完成里程碑时拍快照:清空所有消息,只保留里程碑摘要。
        适合任务分为明确阶段的场景(如 阶段1→阶段2→阶段3)。
        """
        self.milestone_summaries.append(f"✓ {milestone_name}: {result_summary}")
        context = "\n".join(self.milestone_summaries)
        # 清空历史,注入里程碑上下文
        self.messages = [{
            "role": "user",
            "content": f"已完成进度:\n{context}\n\n继续下一阶段。"
        }]

    def get_messages(self) -> List[Dict]:
        """获取当前 messages 数组,供 API 调用使用"""
        return self.messages

失败恢复的策略分类

不同类型的失败需要不同的恢复策略,盲目重试往往无效甚至有害:

瞬时失败(Transient Failure)
原因是临时状态(网络抖动、页面加载慢、弹窗遮挡),适合策略:等待后直接重试(Retry with Wait)。判断依据:同样的截图在1-2秒后往往已恢复正常。最大重试3次,间隔指数增长(1s, 2s, 4s)。
状态失败(State Failure)
原因是当前状态不符合预期(已登录/未登录、文件已存在/不存在、权限不足),适合策略:检测状态后选择备用路径(Fallback Path)。不能简单重试,需要先修正状态再继续。
逻辑失败(Logic Failure)
原因是提示词理解错误、步骤规划有误,适合策略:回滚到上一个检查点(Checkpoint Rollback),用更具体的提示词重新规划该阶段。简单重试会重复同样的错误。
环境失败(Environment Failure)
原因是目标应用崩溃、系统资源耗尽、容器网络故障,适合策略:重置环境(Container Restart),从最近里程碑快照重新加载状态后继续执行。

常见误区:并行 Agent 性能假设

误区:并行 Agent 总是比串行更快

实际上,以下情况下并行 Agent 可能更慢或更昂贵:

只有真正独立的批量任务(如同时处理 100 个互不相关的订单)才适合并行 Agent 架构。

第9章小结