安装依赖
# JWT 处理库
pip install "python-jose[cryptography]"
# 密码哈希
pip install "passlib[bcrypt]"
# 或者使用 python-multipart(OAuth2 表单需要)
pip install python-multipart
JWT(JSON Web Token)
一种无状态的认证令牌格式,由三部分组成:Header(算法类型).Payload(声明/数据).Signature(签名)。服务器不需要存储 Session,通过验证签名即可确认 Token 合法性。适合 API 认证和微服务间通信。
OAuth2 密码流
OAuth2 的一种授权模式,用户直接将用户名/密码提交给 API 换取 Access Token。适合自有应用(客户端和服务器由同一团队开发),不适合第三方应用授权(应使用 Authorization Code Flow)。
bcrypt
专为密码哈希设计的算法,具有内置盐(salt)和可调节的计算成本(work factor)。不同于 MD5/SHA,bcrypt 刻意设计为慢速,使暴力破解成本极高。永远不要用 MD5/SHA 存储密码,只用 bcrypt/argon2/scrypt。
安全工具函数
# app/core/security.py
from datetime import datetime, timedelta, timezone
from typing import Any
from jose import JWTError, jwt
from passlib.context import CryptContext
# ── 配置(生产环境从环境变量读取)───────────────────────
SECRET_KEY = "your-secret-key-change-in-production"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_DAYS = 7
# 密码上下文(支持 bcrypt,自动处理盐值)
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# ── 密码相关 ──────────────────────────────────────────────
def get_password_hash(password: str) -> str:
"""对明文密码进行 bcrypt 哈希"""
return pwd_context.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""验证明文密码与哈希值是否匹配"""
return pwd_context.verify(plain_password, hashed_password)
# ── JWT Token 相关 ────────────────────────────────────────
def create_access_token(
subject: str | Any,
expires_delta: timedelta | None = None
) -> str:
"""生成 JWT Access Token"""
expire = datetime.now(timezone.utc) + (
expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
)
to_encode = {
"sub": str(subject), # subject:通常是用户 ID 或用户名
"exp": expire,
"type": "access"
}
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def create_refresh_token(subject: str) -> str:
"""生成 JWT Refresh Token(有效期更长)"""
expire = datetime.now(timezone.utc) + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
return jwt.encode(
{"sub": subject, "exp": expire, "type": "refresh"},
SECRET_KEY, algorithm=ALGORITHM
)
def decode_token(token: str) -> dict:
"""解码并验证 JWT Token"""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return payload
except JWTError:
raise ValueError("Token 无效或已过期")
OAuth2 登录端点
# app/routers/auth.py
from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from typing import Annotated
from app.core.security import (
verify_password, create_access_token, create_refresh_token, decode_token
)
router = APIRouter(prefix="/auth", tags=["认证"])
# OAuth2Bearer:从 Authorization: Bearer <token> Header 提取 token
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/login")
class TokenResponse(BaseModel):
access_token: str
refresh_token: str
token_type: str = "bearer"
# ── 登录端点(OAuth2 密码流)─────────────────────────────
@router.post("/login", response_model=TokenResponse)
async def login(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()]
):
"""
OAuth2 密码流登录。
接收 form-data: username + password
返回 access_token 和 refresh_token
"""
# 从数据库查询用户(此处用模拟数据)
fake_user = {"id": 1, "username": "alice",
"hashed_password": get_password_hash("secret123")}
if form_data.username != fake_user["username"] or \
not verify_password(form_data.password, fake_user["hashed_password"]):
raise HTTPException(
status_code=401,
detail="用户名或密码错误",
headers={"WWW-Authenticate": "Bearer"}
)
return TokenResponse(
access_token=create_access_token(fake_user["id"]),
refresh_token=create_refresh_token(str(fake_user["id"]))
)
# ── 当前用户依赖 ──────────────────────────────────────────
async def get_current_user(
token: Annotated[str, Depends(oauth2_scheme)]
) -> dict:
try:
payload = decode_token(token)
user_id = payload.get("sub")
if not user_id:
raise ValueError()
except ValueError:
raise HTTPException(status_code=401, detail="Token 无效")
# 实际场景:根据 user_id 查询数据库
return {"id": user_id, "username": "alice"}
@router.get("/me")
async def get_me(user: Annotated[dict, Depends(get_current_user)]):
return user
CORS 中间件配置
# app/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
app = FastAPI()
# ── CORS 配置(允许前端域名访问)─────────────────────────
app.add_middleware(
CORSMiddleware,
allow_origins=[
"http://localhost:3000", # React 开发服务器
"http://localhost:5173", # Vite 开发服务器
"https://myapp.com", # 生产域名
],
allow_credentials=True, # 允许携带 Cookie/Authorization Header
allow_methods=["*"], # 允许所有 HTTP 方法
allow_headers=["*"], # 允许所有请求头
)
# 不要在生产环境使用 allow_origins=["*"],
# 这会允许任何域名访问,存在 CSRF 安全风险
API Key 认证
from fastapi import FastAPI, Security, HTTPException
from fastapi.security import APIKeyHeader, APIKeyQuery
from typing import Annotated
app = FastAPI()
# 从 Header 提取:X-API-Key: your-key
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
# 从 Query 提取:?api_key=your-key
api_key_query = APIKeyQuery(name="api_key", auto_error=False)
VALID_API_KEYS = {"key-abc-123", "key-xyz-456"}
async def get_api_key(
key_from_header: Annotated[str | None, Security(api_key_header)] = None,
key_from_query: Annotated[str | None, Security(api_key_query)] = None,
) -> str:
api_key = key_from_header or key_from_query
if not api_key or api_key not in VALID_API_KEYS:
raise HTTPException(status_code=403, detail="无效 API Key")
return api_key
@app.get("/data")
async def get_data(api_key: Annotated[str, Depends(get_api_key)]):
return {"data": "机密数据", "key_used": api_key[:8] + "..."}
安全最佳实践清单
1. SECRET_KEY 必须足够复杂(32+ 字符),从环境变量读取,绝不硬编码在代码中。
2. Access Token 有效期设置短(15-30 分钟),Refresh Token 用于续期(7-30 天)。
3. 密码只存储 bcrypt/argon2 哈希值,日志中不打印密码、Token 等敏感信息。
4. 生产环境启用 HTTPS,防止 Token 在传输中被截获。
5. 实现 Token 吊销机制(黑名单存储在 Redis),防止 Token 泄露后无法撤销。
本章小结
FastAPI 的认证体系:OAuth2PasswordBearer 提取 Token、python-jose 生成/验证 JWT、passlib/bcrypt 哈希密码、CORSMiddleware 处理跨域。通过依赖注入将认证逻辑与业务逻辑分离,实现代码复用和易于测试。下一章探索 FastAPI 的异步能力:WebSocket、SSE 流式响应与文件上传。