Chapter 10

生产部署:监控、成本与可靠性

将 Computer Use Agent 从实验推向生产,构建可观测、可扩展、经济高效的系统

Docker 容器化生产部署

生产级 Dockerfile

生产环境的容器设计与开发环境有几个关键区别:使用非 root 用户、多阶段构建减小镜像体积、健康检查、资源限制:

# Dockerfile.production - 生产级 Computer Use 容器
FROM ubuntu:22.04 AS base

# 避免交互式安装
ENV DEBIAN_FRONTEND=noninteractive
ENV DISPLAY=:99

# 安装系统依赖(合并 RUN 减少层数)
RUN apt-get update && apt-get install -y --no-install-recommends \
    xvfb x11vnc fluxbox \
    python3.11 python3.11-pip \
    firefox-esr \
    curl wget \
    && rm -rf /var/lib/apt/lists/* \
    && ln -s /usr/bin/python3.11 /usr/local/bin/python3

# 创建专用用户(非 root)
RUN useradd -m -u 1001 -s /bin/bash agent \
    && mkdir -p /app /tmp/agent_output \
    && chown -R agent:agent /app /tmp/agent_output

FROM base AS python-deps

# 安装 Python 依赖(利用缓存层)
COPY requirements.txt /tmp/
RUN pip3 install --no-cache-dir -r /tmp/requirements.txt

FROM python-deps AS production

USER agent
WORKDIR /app

COPY --chown=agent:agent . .

# 健康检查:验证 Xvfb 进程是否运行
HEALTHCHECK --interval=30s --timeout=10s --start-period=15s --retries=3 \
    CMD pgrep Xvfb || exit 1

EXPOSE 5900 8080

ENTRYPOINT ["/app/entrypoint.sh"]

VNC 远程桌面 vs 无头模式对比

模式 适用场景 优势 劣势
VNC 远程桌面 调试、监控、人工干预 实时可见、便于调试、支持人工接管 额外网络传输、安全风险(需加密)
无头模式(Xvfb) 全自动生产运行 资源占用低、无安全隐患 无法实时查看、出问题只能看日志
混合模式(推荐) 生产+按需调试 正常运行无头、出问题时启用 VNC 配置相对复杂
import os
import subprocess


class DisplayManager:
    """管理虚拟显示器,支持按需开关 VNC"""

    def __init__(self, display_num: int = 99, width: int = 1280, height: int = 800):
        self.display_num = display_num
        self.width = width
        self.height = height
        self._xvfb_proc = None
        self._vnc_proc = None

    def start(self, enable_vnc: bool = False):
        # 启动虚拟显示器
        self._xvfb_proc = subprocess.Popen([
            "Xvfb", f":{self.display_num}",
            "-screen", "0", f"{self.width}x{self.height}x24"
        ])
        os.environ["DISPLAY"] = f":{self.display_num}"

        if enable_vnc:
            self.start_vnc()

    def start_vnc(self, password: str = None, port: int = 5900):
        """按需启动 VNC 服务器(用于调试)"""
        cmd = [
            "x11vnc",
            "-display", f":{self.display_num}",
            "-rfbport", str(port),
            "-listen", "localhost",   # 只监听本地,配合 SSH 隧道
            "-xkb", "-forever"
        ]
        if password:
            cmd += ["-passwd", password]
        else:
            cmd.append("-nopw")
        self._vnc_proc = subprocess.Popen(cmd)

    def stop_vnc(self):
        """关闭 VNC(节约资源)"""
        if self._vnc_proc:
            self._vnc_proc.terminate()
            self._vnc_proc = None

Kubernetes 弹性伸缩

# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: computer-use-agent
  namespace: ai-agents
spec:
  replicas: 3
  selector:
    matchLabels:
      app: computer-use-agent
  template:
    metadata:
      labels:
        app: computer-use-agent
    spec:
      containers:
      - name: agent
        image: your-registry/computer-use-agent:latest
        resources:
          requests:
            cpu: "500m"
            memory: "1Gi"
          limits:
            cpu: "2000m"
            memory: "4Gi"
        env:
        - name: ANTHROPIC_API_KEY
          valueFrom:
            secretKeyRef:
              name: anthropic-secret
              key: api-key
        livenessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 15
          periodSeconds: 30
---
# 基于队列长度自动伸缩
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: computer-use-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: computer-use-agent
  minReplicas: 1
  maxReplicas: 20
  metrics:
  - type: External
    external:
      metric:
        name: task_queue_length
      target:
        type: AverageValue
        averageValue: "5"  # 每个 Pod 处理 5 个任务

成本控制策略

Token 消耗优化

截图压缩
使用 JPEG 75 质量替代 PNG,分辨率限制在 1280×720。每张截图从 ~150KB 压缩到 ~30KB,Token 消耗从 ~1500 降至 ~600。一个 10 步任务可节省 9000 tokens,约 $0.27。
截图缓存
500ms 内的重复截图请求直接返回缓存。当 Agent 需要多次确认同一页面状态时,避免重复抓图和传输。
按需截图
不要在每次操作前后都截图。只在 Claude 明确请求时才截图,或在关键里程碑节点截图确认。减少 60% 以上的不必要截图。
工具优先于截图
数据查询用 query_database 工具,文件读取用 bash/text_editor,不要用截图查看文本数据。查询结果作为文本传输比截图便宜 10 倍以上。
批量操作
将多个小任务合并为一个会话执行,避免重复的系统提示 Token 消耗。相同任务在同一会话中顺序执行比分开执行节省约 30% 的 Token。

成本监控实现

from dataclasses import dataclass
from datetime import datetime, timedelta
import json


@dataclass
class CostRecord:
    task_id: str
    model: str
    input_tokens: int
    output_tokens: int
    screenshots_count: int
    timestamp: str

    @property
    def cost_usd(self) -> float:
        """估算美元成本(基于 claude-opus-4-5 定价)"""
        # Claude Opus: $15/M input, $75/M output
        input_cost = self.input_tokens / 1_000_000 * 15
        output_cost = self.output_tokens / 1_000_000 * 75
        return input_cost + output_cost


class CostMonitor:
    """API 成本监控器"""

    def __init__(
        self,
        daily_budget_usd: float = 50.0,
        alert_threshold: float = 0.8  # 80% 触发告警
    ):
        self.daily_budget = daily_budget_usd
        self.alert_threshold = alert_threshold
        self._records: list[CostRecord] = []

    def record(self, record: CostRecord):
        self._records.append(record)
        self._check_budget()

    def daily_cost(self) -> float:
        today = datetime.utcnow().date()
        return sum(
            r.cost_usd for r in self._records
            if r.timestamp.startswith(str(today))
        )

    def _check_budget(self):
        cost = self.daily_cost()
        ratio = cost / self.daily_budget
        if ratio >= 1.0:
            raise RuntimeError(f"每日预算超限!已消耗 ${cost:.2f} / ${self.daily_budget:.2f}")
        elif ratio >= self.alert_threshold:
            print(f"[成本告警] 已消耗每日预算的 {ratio*100:.1f}%: ${cost:.2f}")

    def cost_report(self) -> dict:
        """生成成本报告"""
        return {
            "daily_cost_usd": self.daily_cost(),
            "daily_budget_usd": self.daily_budget,
            "total_tasks": len(self._records),
            "avg_cost_per_task": self.daily_cost() / max(len(self._records), 1),
            "total_screenshots": sum(r.screenshots_count for r in self._records)
        }

监控指标体系

关键监控指标

from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time


# Prometheus 监控指标定义
TASK_COUNTER = Counter(
    "computer_use_tasks_total",
    "Total number of tasks",
    ["status", "task_type"]  # labels
)

TASK_DURATION = Histogram(
    "computer_use_task_duration_seconds",
    "Task execution duration",
    buckets=[10, 30, 60, 120, 300, 600, 1800]
)

TOKEN_COUNTER = Counter(
    "computer_use_tokens_total",
    "Total tokens consumed",
    ["type"]  # input/output
)

SCREENSHOT_COUNTER = Counter(
    "computer_use_screenshots_total",
    "Total screenshots taken"
)

ACTIVE_TASKS = Gauge(
    "computer_use_active_tasks",
    "Currently running tasks"
)

TOOL_CALL_COUNTER = Counter(
    "computer_use_tool_calls_total",
    "Tool call counts",
    ["tool_name", "status"]
)


class MetricsMiddleware:
    """在 Agent 执行循环中收集监控指标"""

    def track_task(self, task_type: str):
        """上下文管理器:追踪单个任务的指标"""
        class TaskTracker:
            def __enter__(self_inner):
                ACTIVE_TASKS.inc()
                self_inner.start_time = time.time()
                return self_inner

            def __exit__(self_inner, exc_type, exc_val, exc_tb):
                ACTIVE_TASKS.dec()
                duration = time.time() - self_inner.start_time
                TASK_DURATION.observe(duration)
                status = "error" if exc_type else "success"
                TASK_COUNTER.labels(status=status, task_type=task_type).inc()

        return TaskTracker()

    def record_api_usage(self, usage):
        """记录 Anthropic API 用量"""
        TOKEN_COUNTER.labels(type="input").inc(usage.input_tokens)
        TOKEN_COUNTER.labels(type="output").inc(usage.output_tokens)

    def record_tool_call(self, tool_name: str, success: bool):
        """记录工具调用"""
        status = "success" if success else "error"
        TOOL_CALL_COUNTER.labels(tool_name=tool_name, status=status).inc()
        if tool_name == "computer":
            SCREENSHOT_COUNTER.inc()


# 暴露 Prometheus 指标端点
start_http_server(8080)

可观测性:完整操作录像

任务执行录像系统

import cv2
import numpy as np
import base64
from datetime import datetime


class TaskRecorder:
    """任务执行录像器:将所有截图帧合并为视频"""

    def __init__(
        self,
        output_dir: str,
        fps: int = 2,      # 2帧/秒,足够回放操作过程
        width: int = 1280,
        height: int = 800
    ):
        self.output_dir = output_dir
        self.fps = fps
        self.width = width
        self.height = height
        self._frames: list[tuple[float, bytes]] = []  # (timestamp, jpeg_bytes)
        self._annotations: list[dict] = []

    def add_frame(self, screenshot_b64: str, annotation: str = None):
        """添加一帧截图"""
        frame_bytes = base64.b64decode(screenshot_b64)
        self._frames.append((time.time(), frame_bytes))
        if annotation:
            self._annotations.append({
                "frame_index": len(self._frames) - 1,
                "text": annotation,
                "timestamp": datetime.utcnow().isoformat()
            })

    def save(self, task_id: str) -> str:
        """保存录像为 MP4 文件"""
        output_path = f"{self.output_dir}/{task_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp4"
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        out = cv2.VideoWriter(output_path, fourcc, self.fps, (self.width, self.height))

        for i, (ts, frame_bytes) in enumerate(self._frames):
            # 解码 JPEG 为 numpy 数组
            nparr = np.frombuffer(frame_bytes, np.uint8)
            img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
            if img is not None:
                # 调整到目标分辨率
                img = cv2.resize(img, (self.width, self.height))
                # 叠加步骤标注
                annotation = next(
                    (a["text"] for a in self._annotations if a["frame_index"] == i),
                    None
                )
                if annotation:
                    cv2.putText(img, annotation[:80], (10, 30),
                               cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
                out.write(img)

        out.release()
        return output_path

错误告警系统

from typing import Optional
import httpx


class AlertManager:
    """生产告警管理器"""

    def __init__(
        self,
        slack_webhook: Optional[str] = None,
        dingtalk_webhook: Optional[str] = None
    ):
        self.slack_webhook = slack_webhook
        self.dingtalk_webhook = dingtalk_webhook

    async def alert(
        self,
        title: str,
        message: str,
        level: str = "warning",  # info/warning/critical
        task_id: str = None
    ):
        emoji = {"info": "ℹ️", "warning": "⚠️", "critical": "🚨"}.get(level, "ℹ️")
        full_message = f"{emoji} [{level.upper()}] {title}\n{message}"
        if task_id:
            full_message += f"\nTask ID: {task_id}"

        if self.slack_webhook:
            async with httpx.AsyncClient() as client:
                await client.post(
                    self.slack_webhook,
                    json={"text": full_message}
                )

        if self.dingtalk_webhook:
            async with httpx.AsyncClient() as client:
                await client.post(
                    self.dingtalk_webhook,
                    json={
                        "msgtype": "text",
                        "text": {"content": full_message}
                    }
                )


# 告警触发示例
async def monitor_and_alert(metrics, alert_manager: AlertManager):
    # 任务成功率低于 90% 触发告警
    success_rate = metrics.get_success_rate(window_minutes=60)
    if success_rate < 0.9:
        await alert_manager.alert(
            title="任务成功率异常",
            message=f"过去1小时任务成功率 {success_rate*100:.1f}%,低于阈值 90%",
            level="critical"
        )

    # 平均任务时长超过 10 分钟触发告警
    avg_duration = metrics.get_avg_duration(window_minutes=60)
    if avg_duration > 600:
        await alert_manager.alert(
            title="任务执行过慢",
            message=f"平均任务时长 {avg_duration:.0f}s,超过预期 600s",
            level="warning"
        )

商业化 Computer Use 服务

主流云端方案对比

Anthropic Computer Use(自建)
直接调用 Anthropic API + 自建 Docker 容器。优点:完全可控、无数据泄露风险;缺点:需要维护基础设施,运维成本高。适合对数据安全要求高的企业。
Browserbase
专为 AI Agent 设计的云端浏览器基础设施。提供托管的 Playwright/Puppeteer 环境,内置 IP 轮换、反爬虫绕过、会话录制。按用量计费,适合浏览器自动化场景。
Steel.dev
开源的 AI 浏览器基础设施,支持自托管或使用云版本。提供 REST API 管理浏览器会话,与 Playwright/Puppeteer 兼容。更好的成本控制,适合中小规模部署。
E2B Sandbox
安全的代码执行沙箱,支持在云端运行 Python 代码和桌面应用。提供完整的 Linux 环境,毫秒级冷启动,适合需要执行代码的 Computer Use 场景。
延迟优化实践建议

生产部署核心原理深讲

容器化的本质:环境一致性与隔离

为什么生产环境必须用 Docker 容器而不是直接在宿主机运行?理解这一点需要理解 Computer Use Agent 的特殊性:

显示服务器依赖(Display Server Dependency)
Computer Use Agent 需要一个完整的图形桌面环境来截图和控制 UI。在宿主机上,这个环境受物理显示器、桌面会话、登录状态等因素影响,极不稳定。Xvfb(X Virtual Framebuffer)是纯软件模拟的显示器,完全独立于物理硬件,在任何 Linux 环境(包括无 GPU 的服务器)都能运行。每个容器有独立的 DISPLAY=:99(或其他编号),互不干扰。
浏览器状态隔离(Browser State Isolation)
不同任务不能共享同一个浏览器实例——cookies、localState、表单状态会相互污染。容器化确保每个任务从一个全新的浏览器 Profile 开始,结束后整个容器销毁,没有状态残留。这对需要处理多个用户账户的任务至关重要(如客服自动化)。
资源上限强制(Resource Capping)
CPU limits: 2000m 和 Memory limits: 4Gi 不只是"建议"——Kubernetes 的 cgroup 机制在容器超出上限时会强制限速(CPU)或 OOM Kill(内存)。这防止单个失控的 Agent(例如卡在无限截图循环)耗尽整个节点资源,影响同节点其他 Pod。
不可变基础设施(Immutable Infrastructure)
生产镜像一旦构建并推送到 Registry,就不能再修改(只能新建版本)。这意味着每次部署都是全新镜像,不存在"服务器配置漂移"问题。结合 Git 管理 Dockerfile,每个版本的运行环境都可精确追溯。

Kubernetes HPA 的伸缩原理

HPA(Horizontal Pod Autoscaler)基于外部指标(如任务队列长度)自动调整 Pod 数量,其核心控制逻辑如下:

目标副本数 = ceil(当前副本数 × (当前指标值 / 目标指标值))

示例:
  当前副本数 = 3
  当前队列长度 = 30
  目标:每个 Pod 处理 5 个任务
  目标副本数 = ceil(3 × (30/5)) = ceil(18) = 18 → 扩容到 18 个 Pod

当队列消耗完:
  当前队列长度 = 2
  目标副本数 = ceil(18 × (2/5)) = ceil(7.2) = 8 → 缩容到 8 个 Pod
  (缩容有 stabilizationWindow 防止抖动,默认 5 分钟)
HPA 的冷启动延迟问题

Computer Use 容器启动一个完整的 Xvfb + 浏览器环境需要 15-30 秒。当任务突然涌入,HPA 触发扩容后新 Pod 还需要等待初始化,这段时间任务仍在排队。解决方案:

Prometheus 监控的可观测性三支柱

指标(Metrics)
数值型时间序列数据,如 QPS、延迟分位数、错误率、Token 消耗速率。适合告警和趋势分析。Prometheus Counter/Histogram/Gauge 对应不同指标类型:Counter 只增不减(任务总数),Histogram 统计分布(任务时长分布),Gauge 可升可降(当前活跃任务数)。
日志(Logs)
结构化事件记录,如每次工具调用的入参出参、每个里程碑的完成时间、错误的完整堆栈。适合问题排查。生产中应使用 JSON 格式日志(便于 ELK/Loki 解析),包含 task_idstepduration_ms 字段,方便按任务 ID 追溯完整执行路径。
追踪(Traces)
分布式请求链路追踪,记录一个任务从触发到完成经过的每个服务和函数。适合复杂微服务架构中定位性能瓶颈。使用 OpenTelemetry 为每个任务生成 trace_id,关联 Anthropic API 调用、工具执行、截图采集等各环节的耗时。

成本控制的数学模型

理解 Computer Use 的成本构成,才能做出有效的优化决策:

单次任务成本 = Σ(每轮 API 调用成本)

每轮 API 调用成本 =
  input_tokens × $15/M  +  output_tokens × $75/M

input_tokens = 系统提示词 + 消息历史 + 所有截图 token
            = ~500  +  累积消息  +  每张截图600-1500 tokens

典型 10 步任务估算:
  系统提示词:500 tokens
  每步消息历史(累积):平均 2000 tokens  → 10步共 20,000 tokens
  每步2张截图 × 1000 tokens:20,000 tokens  → 10步共 200,000 tokens
  每步输出:平均 200 tokens  → 10步共 2,000 tokens

  总 input:≈ 220,500 tokens  → $3.31
  总 output:≈ 2,000 tokens   → $0.15
  总成本:≈ $3.46 / 任务

优化后(截图压缩 + 消息历史管理):
  截图 token 降至 600/张  → 120,000 tokens
  消息历史压缩 50%  → 10,000 tokens
  总 input:≈ 130,500 tokens  → $1.96
  节省 ≈ $1.50 / 任务(43% 降低)

常见生产问题与排查

容器 OOM Kill(内存溢出)
症状:Pod 频繁重启,kubectl describe pod 显示 OOMKilled。原因:截图缓存积累(每张 JPEG ~30KB,1000张截图 = 30MB,但未压缩的 numpy 数组是原来的 10 倍)。解决:限制截图历史长度,定期清空缓存,或使用流式处理而非在内存中累积所有帧。
Xvfb 进程死亡
症状:截图工具返回 "Cannot connect to display"。原因:Xvfb 意外退出(通常是内存不足或信号量冲突)。解决:使用 supervisords6-overlay 监控并自动重启 Xvfb,健康检查验证 Xvfb 存活。
浏览器渲染空白
症状:截图全是白屏,但 Xvfb 和浏览器进程都在。原因:GPU 加速在无头环境报错,fallback 到软件渲染但某些页面需要 GPU。解决:启动浏览器时加 --disable-gpu --disable-software-rasterizer,强制纯软件渲染。
API 429 速率限制
症状:多任务并行时频繁出现 429 错误。原因:超出 Anthropic 账户的 RPM(每分钟请求数)或 TPM(每分钟 token 数)限制。解决:实现令牌桶(Token Bucket)限速器,按账户等级控制并发度;升级 Anthropic 账户级别;在任务队列层面限制同时活跃的任务数。
第10章小结
恭喜完成 Computer Use 教程!

你已经系统掌握了 Computer Use 多模态 Agent 的完整技术栈:从 API 原理到截图与操作控制,从浏览器自动化到桌面应用控制,从安全沙箱到工具集成,从任务规划到生产部署。这套技术将让你能够构建出真正具有"手眼协调"能力的 AI Agent,自动化任何人类能够在屏幕上完成的工作。下一步:动手实践,从一个真实业务场景的小自动化任务开始!