Docker 容器化生产部署
生产级 Dockerfile
生产环境的容器设计与开发环境有几个关键区别:使用非 root 用户、多阶段构建减小镜像体积、健康检查、资源限制:
# Dockerfile.production - 生产级 Computer Use 容器
FROM ubuntu:22.04 AS base
# 避免交互式安装
ENV DEBIAN_FRONTEND=noninteractive
ENV DISPLAY=:99
# 安装系统依赖(合并 RUN 减少层数)
RUN apt-get update && apt-get install -y --no-install-recommends \
xvfb x11vnc fluxbox \
python3.11 python3.11-pip \
firefox-esr \
curl wget \
&& rm -rf /var/lib/apt/lists/* \
&& ln -s /usr/bin/python3.11 /usr/local/bin/python3
# 创建专用用户(非 root)
RUN useradd -m -u 1001 -s /bin/bash agent \
&& mkdir -p /app /tmp/agent_output \
&& chown -R agent:agent /app /tmp/agent_output
FROM base AS python-deps
# 安装 Python 依赖(利用缓存层)
COPY requirements.txt /tmp/
RUN pip3 install --no-cache-dir -r /tmp/requirements.txt
FROM python-deps AS production
USER agent
WORKDIR /app
COPY --chown=agent:agent . .
# 健康检查:验证 Xvfb 进程是否运行
HEALTHCHECK --interval=30s --timeout=10s --start-period=15s --retries=3 \
CMD pgrep Xvfb || exit 1
EXPOSE 5900 8080
ENTRYPOINT ["/app/entrypoint.sh"]
VNC 远程桌面 vs 无头模式对比
| 模式 | 适用场景 | 优势 | 劣势 |
|---|---|---|---|
| VNC 远程桌面 | 调试、监控、人工干预 | 实时可见、便于调试、支持人工接管 | 额外网络传输、安全风险(需加密) |
| 无头模式(Xvfb) | 全自动生产运行 | 资源占用低、无安全隐患 | 无法实时查看、出问题只能看日志 |
| 混合模式(推荐) | 生产+按需调试 | 正常运行无头、出问题时启用 VNC | 配置相对复杂 |
import os
import subprocess
class DisplayManager:
"""管理虚拟显示器,支持按需开关 VNC"""
def __init__(self, display_num: int = 99, width: int = 1280, height: int = 800):
self.display_num = display_num
self.width = width
self.height = height
self._xvfb_proc = None
self._vnc_proc = None
def start(self, enable_vnc: bool = False):
# 启动虚拟显示器
self._xvfb_proc = subprocess.Popen([
"Xvfb", f":{self.display_num}",
"-screen", "0", f"{self.width}x{self.height}x24"
])
os.environ["DISPLAY"] = f":{self.display_num}"
if enable_vnc:
self.start_vnc()
def start_vnc(self, password: str = None, port: int = 5900):
"""按需启动 VNC 服务器(用于调试)"""
cmd = [
"x11vnc",
"-display", f":{self.display_num}",
"-rfbport", str(port),
"-listen", "localhost", # 只监听本地,配合 SSH 隧道
"-xkb", "-forever"
]
if password:
cmd += ["-passwd", password]
else:
cmd.append("-nopw")
self._vnc_proc = subprocess.Popen(cmd)
def stop_vnc(self):
"""关闭 VNC(节约资源)"""
if self._vnc_proc:
self._vnc_proc.terminate()
self._vnc_proc = None
Kubernetes 弹性伸缩
# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: computer-use-agent
namespace: ai-agents
spec:
replicas: 3
selector:
matchLabels:
app: computer-use-agent
template:
metadata:
labels:
app: computer-use-agent
spec:
containers:
- name: agent
image: your-registry/computer-use-agent:latest
resources:
requests:
cpu: "500m"
memory: "1Gi"
limits:
cpu: "2000m"
memory: "4Gi"
env:
- name: ANTHROPIC_API_KEY
valueFrom:
secretKeyRef:
name: anthropic-secret
key: api-key
livenessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 15
periodSeconds: 30
---
# 基于队列长度自动伸缩
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: computer-use-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: computer-use-agent
minReplicas: 1
maxReplicas: 20
metrics:
- type: External
external:
metric:
name: task_queue_length
target:
type: AverageValue
averageValue: "5" # 每个 Pod 处理 5 个任务
成本控制策略
Token 消耗优化
截图压缩
使用 JPEG 75 质量替代 PNG,分辨率限制在 1280×720。每张截图从 ~150KB 压缩到 ~30KB,Token 消耗从 ~1500 降至 ~600。一个 10 步任务可节省 9000 tokens,约 $0.27。
截图缓存
500ms 内的重复截图请求直接返回缓存。当 Agent 需要多次确认同一页面状态时,避免重复抓图和传输。
按需截图
不要在每次操作前后都截图。只在 Claude 明确请求时才截图,或在关键里程碑节点截图确认。减少 60% 以上的不必要截图。
工具优先于截图
数据查询用 query_database 工具,文件读取用 bash/text_editor,不要用截图查看文本数据。查询结果作为文本传输比截图便宜 10 倍以上。
批量操作
将多个小任务合并为一个会话执行,避免重复的系统提示 Token 消耗。相同任务在同一会话中顺序执行比分开执行节省约 30% 的 Token。
成本监控实现
from dataclasses import dataclass
from datetime import datetime, timedelta
import json
@dataclass
class CostRecord:
task_id: str
model: str
input_tokens: int
output_tokens: int
screenshots_count: int
timestamp: str
@property
def cost_usd(self) -> float:
"""估算美元成本(基于 claude-opus-4-5 定价)"""
# Claude Opus: $15/M input, $75/M output
input_cost = self.input_tokens / 1_000_000 * 15
output_cost = self.output_tokens / 1_000_000 * 75
return input_cost + output_cost
class CostMonitor:
"""API 成本监控器"""
def __init__(
self,
daily_budget_usd: float = 50.0,
alert_threshold: float = 0.8 # 80% 触发告警
):
self.daily_budget = daily_budget_usd
self.alert_threshold = alert_threshold
self._records: list[CostRecord] = []
def record(self, record: CostRecord):
self._records.append(record)
self._check_budget()
def daily_cost(self) -> float:
today = datetime.utcnow().date()
return sum(
r.cost_usd for r in self._records
if r.timestamp.startswith(str(today))
)
def _check_budget(self):
cost = self.daily_cost()
ratio = cost / self.daily_budget
if ratio >= 1.0:
raise RuntimeError(f"每日预算超限!已消耗 ${cost:.2f} / ${self.daily_budget:.2f}")
elif ratio >= self.alert_threshold:
print(f"[成本告警] 已消耗每日预算的 {ratio*100:.1f}%: ${cost:.2f}")
def cost_report(self) -> dict:
"""生成成本报告"""
return {
"daily_cost_usd": self.daily_cost(),
"daily_budget_usd": self.daily_budget,
"total_tasks": len(self._records),
"avg_cost_per_task": self.daily_cost() / max(len(self._records), 1),
"total_screenshots": sum(r.screenshots_count for r in self._records)
}
监控指标体系
关键监控指标
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time
# Prometheus 监控指标定义
TASK_COUNTER = Counter(
"computer_use_tasks_total",
"Total number of tasks",
["status", "task_type"] # labels
)
TASK_DURATION = Histogram(
"computer_use_task_duration_seconds",
"Task execution duration",
buckets=[10, 30, 60, 120, 300, 600, 1800]
)
TOKEN_COUNTER = Counter(
"computer_use_tokens_total",
"Total tokens consumed",
["type"] # input/output
)
SCREENSHOT_COUNTER = Counter(
"computer_use_screenshots_total",
"Total screenshots taken"
)
ACTIVE_TASKS = Gauge(
"computer_use_active_tasks",
"Currently running tasks"
)
TOOL_CALL_COUNTER = Counter(
"computer_use_tool_calls_total",
"Tool call counts",
["tool_name", "status"]
)
class MetricsMiddleware:
"""在 Agent 执行循环中收集监控指标"""
def track_task(self, task_type: str):
"""上下文管理器:追踪单个任务的指标"""
class TaskTracker:
def __enter__(self_inner):
ACTIVE_TASKS.inc()
self_inner.start_time = time.time()
return self_inner
def __exit__(self_inner, exc_type, exc_val, exc_tb):
ACTIVE_TASKS.dec()
duration = time.time() - self_inner.start_time
TASK_DURATION.observe(duration)
status = "error" if exc_type else "success"
TASK_COUNTER.labels(status=status, task_type=task_type).inc()
return TaskTracker()
def record_api_usage(self, usage):
"""记录 Anthropic API 用量"""
TOKEN_COUNTER.labels(type="input").inc(usage.input_tokens)
TOKEN_COUNTER.labels(type="output").inc(usage.output_tokens)
def record_tool_call(self, tool_name: str, success: bool):
"""记录工具调用"""
status = "success" if success else "error"
TOOL_CALL_COUNTER.labels(tool_name=tool_name, status=status).inc()
if tool_name == "computer":
SCREENSHOT_COUNTER.inc()
# 暴露 Prometheus 指标端点
start_http_server(8080)
可观测性:完整操作录像
任务执行录像系统
import cv2
import numpy as np
import base64
from datetime import datetime
class TaskRecorder:
"""任务执行录像器:将所有截图帧合并为视频"""
def __init__(
self,
output_dir: str,
fps: int = 2, # 2帧/秒,足够回放操作过程
width: int = 1280,
height: int = 800
):
self.output_dir = output_dir
self.fps = fps
self.width = width
self.height = height
self._frames: list[tuple[float, bytes]] = [] # (timestamp, jpeg_bytes)
self._annotations: list[dict] = []
def add_frame(self, screenshot_b64: str, annotation: str = None):
"""添加一帧截图"""
frame_bytes = base64.b64decode(screenshot_b64)
self._frames.append((time.time(), frame_bytes))
if annotation:
self._annotations.append({
"frame_index": len(self._frames) - 1,
"text": annotation,
"timestamp": datetime.utcnow().isoformat()
})
def save(self, task_id: str) -> str:
"""保存录像为 MP4 文件"""
output_path = f"{self.output_dir}/{task_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp4"
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
out = cv2.VideoWriter(output_path, fourcc, self.fps, (self.width, self.height))
for i, (ts, frame_bytes) in enumerate(self._frames):
# 解码 JPEG 为 numpy 数组
nparr = np.frombuffer(frame_bytes, np.uint8)
img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if img is not None:
# 调整到目标分辨率
img = cv2.resize(img, (self.width, self.height))
# 叠加步骤标注
annotation = next(
(a["text"] for a in self._annotations if a["frame_index"] == i),
None
)
if annotation:
cv2.putText(img, annotation[:80], (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
out.write(img)
out.release()
return output_path
错误告警系统
from typing import Optional
import httpx
class AlertManager:
"""生产告警管理器"""
def __init__(
self,
slack_webhook: Optional[str] = None,
dingtalk_webhook: Optional[str] = None
):
self.slack_webhook = slack_webhook
self.dingtalk_webhook = dingtalk_webhook
async def alert(
self,
title: str,
message: str,
level: str = "warning", # info/warning/critical
task_id: str = None
):
emoji = {"info": "ℹ️", "warning": "⚠️", "critical": "🚨"}.get(level, "ℹ️")
full_message = f"{emoji} [{level.upper()}] {title}\n{message}"
if task_id:
full_message += f"\nTask ID: {task_id}"
if self.slack_webhook:
async with httpx.AsyncClient() as client:
await client.post(
self.slack_webhook,
json={"text": full_message}
)
if self.dingtalk_webhook:
async with httpx.AsyncClient() as client:
await client.post(
self.dingtalk_webhook,
json={
"msgtype": "text",
"text": {"content": full_message}
}
)
# 告警触发示例
async def monitor_and_alert(metrics, alert_manager: AlertManager):
# 任务成功率低于 90% 触发告警
success_rate = metrics.get_success_rate(window_minutes=60)
if success_rate < 0.9:
await alert_manager.alert(
title="任务成功率异常",
message=f"过去1小时任务成功率 {success_rate*100:.1f}%,低于阈值 90%",
level="critical"
)
# 平均任务时长超过 10 分钟触发告警
avg_duration = metrics.get_avg_duration(window_minutes=60)
if avg_duration > 600:
await alert_manager.alert(
title="任务执行过慢",
message=f"平均任务时长 {avg_duration:.0f}s,超过预期 600s",
level="warning"
)
商业化 Computer Use 服务
主流云端方案对比
Anthropic Computer Use(自建)
直接调用 Anthropic API + 自建 Docker 容器。优点:完全可控、无数据泄露风险;缺点:需要维护基础设施,运维成本高。适合对数据安全要求高的企业。
Browserbase
专为 AI Agent 设计的云端浏览器基础设施。提供托管的 Playwright/Puppeteer 环境,内置 IP 轮换、反爬虫绕过、会话录制。按用量计费,适合浏览器自动化场景。
Steel.dev
开源的 AI 浏览器基础设施,支持自托管或使用云版本。提供 REST API 管理浏览器会话,与 Playwright/Puppeteer 兼容。更好的成本控制,适合中小规模部署。
E2B Sandbox
安全的代码执行沙箱,支持在云端运行 Python 代码和桌面应用。提供完整的 Linux 环境,毫秒级冷启动,适合需要执行代码的 Computer Use 场景。
延迟优化实践建议
- 预热容器池:保持若干"热"容器待命,避免任务触发时的冷启动延迟(节省 5-15 秒)
- Region 就近部署:将 Agent 容器部署在距 Anthropic API 服务器最近的 AWS us-east-1 区域
- 截图传输压缩:WebP 格式比 JPEG 进一步减小 20-30%,在带宽受限环境中效果显著
- 连接池复用:复用 Anthropic API 的 HTTP 连接,避免每次请求的 TLS 握手开销
生产部署核心原理深讲
容器化的本质:环境一致性与隔离
为什么生产环境必须用 Docker 容器而不是直接在宿主机运行?理解这一点需要理解 Computer Use Agent 的特殊性:
显示服务器依赖(Display Server Dependency)
Computer Use Agent 需要一个完整的图形桌面环境来截图和控制 UI。在宿主机上,这个环境受物理显示器、桌面会话、登录状态等因素影响,极不稳定。Xvfb(X Virtual Framebuffer)是纯软件模拟的显示器,完全独立于物理硬件,在任何 Linux 环境(包括无 GPU 的服务器)都能运行。每个容器有独立的
DISPLAY=:99(或其他编号),互不干扰。浏览器状态隔离(Browser State Isolation)
不同任务不能共享同一个浏览器实例——cookies、localState、表单状态会相互污染。容器化确保每个任务从一个全新的浏览器 Profile 开始,结束后整个容器销毁,没有状态残留。这对需要处理多个用户账户的任务至关重要(如客服自动化)。
资源上限强制(Resource Capping)
CPU
limits: 2000m 和 Memory limits: 4Gi 不只是"建议"——Kubernetes 的 cgroup 机制在容器超出上限时会强制限速(CPU)或 OOM Kill(内存)。这防止单个失控的 Agent(例如卡在无限截图循环)耗尽整个节点资源,影响同节点其他 Pod。不可变基础设施(Immutable Infrastructure)
生产镜像一旦构建并推送到 Registry,就不能再修改(只能新建版本)。这意味着每次部署都是全新镜像,不存在"服务器配置漂移"问题。结合 Git 管理 Dockerfile,每个版本的运行环境都可精确追溯。
Kubernetes HPA 的伸缩原理
HPA(Horizontal Pod Autoscaler)基于外部指标(如任务队列长度)自动调整 Pod 数量,其核心控制逻辑如下:
目标副本数 = ceil(当前副本数 × (当前指标值 / 目标指标值))
示例:
当前副本数 = 3
当前队列长度 = 30
目标:每个 Pod 处理 5 个任务
目标副本数 = ceil(3 × (30/5)) = ceil(18) = 18 → 扩容到 18 个 Pod
当队列消耗完:
当前队列长度 = 2
目标副本数 = ceil(18 × (2/5)) = ceil(7.2) = 8 → 缩容到 8 个 Pod
(缩容有 stabilizationWindow 防止抖动,默认 5 分钟)
HPA 的冷启动延迟问题
Computer Use 容器启动一个完整的 Xvfb + 浏览器环境需要 15-30 秒。当任务突然涌入,HPA 触发扩容后新 Pod 还需要等待初始化,这段时间任务仍在排队。解决方案:
- 最小副本数不为零(
minReplicas: 1):始终保持至少 1 个 Pod 运行,避免从零开始的冷启动 - 预热副本池:维护 N 个预初始化但空闲的 Pod,等待任务分配(Kubernetes Job 队列模式)
- 就绪探针调优:配置合理的
initialDelaySeconds和readinessProbe,避免未就绪的 Pod 接收流量导致失败
Prometheus 监控的可观测性三支柱
指标(Metrics)
数值型时间序列数据,如 QPS、延迟分位数、错误率、Token 消耗速率。适合告警和趋势分析。Prometheus Counter/Histogram/Gauge 对应不同指标类型:Counter 只增不减(任务总数),Histogram 统计分布(任务时长分布),Gauge 可升可降(当前活跃任务数)。
日志(Logs)
结构化事件记录,如每次工具调用的入参出参、每个里程碑的完成时间、错误的完整堆栈。适合问题排查。生产中应使用 JSON 格式日志(便于 ELK/Loki 解析),包含
task_id、step、duration_ms 字段,方便按任务 ID 追溯完整执行路径。追踪(Traces)
分布式请求链路追踪,记录一个任务从触发到完成经过的每个服务和函数。适合复杂微服务架构中定位性能瓶颈。使用 OpenTelemetry 为每个任务生成
trace_id,关联 Anthropic API 调用、工具执行、截图采集等各环节的耗时。成本控制的数学模型
理解 Computer Use 的成本构成,才能做出有效的优化决策:
单次任务成本 = Σ(每轮 API 调用成本)
每轮 API 调用成本 =
input_tokens × $15/M + output_tokens × $75/M
input_tokens = 系统提示词 + 消息历史 + 所有截图 token
= ~500 + 累积消息 + 每张截图600-1500 tokens
典型 10 步任务估算:
系统提示词:500 tokens
每步消息历史(累积):平均 2000 tokens → 10步共 20,000 tokens
每步2张截图 × 1000 tokens:20,000 tokens → 10步共 200,000 tokens
每步输出:平均 200 tokens → 10步共 2,000 tokens
总 input:≈ 220,500 tokens → $3.31
总 output:≈ 2,000 tokens → $0.15
总成本:≈ $3.46 / 任务
优化后(截图压缩 + 消息历史管理):
截图 token 降至 600/张 → 120,000 tokens
消息历史压缩 50% → 10,000 tokens
总 input:≈ 130,500 tokens → $1.96
节省 ≈ $1.50 / 任务(43% 降低)
常见生产问题与排查
容器 OOM Kill(内存溢出)
症状:Pod 频繁重启,
kubectl describe pod 显示 OOMKilled。原因:截图缓存积累(每张 JPEG ~30KB,1000张截图 = 30MB,但未压缩的 numpy 数组是原来的 10 倍)。解决:限制截图历史长度,定期清空缓存,或使用流式处理而非在内存中累积所有帧。Xvfb 进程死亡
症状:截图工具返回 "Cannot connect to display"。原因:Xvfb 意外退出(通常是内存不足或信号量冲突)。解决:使用
supervisord 或 s6-overlay 监控并自动重启 Xvfb,健康检查验证 Xvfb 存活。浏览器渲染空白
症状:截图全是白屏,但 Xvfb 和浏览器进程都在。原因:GPU 加速在无头环境报错,fallback 到软件渲染但某些页面需要 GPU。解决:启动浏览器时加
--disable-gpu --disable-software-rasterizer,强制纯软件渲染。API 429 速率限制
症状:多任务并行时频繁出现 429 错误。原因:超出 Anthropic 账户的 RPM(每分钟请求数)或 TPM(每分钟 token 数)限制。解决:实现令牌桶(Token Bucket)限速器,按账户等级控制并发度;升级 Anthropic 账户级别;在任务队列层面限制同时活跃的任务数。
第10章小结
- Docker 容器化解决了 Computer Use 最大的环境一致性问题:Xvfb 提供独立虚拟显示,每个任务在全新浏览器状态中执行,容器销毁后无残留
- Kubernetes HPA 基于任务队列长度弹性伸缩,但冷启动延迟是关键瓶颈;
minReplicas: 1和预热副本池是常见解决方案 - 成本控制的最大杠杆是截图优化(压缩格式 + 按需截图 + 区域截图)和消息历史管理,组合使用可降低 40-60% 的 API 成本
- 可观测性三支柱(指标 + 日志 + 追踪)缺一不可:指标用于告警,日志用于排查,追踪用于定位分布式性能瓶颈
- Prometheus + Grafana 是最成熟的生产监控方案;关键指标:任务成功率、P95 执行时长、每小时 Token 消耗、活跃任务数
- 告警阈值要根据实际业务设定:任务成功率 <90% 为 critical,单任务执行超 10 分钟为 warning,每日 API 成本超预算 80% 提前告警
- 生产常见故障(OOMKill、Xvfb 崩溃、白屏、429 限速)都有成熟的解决方案,关键是建立健全的健康检查和自动恢复机制
恭喜完成 Computer Use 教程!
你已经系统掌握了 Computer Use 多模态 Agent 的完整技术栈:从 API 原理到截图与操作控制,从浏览器自动化到桌面应用控制,从安全沙箱到工具集成,从任务规划到生产部署。这套技术将让你能够构建出真正具有"手眼协调"能力的 AI Agent,自动化任何人类能够在屏幕上完成的工作。下一步:动手实践,从一个真实业务场景的小自动化任务开始!