Chapter 07

异步编程与高级特性

掌握 FastAPI 的异步能力精髓:WebSocket 实时通信、SSE 流式响应、文件上传与 Lifespan 事件,构建现代实时应用。

async/await 使用原则

FastAPI 同时支持 async def 和普通 def 路由函数,理解何时用哪种至关重要:

使用 async def 的场景

  • 调用异步数据库(AsyncSession)
  • 调用 HTTP 客户端(httpx.AsyncClient)
  • 调用异步 Redis(aioredis)
  • WebSocket 通信
  • 任何 await 表达式所在函数

使用普通 def 的场景

  • 调用同步库(requests、同步 ORM)
  • CPU 密集型计算
  • 文件读写(非异步方式)
  • FastAPI 会在线程池中运行 def 函数,不阻塞事件循环
最常见的异步错误 在 async def 函数中调用同步阻塞操作(如 time.sleep()、requests.get()、同步数据库查询)会阻塞整个事件循环,导致并发性能崩溃。解决方案:用 asyncio.sleep() 替代 time.sleep(),用 httpx.AsyncClient 替代 requests,用异步 ORM 替代同步版本。

BackgroundTasks 后台任务

from fastapi import FastAPI, BackgroundTasks
import asyncio

app = FastAPI()

# ── 后台任务函数 ──────────────────────────────────────────
async def send_welcome_email(email: str, username: str):
    """模拟发送欢迎邮件(实际使用 SMTP 或邮件服务 API)"""
    await asyncio.sleep(2)  # 模拟耗时操作
    print(f"欢迎邮件已发送到 {email},用户名:{username}")

async def write_audit_log(action: str, user_id: int):
    """写入审计日志"""
    # 实际场景:异步写入数据库或日志文件
    print(f"[AUDIT] user={user_id} action={action}")

# ── 路由:注册成功后触发后台任务 ─────────────────────────
@app.post("/register")
async def register(
    email: str,
    username: str,
    background_tasks: BackgroundTasks
):
    # 立即返回响应,后台异步执行任务
    background_tasks.add_task(send_welcome_email, email, username)
    background_tasks.add_task(write_audit_log, "register", 1)

    # 用户不需要等待邮件发送完成
    return {"message": "注册成功!欢迎邮件将在后台发送"}

WebSocket 实时通信

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Any

app = FastAPI()

# ── 连接管理器(多人聊天室)──────────────────────────────
class ConnectionManager:
    def __init__(self):
        self.active_connections: dict[str, WebSocket] = {}

    async def connect(self, websocket: WebSocket, client_id: str):
        await websocket.accept()
        self.active_connections[client_id] = websocket

    def disconnect(self, client_id: str):
        self.active_connections.pop(client_id, None)

    async def send_personal(self, message: str, client_id: str):
        if client_id in self.active_connections:
            await self.active_connections[client_id].send_text(message)

    async def broadcast(self, message: str, exclude: str | None = None):
        for cid, ws in self.active_connections.items():
            if cid != exclude:
                await ws.send_text(message)

manager = ConnectionManager()

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    await manager.connect(websocket, client_id)
    await manager.broadcast(f"用户 {client_id} 加入了聊天室")
    try:
        while True:
            data = await websocket.receive_text()
            await manager.broadcast(f"{client_id}: {data}", exclude=client_id)
            await manager.send_personal("✓ 消息已发送", client_id)
    except WebSocketDisconnect:
        manager.disconnect(client_id)
        await manager.broadcast(f"用户 {client_id} 离开了聊天室")

文件上传

from fastapi import FastAPI, UploadFile, File, HTTPException
from typing import Annotated
import aiofiles

app = FastAPI()

ALLOWED_TYPES = {"image/jpeg", "image/png", "application/pdf"}
MAX_SIZE = 10 * 1024 * 1024  # 10MB

# ── 单文件上传 ────────────────────────────────────────────
@app.post("/upload")
async def upload_file(file: Annotated[UploadFile, File(description="上传文件")]):
    # 验证文件类型
    if file.content_type not in ALLOWED_TYPES:
        raise HTTPException(400, f"不支持的文件类型:{file.content_type}")

    # 读取内容(小文件)
    content = await file.read()

    # 验证文件大小
    if len(content) > MAX_SIZE:
        raise HTTPException(413, "文件太大,最大 10MB")

    # 异步写入磁盘
    save_path = f"uploads/{file.filename}"
    async with aiofiles.open(save_path, "wb") as f:
        await f.write(content)

    return {
        "filename": file.filename,
        "content_type": file.content_type,
        "size": len(content)
    }

# ── 大文件流式写入(避免内存溢出)──────────────────────
@app.post("/upload/large")
async def upload_large_file(file: UploadFile):
    chunk_size = 1024 * 1024  # 1MB 分块读取
    total = 0
    async with aiofiles.open(f"uploads/{file.filename}", "wb") as f:
        while chunk := await file.read(chunk_size):
            await f.write(chunk)
            total += len(chunk)
    return {"filename": file.filename, "total_bytes": total}

SSE 流式响应(EventSource)

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import json

app = FastAPI()

# ── SSE 生成器:服务器推送事件 ────────────────────────────
async def sse_generator(topic: str):
    """生成 SSE 格式的数据流"""
    for i in range(5):
        data = {"topic": topic, "index": i, "message": f"第 {i+1} 条消息"}
        # SSE 格式:data: {json}\n\n
        yield f"data: {json.dumps(data, ensure_ascii=False)}\n\n"
        await asyncio.sleep(1)
    yield "data: [DONE]\n\n"

@app.get("/events/{topic}")
async def stream_events(topic: str):
    return StreamingResponse(
        sse_generator(topic),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no"  # 禁用 Nginx 缓冲
        }
    )

Lifespan 事件(启动与关闭)

from fastapi import FastAPI
from contextlib import asynccontextmanager
import httpx

# 全局资源(HTTP 客户端连接池、ML 模型等)
http_client: httpx.AsyncClient | None = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    # ── 启动时执行 ────────────────────────────────────────
    global http_client

    print("启动:初始化 HTTP 客户端连接池...")
    http_client = httpx.AsyncClient(timeout=30.0, limits=httpx.Limits(max_connections=100))

    print("启动:创建数据库表...")
    from app.database import engine, Base
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    print("应用启动完成!")
    yield  # ← 应用运行期间

    # ── 关闭时执行 ────────────────────────────────────────
    print("关闭:释放 HTTP 客户端...")
    await http_client.aclose()
    print("应用已安全关闭")

app = FastAPI(lifespan=lifespan)
本章小结 FastAPI 的异步生态:BackgroundTasks 处理邮件/日志等非实时操作;WebSocket 实现双向实时通信;UploadFile 处理文件上传(大文件用分块流式写入);StreamingResponse + SSE 实现服务器推送(AI 流式输出的关键);Lifespan 管理应用级资源(连接池、模型预加载)。下一章学习如何为 FastAPI 应用编写完善的测试。