Chapter 06

认证与安全

实现生产级 JWT 认证系统:OAuth2 密码流、Token 生成与验证、bcrypt 密码哈希、CORS 配置与安全最佳实践。

安装依赖

# JWT 处理库
pip install "python-jose[cryptography]"

# 密码哈希
pip install "passlib[bcrypt]"

# 或者使用 python-multipart(OAuth2 表单需要)
pip install python-multipart
JWT(JSON Web Token)
一种无状态的认证令牌格式,由三部分组成:Header(算法类型).Payload(声明/数据).Signature(签名)。服务器不需要存储 Session,通过验证签名即可确认 Token 合法性。适合 API 认证和微服务间通信。
OAuth2 密码流
OAuth2 的一种授权模式,用户直接将用户名/密码提交给 API 换取 Access Token。适合自有应用(客户端和服务器由同一团队开发),不适合第三方应用授权(应使用 Authorization Code Flow)。
bcrypt
专为密码哈希设计的算法,具有内置盐(salt)和可调节的计算成本(work factor)。不同于 MD5/SHA,bcrypt 刻意设计为慢速,使暴力破解成本极高。永远不要用 MD5/SHA 存储密码,只用 bcrypt/argon2/scrypt。

安全工具函数

# app/core/security.py
from datetime import datetime, timedelta, timezone
from typing import Any
from jose import JWTError, jwt
from passlib.context import CryptContext

# ── 配置(生产环境从环境变量读取)───────────────────────
SECRET_KEY = "your-secret-key-change-in-production"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_DAYS = 7

# 密码上下文(支持 bcrypt,自动处理盐值)
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# ── 密码相关 ──────────────────────────────────────────────
def get_password_hash(password: str) -> str:
    """对明文密码进行 bcrypt 哈希"""
    return pwd_context.hash(password)

def verify_password(plain_password: str, hashed_password: str) -> bool:
    """验证明文密码与哈希值是否匹配"""
    return pwd_context.verify(plain_password, hashed_password)

# ── JWT Token 相关 ────────────────────────────────────────
def create_access_token(
    subject: str | Any,
    expires_delta: timedelta | None = None
) -> str:
    """生成 JWT Access Token"""
    expire = datetime.now(timezone.utc) + (
        expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    )
    to_encode = {
        "sub": str(subject),  # subject:通常是用户 ID 或用户名
        "exp": expire,
        "type": "access"
    }
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

def create_refresh_token(subject: str) -> str:
    """生成 JWT Refresh Token(有效期更长)"""
    expire = datetime.now(timezone.utc) + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
    return jwt.encode(
        {"sub": subject, "exp": expire, "type": "refresh"},
        SECRET_KEY, algorithm=ALGORITHM
    )

def decode_token(token: str) -> dict:
    """解码并验证 JWT Token"""
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        return payload
    except JWTError:
        raise ValueError("Token 无效或已过期")

OAuth2 登录端点

# app/routers/auth.py
from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from typing import Annotated
from app.core.security import (
    verify_password, create_access_token, create_refresh_token, decode_token
)

router = APIRouter(prefix="/auth", tags=["认证"])

# OAuth2Bearer:从 Authorization: Bearer <token> Header 提取 token
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/login")

class TokenResponse(BaseModel):
    access_token: str
    refresh_token: str
    token_type: str = "bearer"

# ── 登录端点(OAuth2 密码流)─────────────────────────────
@router.post("/login", response_model=TokenResponse)
async def login(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()]
):
    """
    OAuth2 密码流登录。
    接收 form-data: username + password
    返回 access_token 和 refresh_token
    """
    # 从数据库查询用户(此处用模拟数据)
    fake_user = {"id": 1, "username": "alice",
                 "hashed_password": get_password_hash("secret123")}

    if form_data.username != fake_user["username"] or \
       not verify_password(form_data.password, fake_user["hashed_password"]):
        raise HTTPException(
            status_code=401,
            detail="用户名或密码错误",
            headers={"WWW-Authenticate": "Bearer"}
        )

    return TokenResponse(
        access_token=create_access_token(fake_user["id"]),
        refresh_token=create_refresh_token(str(fake_user["id"]))
    )

# ── 当前用户依赖 ──────────────────────────────────────────
async def get_current_user(
    token: Annotated[str, Depends(oauth2_scheme)]
) -> dict:
    try:
        payload = decode_token(token)
        user_id = payload.get("sub")
        if not user_id:
            raise ValueError()
    except ValueError:
        raise HTTPException(status_code=401, detail="Token 无效")
    # 实际场景:根据 user_id 查询数据库
    return {"id": user_id, "username": "alice"}

@router.get("/me")
async def get_me(user: Annotated[dict, Depends(get_current_user)]):
    return user

CORS 中间件配置

# app/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

# ── CORS 配置(允许前端域名访问)─────────────────────────
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "http://localhost:3000",    # React 开发服务器
        "http://localhost:5173",    # Vite 开发服务器
        "https://myapp.com",        # 生产域名
    ],
    allow_credentials=True,        # 允许携带 Cookie/Authorization Header
    allow_methods=["*"],           # 允许所有 HTTP 方法
    allow_headers=["*"],           # 允许所有请求头
)

# 不要在生产环境使用 allow_origins=["*"],
# 这会允许任何域名访问,存在 CSRF 安全风险

API Key 认证

from fastapi import FastAPI, Security, HTTPException
from fastapi.security import APIKeyHeader, APIKeyQuery
from typing import Annotated

app = FastAPI()

# 从 Header 提取:X-API-Key: your-key
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)

# 从 Query 提取:?api_key=your-key
api_key_query = APIKeyQuery(name="api_key", auto_error=False)

VALID_API_KEYS = {"key-abc-123", "key-xyz-456"}

async def get_api_key(
    key_from_header: Annotated[str | None, Security(api_key_header)] = None,
    key_from_query: Annotated[str | None, Security(api_key_query)] = None,
) -> str:
    api_key = key_from_header or key_from_query
    if not api_key or api_key not in VALID_API_KEYS:
        raise HTTPException(status_code=403, detail="无效 API Key")
    return api_key

@app.get("/data")
async def get_data(api_key: Annotated[str, Depends(get_api_key)]):
    return {"data": "机密数据", "key_used": api_key[:8] + "..."}
安全最佳实践清单 1. SECRET_KEY 必须足够复杂(32+ 字符),从环境变量读取,绝不硬编码在代码中。 2. Access Token 有效期设置短(15-30 分钟),Refresh Token 用于续期(7-30 天)。 3. 密码只存储 bcrypt/argon2 哈希值,日志中不打印密码、Token 等敏感信息。 4. 生产环境启用 HTTPS,防止 Token 在传输中被截获。 5. 实现 Token 吊销机制(黑名单存储在 Redis),防止 Token 泄露后无法撤销。
本章小结 FastAPI 的认证体系:OAuth2PasswordBearer 提取 Token、python-jose 生成/验证 JWT、passlib/bcrypt 哈希密码、CORSMiddleware 处理跨域。通过依赖注入将认证逻辑与业务逻辑分离,实现代码复用和易于测试。下一章探索 FastAPI 的异步能力:WebSocket、SSE 流式响应与文件上传。