任务分解策略
为什么需要任务分解?
直接把复杂任务交给 Computer Use Agent,往往会遭遇以下困境:Claude 不知道从哪里开始、中途遗忘了最终目标、某一步失败后不知道如何恢复、Token 消耗在无效的截图和尝试上。任务分解(Task Decomposition)是解决这些问题的核心方法。
任务分解 ASCII 示意图
原始任务:处理月度财务报表
层级分解:
月度报表自动化
├── 阶段1:数据采集
│ ├── 步骤1.1 打开 ERP 系统
│ ├── 步骤1.2 导出销售数据 (CSV)
│ └── 步骤1.3 导出成本数据 (CSV)
│ [里程碑1: 确认两个 CSV 文件已下载]
│
├── 阶段2:数据处理
│ ├── 步骤2.1 打开 Excel,导入 CSV
│ ├── 步骤2.2 执行数据清洗宏
│ └── 步骤2.3 计算利润率
│ [里程碑2: 确认数据表格完整]
│
├── 阶段3:报表生成
│ ├── 步骤3.1 套用报表模板
│ ├── 步骤3.2 生成图表
│ └── 步骤3.3 导出 PDF
│ [里程碑3: 确认 PDF 生成无误]
│
└── 阶段4:分发
├── 步骤4.1 打开邮件客户端
├── 步骤4.2 添加收件人
└── 步骤4.3 发送
[里程碑4: 确认邮件已发送]
思维链规划(先规划再执行)
两阶段执行模式
让 Claude 在执行操作之前先输出一个明确的计划,是提升复杂任务成功率的最有效手段之一。实现方式是在系统提示词中明确要求分两个阶段:
PLANNING_SYSTEM_PROMPT = """你是一个精确执行计算机任务的 AI 助手。
在开始执行任何任务之前,你必须:
1. 先输出一个详细的执行计划,格式如下:
## 执行计划
- 目标:[用一句话描述最终目标]
- 前提条件:[执行前需要确认的条件]
- 步骤:
1. [步骤1] - 预计结果:[...]
2. [步骤2] - 预计结果:[...]
...
- 风险点:[可能失败的环节]
- 回滚方案:[如果失败如何恢复]
2. 计划输出完成后,用如下标记表示开始执行:
## 开始执行
3. 执行过程中,每完成一个主要步骤,输出:
✓ [步骤N] 完成,截图确认...
4. 遇到意外情况,立即停止并说明:
⚠️ 异常:[描述问题],等待指示
注意:不要跳过规划阶段直接开始操作。"""
async def run_planned_agent(
client: anthropic.Anthropic,
task: str,
tools: list,
computer_controller,
require_plan_approval: bool = True
) -> str:
"""带规划阶段的 Agent 执行器"""
messages = [{"role": "user", "content": task}]
# 第一轮:获取计划
plan_response = client.beta.messages.create(
model="claude-opus-4-5",
max_tokens=2048,
system=PLANNING_SYSTEM_PROMPT,
tools=tools,
messages=messages,
betas=["computer-use-2024-10-22"]
)
# 提取计划文本
plan_text = "".join(
b.text for b in plan_response.content if b.type == "text"
)
print("=== 执行计划 ===")
print(plan_text)
if require_plan_approval:
approval = input("\n确认执行此计划?(yes/no): ")
if approval.lower() != "yes":
return "计划被用户取消"
# 继续执行阶段
messages.append({"role": "assistant", "content": plan_response.content})
messages.append({
"role": "user",
"content": "计划已确认,请按计划开始执行。"
})
# 进入正常执行循环
return await run_execution_loop(client, messages, tools, computer_controller)
里程碑检查点
带检查点的任务状态机
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime
import json
class MilestoneStatus(Enum):
PENDING = "pending"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
FAILED = "failed"
SKIPPED = "skipped"
@dataclass
class Milestone:
id: str
name: str
description: str
status: MilestoneStatus = MilestoneStatus.PENDING
started_at: Optional[str] = None
completed_at: Optional[str] = None
screenshot_b64: Optional[str] = None # 里程碑截图证据
notes: list[str] = field(default_factory=list)
class MilestoneTracker:
"""任务里程碑追踪器"""
def __init__(self, task_id: str, milestones: list[Milestone]):
self.task_id = task_id
self.milestones = {m.id: m for m in milestones}
self._order = [m.id for m in milestones]
def start(self, milestone_id: str):
m = self.milestones[milestone_id]
m.status = MilestoneStatus.IN_PROGRESS
m.started_at = datetime.utcnow().isoformat()
def complete(self, milestone_id: str, screenshot_b64: str = None, notes: str = None):
m = self.milestones[milestone_id]
m.status = MilestoneStatus.COMPLETED
m.completed_at = datetime.utcnow().isoformat()
if screenshot_b64:
m.screenshot_b64 = screenshot_b64
if notes:
m.notes.append(notes)
def fail(self, milestone_id: str, reason: str):
m = self.milestones[milestone_id]
m.status = MilestoneStatus.FAILED
m.notes.append(f"失败原因: {reason}")
def current_milestone(self) -> Optional[Milestone]:
for mid in self._order:
m = self.milestones[mid]
if m.status in (MilestoneStatus.PENDING, MilestoneStatus.IN_PROGRESS):
return m
return None
def progress_summary(self) -> str:
completed = sum(1 for m in self.milestones.values()
if m.status == MilestoneStatus.COMPLETED)
total = len(self.milestones)
lines = [f"任务进度: {completed}/{total}"]
for mid in self._order:
m = self.milestones[mid]
icon = {
MilestoneStatus.PENDING: "○",
MilestoneStatus.IN_PROGRESS: "●",
MilestoneStatus.COMPLETED: "✓",
MilestoneStatus.FAILED: "✗",
MilestoneStatus.SKIPPED: "—",
}[m.status]
lines.append(f" {icon} {m.name}")
return "\n".join(lines)
def save_state(self, path: str):
"""保存任务状态到文件(用于断点续传)"""
state = {
"task_id": self.task_id,
"milestones": [
{
"id": m.id, "name": m.name,
"status": m.status.value,
"completed_at": m.completed_at,
"notes": m.notes
}
for m in self.milestones.values()
]
}
with open(path, "w") as f:
json.dump(state, f, ensure_ascii=False, indent=2)
失败恢复机制
Retry / Fallback / Ask Human 三级策略
from enum import Enum
class RecoveryStrategy(Enum):
RETRY = "retry" # 重试当前步骤
RETRY_DIFFERENT = "retry_different" # 换一种方式重试
FALLBACK = "fallback" # 用备用方案
ASK_HUMAN = "ask_human" # 请求人工介入
ABORT = "abort" # 中止任务
class FailureRecoveryEngine:
"""任务失败恢复引擎"""
# 不同错误类型对应的恢复策略
ERROR_STRATEGIES = {
"screenshot_failed": (RecoveryStrategy.RETRY, 3),
"element_not_found": (RecoveryStrategy.RETRY_DIFFERENT, 2),
"page_load_timeout": (RecoveryStrategy.RETRY, 2),
"captcha_detected": (RecoveryStrategy.ASK_HUMAN, 1),
"permission_denied": (RecoveryStrategy.ASK_HUMAN, 1),
"data_validation_failed": (RecoveryStrategy.ASK_HUMAN, 1),
"network_error": (RecoveryStrategy.RETRY, 3),
}
def __init__(self):
self._attempt_counts: dict[str, int] = {}
async def handle_failure(
self,
error_type: str,
context: dict,
human_callback=None
) -> dict:
"""处理失败并返回恢复指令"""
strategy, max_attempts = self.ERROR_STRATEGIES.get(
error_type,
(RecoveryStrategy.ASK_HUMAN, 1)
)
key = f"{error_type}:{context.get('step', 'unknown')}"
self._attempt_counts[key] = self._attempt_counts.get(key, 0) + 1
if self._attempt_counts[key] > max_attempts:
# 超过重试次数,升级到人工介入
strategy = RecoveryStrategy.ASK_HUMAN
if strategy == RecoveryStrategy.RETRY:
import asyncio
await asyncio.sleep(1.0 * self._attempt_counts[key])
return {"action": "retry", "message": "请重试上一个步骤"}
elif strategy == RecoveryStrategy.RETRY_DIFFERENT:
return {
"action": "retry_different",
"message": f"上一步失败({error_type}),请尝试其他方法完成相同目标。考虑使用不同的选择器、坐标或操作方式。"
}
elif strategy == RecoveryStrategy.ASK_HUMAN:
if human_callback:
guidance = await human_callback(error_type, context)
return {"action": "continue_with_guidance", "guidance": guidance}
return {"action": "pause", "message": f"任务暂停:{error_type},需要人工处理"}
else:
return {"action": "abort", "message": "任务已中止"}
Messages 历史管理
避免 Context 溢出的策略
长时间运行的 Agent 会积累大量消息历史,最终超出 Claude 的上下文窗口限制(200K tokens)。以下是几种常见的历史管理策略:
import anthropic
from typing import Literal
class MessageHistoryManager:
"""Messages 历史管理器,防止 Context 溢出"""
def __init__(
self,
max_messages: int = 50,
strategy: Literal["sliding_window", "summarize", "milestone_reset"] = "sliding_window"
):
self.max_messages = max_messages
self.strategy = strategy
self._messages: list[dict] = []
self._milestone_summaries: list[str] = []
def add(self, message: dict):
self._messages.append(message)
if len(self._messages) > self.max_messages:
self._trim()
def _trim(self):
if self.strategy == "sliding_window":
# 保留第1条(任务描述)和最近 N 条
keep = self.max_messages // 2
self._messages = [self._messages[0]] + self._messages[-keep:]
elif self.strategy == "milestone_reset":
# 达到里程碑时压缩历史为摘要
recent_content = "".join(
msg.get("content", "") if isinstance(msg.get("content"), str) else ""
for msg in self._messages[1:]
)
summary = f"[已完成步骤摘要] 已成功执行 {len(self._messages)-1} 步操作。最后状态: {recent_content[-200:]}"
self._milestone_summaries.append(summary)
# 保留任务描述 + 摘要 + 最近几条
self._messages = [
self._messages[0],
{"role": "user", "content": "\n".join(self._milestone_summaries)},
] + self._messages[-10:]
@property
def messages(self) -> list[dict]:
return self._messages
def estimate_tokens(self) -> int:
"""粗略估算消息历史的 token 数量"""
total_chars = sum(
len(str(msg.get("content", "")))
for msg in self._messages
)
return total_chars // 4 # 粗略估算:4字符≈1token
def should_trim(self, threshold_tokens: int = 150000) -> bool:
return self.estimate_tokens() > threshold_tokens
状态跟踪
持久化 Agent 执行状态
from dataclasses import dataclass, field, asdict
import json
import os
@dataclass
class AgentState:
"""Agent 运行时状态,支持持久化和恢复"""
task_id: str
original_task: str
current_step: int = 0
completed_steps: list[str] = field(default_factory=list)
failed_steps: list[dict] = field(default_factory=list)
context_vars: dict = field(default_factory=dict) # 步骤间传递的变量
status: str = "running" # running/completed/failed/paused
created_at: str = ""
updated_at: str = ""
token_used: int = 0
class AgentStateManager:
"""Agent 状态持久化管理器"""
def __init__(self, state_dir: str = "/tmp/agent_states"):
self.state_dir = state_dir
os.makedirs(state_dir, exist_ok=True)
def save(self, state: AgentState):
state.updated_at = datetime.utcnow().isoformat()
path = os.path.join(self.state_dir, f"{state.task_id}.json")
with open(path, "w") as f:
json.dump(asdict(state), f, ensure_ascii=False, indent=2)
def load(self, task_id: str) -> Optional[AgentState]:
path = os.path.join(self.state_dir, f"{task_id}.json")
if not os.path.exists(path):
return None
with open(path) as f:
data = json.load(f)
return AgentState(**data)
def set_context_var(self, state: AgentState, key: str, value):
"""在步骤间传递变量(如上一步获取的文件路径)"""
state.context_vars[key] = value
self.save(state)
并行 Agent 协作
多 Agent 分工并行执行
import asyncio
from dataclasses import dataclass
@dataclass
class AgentTask:
agent_id: str
task: str
tools: list[dict]
dependencies: list[str] = field(default_factory=list) # 依赖的 agent_id
class MultiAgentOrchestrator:
"""多 Agent 协调器,支持并行和依赖管理"""
def __init__(self, client: anthropic.Anthropic):
self.client = client
self._results: dict[str, str] = {}
async def run_agent(self, agent_task: AgentTask) -> str:
"""执行单个 Agent 任务,将上游依赖结果注入 context"""
# 等待所有依赖完成
dep_results = {}
for dep_id in agent_task.dependencies:
while dep_id not in self._results:
await asyncio.sleep(0.1)
dep_results[dep_id] = self._results[dep_id]
# 构建包含依赖结果的任务
task_with_context = agent_task.task
if dep_results:
context_str = "\n".join(
f"[来自 Agent {k} 的结果]\n{v}"
for k, v in dep_results.items()
)
task_with_context = f"{task_with_context}\n\n上游结果:\n{context_str}"
result = await run_computer_use_agent(
self.client, agent_task.tools, task_with_context
)
self._results[agent_task.agent_id] = result
return result
async def run_all(self, tasks: list[AgentTask]) -> dict[str, str]:
"""并行执行所有任务,自动处理依赖顺序"""
coroutines = [self.run_agent(t) for t in tasks]
await asyncio.gather(*coroutines)
return self._results
# 使用示例:并行处理多个用户报表
async def batch_report_generation(user_ids: list[str]):
orchestrator = MultiAgentOrchestrator(client)
tasks = [
AgentTask(
agent_id=f"user_{uid}",
task=f"生成用户 {uid} 的月度报表并保存到 /output/{uid}_report.pdf",
tools=COMPUTER_USE_TOOLS
)
for uid in user_ids
]
results = await orchestrator.run_all(tasks)
return results
多个 Agent 并行运行时,需要注意资源竞争问题:如果多个 Agent 都操作同一个浏览器实例,会相互干扰。建议为每个并行 Agent 分配独立的 Docker 容器或虚拟桌面实例,确保环境完全隔离。成本方面,N 个并行 Agent 意味着 N 倍的 API 调用费用,需要权衡。
将"规划"(Planner)和"执行"(Executor)分离为两个独立的 Claude 调用,是一种高级架构模式。Planner 使用完整的上下文和工具列表生成执行计划(可以是 JSON 格式的步骤列表);Executor 专注执行单个步骤,只拿到当前步骤的描述和前置结果。这样的好处是:单步执行更专注、更少受历史干扰,整体任务成功率更高,且每步的 Token 消耗更低。
任务分解的深层原理
为什么 Agent 需要外部框架来维持状态?
理解这一点需要先理解 LLM 的无状态本质:每次 API 调用都是独立的,Claude 只能看到当前 messages 数组中的内容。没有外部状态管理,长链任务会面临以下根本性困难:
消息历史管理的关键技术
长任务中管理 messages 数组的增长是工程难点。以下是三种常用策略及其原理:
from dataclasses import dataclass
from typing import List, Dict, Any
import anthropic
class MessageHistoryManager:
"""
管理 Computer Use Agent 的 messages 历史。
策略一:滑动窗口 —— 只保留最近 N 轮完整对话
策略二:摘要压缩 —— 将旧轮次的工具调用压缩为文字摘要
策略三:里程碑快照 —— 完成里程碑后清空历史,注入里程碑摘要
"""
def __init__(
self,
max_messages: int = 40, # 滑动窗口大小(消息条数)
compress_after: int = 30, # 超过多少条时触发摘要压缩
):
self.max_messages = max_messages
self.compress_after = compress_after
self.messages: List[Dict] = []
self.milestone_summaries: List[str] = []
def add(self, role: str, content: Any):
"""添加消息,超出限制时自动触发压缩"""
self.messages.append({"role": role, "content": content})
if len(self.messages) > self.compress_after:
self._compress_old_messages()
def _compress_old_messages(self):
"""
将前半部分消息压缩为一条摘要文本消息。
压缩前: [msg1, msg2, ..., msg30, msg31, ..., msg40]
压缩后: [summary_msg, msg21, ..., msg40] (保留最新20条)
"""
keep_recent = self.max_messages // 2 # 保留最新的一半
old_messages = self.messages[:-keep_recent]
recent_messages = self.messages[-keep_recent:]
# 将旧消息提炼为摘要(实际中可用另一个 LLM 调用生成摘要)
summary_parts = []
for msg in old_messages:
if isinstance(msg["content"], list):
# 处理包含 tool_use / tool_result 的复杂消息
for block in msg["content"]:
if isinstance(block, dict) and block.get("type") == "tool_use":
# 提取工具调用的关键信息
tool_name = block.get("name", "unknown")
inp = block.get("input", {})
summary_parts.append(f"[已执行] {tool_name}: {str(inp)[:100]}")
elif isinstance(msg["content"], str):
# 纯文本消息直接截断保留
summary_parts.append(msg["content"][:200])
summary_text = "[历史操作摘要]\n" + "\n".join(summary_parts[:20])
summary_msg = {"role": "user", "content": summary_text}
# 重置 messages:摘要 + 最新的消息
self.messages = [summary_msg] + recent_messages
def snapshot_milestone(self, milestone_name: str, result_summary: str):
"""
完成里程碑时拍快照:清空所有消息,只保留里程碑摘要。
适合任务分为明确阶段的场景(如 阶段1→阶段2→阶段3)。
"""
self.milestone_summaries.append(f"✓ {milestone_name}: {result_summary}")
context = "\n".join(self.milestone_summaries)
# 清空历史,注入里程碑上下文
self.messages = [{
"role": "user",
"content": f"已完成进度:\n{context}\n\n继续下一阶段。"
}]
def get_messages(self) -> List[Dict]:
"""获取当前 messages 数组,供 API 调用使用"""
return self.messages
失败恢复的策略分类
不同类型的失败需要不同的恢复策略,盲目重试往往无效甚至有害:
常见误区:并行 Agent 性能假设
实际上,以下情况下并行 Agent 可能更慢或更昂贵:
- Anthropic API 并发限制:免费/低级账户有严格的并发请求上限,多个 Agent 同时请求会导致 429 速率限制,反而比串行等待更久
- 共享资源竞争:多个 Agent 操作同一个 Excel 文件、同一个数据库连接,需要加锁等待,实际并发度降低到 1
- 线性依赖任务:如果任务 B 必须等任务 A 的输出,并行毫无意义——只增加了调度复杂度
- 成本线性放大:N 个并行 Agent 意味着 N 倍的 API Token 消耗,如果总时间减少不到 N 倍,单位时间成本实际是上升的
只有真正独立的批量任务(如同时处理 100 个互不相关的订单)才适合并行 Agent 架构。
- 任务分解是成功率的基础:直接把复杂任务交给 Agent 失败率极高;层级分解、里程碑式分解、并行分解各有适用场景
- 两阶段执行模式(规划→执行)能显著提高复杂任务成功率,Planner 和 Executor 职责分离,让每步执行更专注
- 消息历史必须主动管理:长任务不能无限累积 messages;滑动窗口、摘要压缩、里程碑快照是三种常用策略,各有权衡
- 里程碑检查点将失败检测提前,避免错误传播到后续步骤才被发现;每个检查点也是人工干预的机会
- 失败类型决定恢复策略:瞬时失败用重试,状态失败用备用路径,逻辑失败用回滚重规划,环境失败用容器重置
- 并行 Agent 不是万能加速:仅适合真正独立的批量任务;资源竞争和 API 并发限制可能让并行比串行更慢
- 目标漂移是长任务的隐形杀手:在每轮请求中重复注入当前阶段目标,防止 Claude 因上下文过长而遗忘原始目标