async/await 使用原则
FastAPI 同时支持 async def 和普通 def 路由函数,理解何时用哪种至关重要:
使用 async def 的场景
- 调用异步数据库(AsyncSession)
- 调用 HTTP 客户端(httpx.AsyncClient)
- 调用异步 Redis(aioredis)
- WebSocket 通信
- 任何 await 表达式所在函数
使用普通 def 的场景
- 调用同步库(requests、同步 ORM)
- CPU 密集型计算
- 文件读写(非异步方式)
- FastAPI 会在线程池中运行 def 函数,不阻塞事件循环
最常见的异步错误
在 async def 函数中调用同步阻塞操作(如 time.sleep()、requests.get()、同步数据库查询)会阻塞整个事件循环,导致并发性能崩溃。解决方案:用 asyncio.sleep() 替代 time.sleep(),用 httpx.AsyncClient 替代 requests,用异步 ORM 替代同步版本。
BackgroundTasks 后台任务
from fastapi import FastAPI, BackgroundTasks
import asyncio
app = FastAPI()
# ── 后台任务函数 ──────────────────────────────────────────
async def send_welcome_email(email: str, username: str):
"""模拟发送欢迎邮件(实际使用 SMTP 或邮件服务 API)"""
await asyncio.sleep(2) # 模拟耗时操作
print(f"欢迎邮件已发送到 {email},用户名:{username}")
async def write_audit_log(action: str, user_id: int):
"""写入审计日志"""
# 实际场景:异步写入数据库或日志文件
print(f"[AUDIT] user={user_id} action={action}")
# ── 路由:注册成功后触发后台任务 ─────────────────────────
@app.post("/register")
async def register(
email: str,
username: str,
background_tasks: BackgroundTasks
):
# 立即返回响应,后台异步执行任务
background_tasks.add_task(send_welcome_email, email, username)
background_tasks.add_task(write_audit_log, "register", 1)
# 用户不需要等待邮件发送完成
return {"message": "注册成功!欢迎邮件将在后台发送"}
WebSocket 实时通信
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Any
app = FastAPI()
# ── 连接管理器(多人聊天室)──────────────────────────────
class ConnectionManager:
def __init__(self):
self.active_connections: dict[str, WebSocket] = {}
async def connect(self, websocket: WebSocket, client_id: str):
await websocket.accept()
self.active_connections[client_id] = websocket
def disconnect(self, client_id: str):
self.active_connections.pop(client_id, None)
async def send_personal(self, message: str, client_id: str):
if client_id in self.active_connections:
await self.active_connections[client_id].send_text(message)
async def broadcast(self, message: str, exclude: str | None = None):
for cid, ws in self.active_connections.items():
if cid != exclude:
await ws.send_text(message)
manager = ConnectionManager()
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
await manager.connect(websocket, client_id)
await manager.broadcast(f"用户 {client_id} 加入了聊天室")
try:
while True:
data = await websocket.receive_text()
await manager.broadcast(f"{client_id}: {data}", exclude=client_id)
await manager.send_personal("✓ 消息已发送", client_id)
except WebSocketDisconnect:
manager.disconnect(client_id)
await manager.broadcast(f"用户 {client_id} 离开了聊天室")
文件上传
from fastapi import FastAPI, UploadFile, File, HTTPException
from typing import Annotated
import aiofiles
app = FastAPI()
ALLOWED_TYPES = {"image/jpeg", "image/png", "application/pdf"}
MAX_SIZE = 10 * 1024 * 1024 # 10MB
# ── 单文件上传 ────────────────────────────────────────────
@app.post("/upload")
async def upload_file(file: Annotated[UploadFile, File(description="上传文件")]):
# 验证文件类型
if file.content_type not in ALLOWED_TYPES:
raise HTTPException(400, f"不支持的文件类型:{file.content_type}")
# 读取内容(小文件)
content = await file.read()
# 验证文件大小
if len(content) > MAX_SIZE:
raise HTTPException(413, "文件太大,最大 10MB")
# 异步写入磁盘
save_path = f"uploads/{file.filename}"
async with aiofiles.open(save_path, "wb") as f:
await f.write(content)
return {
"filename": file.filename,
"content_type": file.content_type,
"size": len(content)
}
# ── 大文件流式写入(避免内存溢出)──────────────────────
@app.post("/upload/large")
async def upload_large_file(file: UploadFile):
chunk_size = 1024 * 1024 # 1MB 分块读取
total = 0
async with aiofiles.open(f"uploads/{file.filename}", "wb") as f:
while chunk := await file.read(chunk_size):
await f.write(chunk)
total += len(chunk)
return {"filename": file.filename, "total_bytes": total}
SSE 流式响应(EventSource)
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import json
app = FastAPI()
# ── SSE 生成器:服务器推送事件 ────────────────────────────
async def sse_generator(topic: str):
"""生成 SSE 格式的数据流"""
for i in range(5):
data = {"topic": topic, "index": i, "message": f"第 {i+1} 条消息"}
# SSE 格式:data: {json}\n\n
yield f"data: {json.dumps(data, ensure_ascii=False)}\n\n"
await asyncio.sleep(1)
yield "data: [DONE]\n\n"
@app.get("/events/{topic}")
async def stream_events(topic: str):
return StreamingResponse(
sse_generator(topic),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no" # 禁用 Nginx 缓冲
}
)
Lifespan 事件(启动与关闭)
from fastapi import FastAPI
from contextlib import asynccontextmanager
import httpx
# 全局资源(HTTP 客户端连接池、ML 模型等)
http_client: httpx.AsyncClient | None = None
@asynccontextmanager
async def lifespan(app: FastAPI):
# ── 启动时执行 ────────────────────────────────────────
global http_client
print("启动:初始化 HTTP 客户端连接池...")
http_client = httpx.AsyncClient(timeout=30.0, limits=httpx.Limits(max_connections=100))
print("启动:创建数据库表...")
from app.database import engine, Base
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
print("应用启动完成!")
yield # ← 应用运行期间
# ── 关闭时执行 ────────────────────────────────────────
print("关闭:释放 HTTP 客户端...")
await http_client.aclose()
print("应用已安全关闭")
app = FastAPI(lifespan=lifespan)
本章小结
FastAPI 的异步生态:BackgroundTasks 处理邮件/日志等非实时操作;WebSocket 实现双向实时通信;UploadFile 处理文件上传(大文件用分块流式写入);StreamingResponse + SSE 实现服务器推送(AI 流式输出的关键);Lifespan 管理应用级资源(连接池、模型预加载)。下一章学习如何为 FastAPI 应用编写完善的测试。