集成 Anthropic SDK(SSE 流式输出)
AI 应用最核心的体验是流式输出——用户实时看到 AI 生成内容,而不是等待全部完成后一次性返回。FastAPI 的 StreamingResponse 配合 Anthropic SDK 的流式 API 完美实现这一需求:
# app/routers/ai.py
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import AsyncGenerator
import anthropic
import json
router = APIRouter(prefix="/ai", tags=["AI"])
class ChatRequest(BaseModel):
message: str
system_prompt: str = "你是一个专业、友善的 AI 助手。"
model: str = "claude-sonnet-4-6"
max_tokens: int = 4096
# ── 流式对话生成器 ────────────────────────────────────────
async def stream_claude_response(request: ChatRequest) -> AsyncGenerator[str, None]:
"""调用 Anthropic API 并将流式响应转换为 SSE 格式"""
client = anthropic.Anthropic()
# 使用 stream() 上下文管理器
with client.messages.stream(
model=request.model,
max_tokens=request.max_tokens,
system=request.system_prompt,
messages=[{"role": "user", "content": request.message}]
) as stream:
for text_chunk in stream.text_stream():
# 发送每个文字片段(SSE data 格式)
chunk_data = json.dumps({"text": text_chunk}, ensure_ascii=False)
yield f"data: {chunk_data}\n\n"
# 发送结束信号
yield 'data: {"done": true}\n\n'
@router.post("/chat/stream")
async def chat_stream(request: ChatRequest):
"""
流式 AI 对话端点。
前端使用 EventSource 或 fetch + ReadableStream 接收。
"""
return StreamingResponse(
stream_claude_response(request),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no"
}
)
前端接收 SSE 示例
// 使用 fetch + ReadableStream 接收 SSE
async function streamChat(message) {
const response = await fetch('/ai/chat/stream', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ message })
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const text = decoder.decode(value);
const lines = text.split('\n\n').filter(Boolean);
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = JSON.parse(line.slice(6));
if (!data.done) document.getElementById('output').textContent += data.text;
}
}
}
}
Ollama 本地 LLM 代理
import httpx
from fastapi import APIRouter
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
router = APIRouter()
OLLAMA_BASE_URL = "http://localhost:11434"
class OllamaRequest(BaseModel):
model: str = "llama3.2"
prompt: str
stream: bool = True
@router.post("/ollama/generate")
async def ollama_generate(request: OllamaRequest):
"""代理 Ollama 本地模型 API,转发流式响应"""
async def ollama_stream():
async with httpx.AsyncClient(timeout=None) as client:
async with client.stream(
"POST",
f"{OLLAMA_BASE_URL}/api/generate",
json=request.model_dump()
) as response:
async for chunk in response.aiter_bytes():
yield chunk # 直接透传 Ollama 的 NDJSON 流
return StreamingResponse(ollama_stream(), media_type="application/x-ndjson")
文件上传 + AI 分析(PDF 解析)
from fastapi import APIRouter, UploadFile, File
from fastapi.responses import StreamingResponse
import anthropic
import base64
router = APIRouter()
@router.post("/analyze/pdf")
async def analyze_pdf(
file: UploadFile = File(..., description="待分析的 PDF 文件"),
question: str = "请总结这份文档的主要内容"
):
if file.content_type != "application/pdf":
raise HTTPException(400, "只接受 PDF 文件")
pdf_content = await file.read()
pdf_base64 = base64.b64encode(pdf_content).decode()
client = anthropic.Anthropic()
async def stream_analysis():
with client.messages.stream(
model="claude-sonnet-4-6",
max_tokens=4096,
messages=[{
"role": "user",
"content": [
{
"type": "document",
"source": {
"type": "base64",
"media_type": "application/pdf",
"data": pdf_base64
}
},
{"type": "text", "text": question}
]
}]
) as stream:
for text in stream.text_stream():
yield f"data: {json.dumps({'text': text})}\n\n"
yield 'data: {"done": true}\n\n'
return StreamingResponse(stream_analysis(), media_type="text/event-stream")
RAG 接口(向量检索 + 流式回答)
from fastapi import APIRouter
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import anthropic
router = APIRouter()
class RAGRequest(BaseModel):
question: str
collection: str = "default"
top_k: int = 5
@router.post("/rag/query")
async def rag_query(request: RAGRequest):
"""RAG 接口:向量检索 + 流式生成回答"""
# ── Step 1:向量检索相关文档 ──────────────────────────
# 实际使用 Qdrant/Chroma/Weaviate 等向量数据库
relevant_docs = await vector_search(
query=request.question,
collection=request.collection,
top_k=request.top_k
)
# ── Step 2:构建 RAG 提示词 ───────────────────────────
context = "\n\n---\n\n".join([doc["content"] for doc in relevant_docs])
rag_prompt = f"""请根据以下参考资料回答用户问题。
如果资料中没有相关信息,请明确说明。
参考资料:
{context}
用户问题:{request.question}"""
# ── Step 3:流式生成答案 ──────────────────────────────
client = anthropic.Anthropic()
async def rag_stream():
# 先发送检索到的文档来源
sources = [{"title": d["title"], "score": d["score"]} for d in relevant_docs]
yield f"data: {json.dumps({'sources': sources})}\n\n"
# 流式生成 AI 回答
with client.messages.stream(
model="claude-sonnet-4-6",
max_tokens=2048,
messages=[{"role": "user", "content": rag_prompt}]
) as stream:
for text in stream.text_stream():
yield f"data: {json.dumps({'text': text})}\n\n"
yield 'data: {"done": true}\n\n'
return StreamingResponse(rag_stream(), media_type="text/event-stream")
async def vector_search(query: str, collection: str, top_k: int) -> list[dict]:
"""实际实现:调用向量数据库 SDK"""
# from qdrant_client import AsyncQdrantClient
# client = AsyncQdrantClient(url="http://localhost:6333")
# results = await client.search(collection, query_vector=embed(query), limit=top_k)
return [ # 模拟返回
{"title": "FastAPI 文档", "content": "FastAPI 是...", "score": 0.95}
]
完整 AI 后端项目结构
fastapi-ai-backend/
├── app/
│ ├── main.py # 应用入口,lifespan 事件
│ ├── core/
│ │ ├── config.py # pydantic-settings 配置
│ │ └── security.py # JWT 工具函数
│ ├── routers/
│ │ ├── auth.py # 登录/注册/Token 刷新
│ │ ├── users.py # 用户 CRUD
│ │ ├── ai.py # AI 对话、流式输出
│ │ ├── files.py # 文件上传、PDF 分析
│ │ └── rag.py # RAG 查询接口
│ ├── models/
│ │ ├── user.py # SQLAlchemy 用户模型
│ │ └── conversation.py # 对话历史模型
│ ├── schemas/
│ │ ├── user.py # Pydantic 请求/响应 Schema
│ │ └── ai.py # AI 相关 Schema
│ ├── services/
│ │ ├── user_service.py # 用户业务逻辑
│ │ ├── ai_service.py # LLM 调用封装
│ │ └── vector_service.py # 向量检索服务
│ └── database.py # 数据库连接
├── tests/
│ ├── conftest.py
│ ├── test_auth.py
│ ├── test_users.py
│ └── test_ai.py
├── alembic/ # 数据库迁移
├── .env # ANTHROPIC_API_KEY 等
├── pyproject.toml
├── Dockerfile
└── fly.toml # Fly.io 部署配置
部署到 Fly.io
# 安装 Fly.io CLI
curl -L https://fly.io/install.sh | sh
# 登录
fly auth login
# 初始化应用(自动检测 Dockerfile)
fly launch
# 设置环境变量(Secrets)
fly secrets set ANTHROPIC_API_KEY="sk-ant-xxx"
fly secrets set SECRET_KEY="your-32-char-secret-key"
fly secrets set DATABASE_URL="postgresql+asyncpg://..."
# 部署
fly deploy
# 查看日志
fly logs
# 扩容(增加实例)
fly scale count 2
AI API 速率限制与并发控制
LLM API 有 RPM(每分钟请求数)和 TPM(每分钟 Token 数)限制。应对策略:
1. 用 slowapi 对 AI 端点设置严格限流(如每用户 10 次/分钟)。
2. 使用信号量(asyncio.Semaphore)限制并发 LLM 调用数量。
3. 请求队列(Redis + celery)处理高峰期积压请求。
4. 缓存相同问题的答案(Redis,TTL 1 小时)。
全教程总结
恭喜完成 FastAPI 完整教程!核心知识体系:Pydantic v2 数据验证 → APIRouter 模块化路由 → Depends 依赖注入 → 异步 SQLAlchemy + Alembic 数据库 → JWT 认证安全 → WebSocket/SSE 实时通信 → pytest 测试 → Docker + Gunicorn 生产部署 → AI SDK 集成流式输出。FastAPI 正是构建 AI 时代 API 后端的最佳框架选择。