Chapter 10

AI 应用集成实战

将 FastAPI 与 AI 能力深度结合:流式 LLM 响应、本地模型代理、PDF 分析、RAG 接口,以及完整的 AI 后端项目架构。

集成 Anthropic SDK(SSE 流式输出)

AI 应用最核心的体验是流式输出——用户实时看到 AI 生成内容,而不是等待全部完成后一次性返回。FastAPI 的 StreamingResponse 配合 Anthropic SDK 的流式 API 完美实现这一需求:

# app/routers/ai.py
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import AsyncGenerator
import anthropic
import json

router = APIRouter(prefix="/ai", tags=["AI"])

class ChatRequest(BaseModel):
    message: str
    system_prompt: str = "你是一个专业、友善的 AI 助手。"
    model: str = "claude-sonnet-4-6"
    max_tokens: int = 4096

# ── 流式对话生成器 ────────────────────────────────────────
async def stream_claude_response(request: ChatRequest) -> AsyncGenerator[str, None]:
    """调用 Anthropic API 并将流式响应转换为 SSE 格式"""
    client = anthropic.Anthropic()

    # 使用 stream() 上下文管理器
    with client.messages.stream(
        model=request.model,
        max_tokens=request.max_tokens,
        system=request.system_prompt,
        messages=[{"role": "user", "content": request.message}]
    ) as stream:
        for text_chunk in stream.text_stream():
            # 发送每个文字片段(SSE data 格式)
            chunk_data = json.dumps({"text": text_chunk}, ensure_ascii=False)
            yield f"data: {chunk_data}\n\n"

    # 发送结束信号
    yield 'data: {"done": true}\n\n'

@router.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    """
    流式 AI 对话端点。
    前端使用 EventSource 或 fetch + ReadableStream 接收。
    """
    return StreamingResponse(
        stream_claude_response(request),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"
        }
    )

前端接收 SSE 示例

// 使用 fetch + ReadableStream 接收 SSE
async function streamChat(message) {
  const response = await fetch('/ai/chat/stream', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ message })
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    const text = decoder.decode(value);
    const lines = text.split('\n\n').filter(Boolean);
    for (const line of lines) {
      if (line.startsWith('data: ')) {
        const data = JSON.parse(line.slice(6));
        if (!data.done) document.getElementById('output').textContent += data.text;
      }
    }
  }
}

Ollama 本地 LLM 代理

import httpx
from fastapi import APIRouter
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

router = APIRouter()
OLLAMA_BASE_URL = "http://localhost:11434"

class OllamaRequest(BaseModel):
    model: str = "llama3.2"
    prompt: str
    stream: bool = True

@router.post("/ollama/generate")
async def ollama_generate(request: OllamaRequest):
    """代理 Ollama 本地模型 API,转发流式响应"""
    async def ollama_stream():
        async with httpx.AsyncClient(timeout=None) as client:
            async with client.stream(
                "POST",
                f"{OLLAMA_BASE_URL}/api/generate",
                json=request.model_dump()
            ) as response:
                async for chunk in response.aiter_bytes():
                    yield chunk  # 直接透传 Ollama 的 NDJSON 流

    return StreamingResponse(ollama_stream(), media_type="application/x-ndjson")

文件上传 + AI 分析(PDF 解析)

from fastapi import APIRouter, UploadFile, File
from fastapi.responses import StreamingResponse
import anthropic
import base64

router = APIRouter()

@router.post("/analyze/pdf")
async def analyze_pdf(
    file: UploadFile = File(..., description="待分析的 PDF 文件"),
    question: str = "请总结这份文档的主要内容"
):
    if file.content_type != "application/pdf":
        raise HTTPException(400, "只接受 PDF 文件")

    pdf_content = await file.read()
    pdf_base64 = base64.b64encode(pdf_content).decode()

    client = anthropic.Anthropic()

    async def stream_analysis():
        with client.messages.stream(
            model="claude-sonnet-4-6",
            max_tokens=4096,
            messages=[{
                "role": "user",
                "content": [
                    {
                        "type": "document",
                        "source": {
                            "type": "base64",
                            "media_type": "application/pdf",
                            "data": pdf_base64
                        }
                    },
                    {"type": "text", "text": question}
                ]
            }]
        ) as stream:
            for text in stream.text_stream():
                yield f"data: {json.dumps({'text': text})}\n\n"
        yield 'data: {"done": true}\n\n'

    return StreamingResponse(stream_analysis(), media_type="text/event-stream")

RAG 接口(向量检索 + 流式回答)

from fastapi import APIRouter
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import anthropic

router = APIRouter()

class RAGRequest(BaseModel):
    question: str
    collection: str = "default"
    top_k: int = 5

@router.post("/rag/query")
async def rag_query(request: RAGRequest):
    """RAG 接口:向量检索 + 流式生成回答"""

    # ── Step 1:向量检索相关文档 ──────────────────────────
    # 实际使用 Qdrant/Chroma/Weaviate 等向量数据库
    relevant_docs = await vector_search(
        query=request.question,
        collection=request.collection,
        top_k=request.top_k
    )

    # ── Step 2:构建 RAG 提示词 ───────────────────────────
    context = "\n\n---\n\n".join([doc["content"] for doc in relevant_docs])
    rag_prompt = f"""请根据以下参考资料回答用户问题。
如果资料中没有相关信息,请明确说明。

参考资料:
{context}

用户问题:{request.question}"""

    # ── Step 3:流式生成答案 ──────────────────────────────
    client = anthropic.Anthropic()

    async def rag_stream():
        # 先发送检索到的文档来源
        sources = [{"title": d["title"], "score": d["score"]} for d in relevant_docs]
        yield f"data: {json.dumps({'sources': sources})}\n\n"

        # 流式生成 AI 回答
        with client.messages.stream(
            model="claude-sonnet-4-6",
            max_tokens=2048,
            messages=[{"role": "user", "content": rag_prompt}]
        ) as stream:
            for text in stream.text_stream():
                yield f"data: {json.dumps({'text': text})}\n\n"
        yield 'data: {"done": true}\n\n'

    return StreamingResponse(rag_stream(), media_type="text/event-stream")

async def vector_search(query: str, collection: str, top_k: int) -> list[dict]:
    """实际实现:调用向量数据库 SDK"""
    # from qdrant_client import AsyncQdrantClient
    # client = AsyncQdrantClient(url="http://localhost:6333")
    # results = await client.search(collection, query_vector=embed(query), limit=top_k)
    return [  # 模拟返回
        {"title": "FastAPI 文档", "content": "FastAPI 是...", "score": 0.95}
    ]

完整 AI 后端项目结构

fastapi-ai-backend/
├── app/
│   ├── main.py                 # 应用入口,lifespan 事件
│   ├── core/
│   │   ├── config.py           # pydantic-settings 配置
│   │   └── security.py         # JWT 工具函数
│   ├── routers/
│   │   ├── auth.py             # 登录/注册/Token 刷新
│   │   ├── users.py            # 用户 CRUD
│   │   ├── ai.py               # AI 对话、流式输出
│   │   ├── files.py            # 文件上传、PDF 分析
│   │   └── rag.py              # RAG 查询接口
│   ├── models/
│   │   ├── user.py             # SQLAlchemy 用户模型
│   │   └── conversation.py     # 对话历史模型
│   ├── schemas/
│   │   ├── user.py             # Pydantic 请求/响应 Schema
│   │   └── ai.py               # AI 相关 Schema
│   ├── services/
│   │   ├── user_service.py     # 用户业务逻辑
│   │   ├── ai_service.py       # LLM 调用封装
│   │   └── vector_service.py   # 向量检索服务
│   └── database.py             # 数据库连接
├── tests/
│   ├── conftest.py
│   ├── test_auth.py
│   ├── test_users.py
│   └── test_ai.py
├── alembic/                    # 数据库迁移
├── .env                        # ANTHROPIC_API_KEY 等
├── pyproject.toml
├── Dockerfile
└── fly.toml                    # Fly.io 部署配置

部署到 Fly.io

# 安装 Fly.io CLI
curl -L https://fly.io/install.sh | sh

# 登录
fly auth login

# 初始化应用(自动检测 Dockerfile)
fly launch

# 设置环境变量(Secrets)
fly secrets set ANTHROPIC_API_KEY="sk-ant-xxx"
fly secrets set SECRET_KEY="your-32-char-secret-key"
fly secrets set DATABASE_URL="postgresql+asyncpg://..."

# 部署
fly deploy

# 查看日志
fly logs

# 扩容(增加实例)
fly scale count 2
AI API 速率限制与并发控制 LLM API 有 RPM(每分钟请求数)和 TPM(每分钟 Token 数)限制。应对策略: 1. 用 slowapi 对 AI 端点设置严格限流(如每用户 10 次/分钟)。 2. 使用信号量(asyncio.Semaphore)限制并发 LLM 调用数量。 3. 请求队列(Redis + celery)处理高峰期积压请求。 4. 缓存相同问题的答案(Redis,TTL 1 小时)。
全教程总结 恭喜完成 FastAPI 完整教程!核心知识体系:Pydantic v2 数据验证 → APIRouter 模块化路由 → Depends 依赖注入 → 异步 SQLAlchemy + Alembic 数据库 → JWT 认证安全 → WebSocket/SSE 实时通信 → pytest 测试 → Docker + Gunicorn 生产部署 → AI SDK 集成流式输出。FastAPI 正是构建 AI 时代 API 后端的最佳框架选择。