Chapter 05

数据库集成(SQLAlchemy + Alembic)

使用异步 SQLAlchemy 2.0 实现生产级数据库操作,通过 Alembic 管理数据库迁移,构建完整用户系统 CRUD API。

安装依赖

# SQLAlchemy with async support (aiosqlite for SQLite, asyncpg for PostgreSQL).
# The requirement is quoted so shells such as zsh do not treat [asyncio] as a glob pattern.
pip install "sqlalchemy[asyncio]" aiosqlite asyncpg

# Alembic, the database migration tool
pip install alembic

# Password hashing used by the user-creation route (passlib with the bcrypt backend)
pip install "passlib[bcrypt]"

# Full install for a PostgreSQL production environment
pip install "fastapi[standard]" "sqlalchemy[asyncio]" asyncpg alembic pydantic-settings "passlib[bcrypt]"

数据库连接配置

# app/database.py
from sqlalchemy.ext.asyncio import (
    create_async_engine, AsyncSession, async_sessionmaker
)
from sqlalchemy.orm import DeclarativeBase

# SQLite (development / tests)
DATABASE_URL = "sqlite+aiosqlite:///./app.db"

# PostgreSQL (production)
# DATABASE_URL = "postgresql+asyncpg://user:password@localhost:5432/mydb"

# Options that are valid for every backend.
_engine_options = {
    "echo": True,  # log every SQL statement (development only)
}

# Pool sizing only makes sense for queue-style pools. Depending on the
# SQLAlchemy release, the aiosqlite dialect selects NullPool for file
# databases, and create_async_engine() then rejects pool_size/max_overflow
# with a TypeError, so they are passed for server databases only.
if not DATABASE_URL.startswith("sqlite"):
    _engine_options["pool_size"] = 5      # connections kept open in the pool
    _engine_options["max_overflow"] = 10  # extra connections allowed beyond pool_size

# Create the async engine
engine = create_async_engine(DATABASE_URL, **_engine_options)

# Session factory
AsyncSessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,  # keep attributes loaded after commit so responses can still read them
)


async def get_db():
    """FastAPI dependency that provides one AsyncSession per request.

    The service functions only flush(), so the transaction is finished here:
    it is committed when the request handler completes normally and rolled
    back when an exception propagates out of it. The session is always closed
    on exit.

    Yields:
        AsyncSession: the session bound to the current request.
    """
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            # Undo whatever the failed request flushed, then let the error propagate.
            await session.rollback()
            raise

# Base class shared by all ORM models
class Base(DeclarativeBase):
    """Declarative base; every ORM model in the application subclasses it."""

ORM 模型定义

# app/models/user.py
from sqlalchemy import String, Boolean, DateTime, ForeignKey, Integer, Text
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql import func
from datetime import datetime
from app.database import Base

class User(Base):
    """Users table."""

    __tablename__ = "users"

    # Mapped[...] annotations are the SQLAlchemy 2.0 typing style. Where no SQL
    # type is passed to mapped_column(), it is derived from the annotation
    # (int -> Integer, bool -> Boolean, datetime -> DateTime), and
    # "str | None" makes the column nullable.
    id: Mapped[int] = mapped_column(primary_key=True, index=True)
    username: Mapped[str] = mapped_column(String(50), unique=True, index=True)
    email: Mapped[str] = mapped_column(String(100), unique=True, index=True)
    hashed_password: Mapped[str] = mapped_column(String(255))
    full_name: Mapped[str | None] = mapped_column(String(100))
    is_active: Mapped[bool] = mapped_column(default=True)
    is_admin: Mapped[bool] = mapped_column(default=False)

    # Timestamps are filled in by the database: created_at on INSERT,
    # updated_at on INSERT and again on every UPDATE.
    created_at: Mapped[datetime] = mapped_column(server_default=func.now())
    updated_at: Mapped[datetime] = mapped_column(
        server_default=func.now(),
        onupdate=func.now(),
    )

    # One-to-many: a user owns many articles. The target class comes from the
    # annotation; articles are deleted together with their user by the ORM.
    articles: Mapped[list["Article"]] = relationship(
        back_populates="author",
        cascade="all, delete-orphan",
    )

class Article(Base):
    """Articles table."""

    __tablename__ = "articles"

    # Column types not given explicitly are derived from the Mapped[...] annotation.
    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(200))
    content: Mapped[str] = mapped_column(Text)
    published: Mapped[bool] = mapped_column(default=False)
    created_at: Mapped[datetime] = mapped_column(server_default=func.now())

    # Foreign key to users.id, plus the ORM-side link back to the owning User
    author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    author: Mapped["User"] = relationship(back_populates="articles")

完整 CRUD 操作

# app/services/user_service.py
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update, delete
from sqlalchemy.orm import selectinload
from app.models.user import User

# ── Create ────────────────────────────────────────────────
async def create_user(db: AsyncSession, username: str, email: str, hashed_pw: str) -> User:
    """Insert a new user inside the caller's transaction and return it.

    Args:
        db: session to write through; it is flushed but not committed here.
        username: unique login name.
        email: unique e-mail address.
        hashed_pw: password hash to store (never the plain-text password).

    Returns:
        The persisted User, including database-generated values.
    """
    new_user = User(
        username=username,
        email=email,
        hashed_password=hashed_pw,
    )
    db.add(new_user)
    # flush() sends the INSERT so the primary key is assigned; the transaction stays open.
    await db.flush()
    # refresh() re-reads the row so server_default columns are populated.
    await db.refresh(new_user)
    return new_user

# ── Read ──────────────────────────────────────────────────
async def get_user_by_id(db: AsyncSession, user_id: int) -> User | None:
    """Return the user whose primary key is ``user_id``, or None if there is no such row."""
    stmt = select(User).where(User.id == user_id)
    matches = await db.scalars(stmt)
    return matches.one_or_none()

async def get_users(db: AsyncSession, skip: int = 0, limit: int = 20) -> list[User]:
    """Return one page of active users, newest first.

    Args:
        db: session to query through.
        skip: number of rows to skip (OFFSET).
        limit: maximum number of rows to return (LIMIT).

    Returns:
        The active users on the requested page.
    """
    stmt = (
        select(User)
        .where(User.is_active.is_(True))
        # created_at alone is not unique, so rows sharing a timestamp could
        # shuffle between pages; the primary key makes the order total.
        .order_by(User.created_at.desc(), User.id.desc())
        .offset(skip)
        .limit(limit)
    )
    result = await db.execute(stmt)
    return list(result.scalars().all())

# Load related rows eagerly (avoids the N+1 query problem)
async def get_user_with_articles(db: AsyncSession, user_id: int) -> User | None:
    """Return the user with ``articles`` already loaded, or None if the user does not exist.

    selectinload fetches the related articles with one additional
    SELECT ... IN query, so reading ``user.articles`` afterwards does not
    trigger a lazy load.
    """
    eager_articles = selectinload(User.articles)
    stmt = select(User).where(User.id == user_id).options(eager_articles)
    result = await db.execute(stmt)
    return result.scalars().one_or_none()

# ── Update ────────────────────────────────────────────────
async def update_user(db: AsyncSession, user_id: int, **kwargs) -> User | None:
    """Update the given columns of one user and return the refreshed row.

    Args:
        db: session to write through; the change is not committed here.
        user_id: primary key of the user to modify.
        **kwargs: column name -> new value pairs.

    Returns:
        The user as it is after the update, or None if no such user exists.
    """
    # With no fields there is nothing to write, so no UPDATE is issued.
    if kwargs:
        await db.execute(
            update(User).where(User.id == user_id).values(**kwargs)
        )
    return await get_user_by_id(db, user_id)

# ── Delete ────────────────────────────────────────────────
async def delete_user(db: AsyncSession, user_id: int) -> bool:
    """Delete one user together with the articles they own.

    A bulk ``delete(User)`` statement bypasses the ORM, so the
    ``cascade="all, delete-orphan"`` configured on ``User.articles`` would not
    run and the user's articles would either block the delete (foreign-key
    violation) or be left orphaned. Deleting the loaded object lets the ORM
    cascade to the articles first.

    Args:
        db: session to write through; the change is not committed here.
        user_id: primary key of the user to delete.

    Returns:
        True if a user was deleted, False if no such user exists.
    """
    user = await db.get(User, user_id)
    if user is None:
        return False
    # AsyncSession.delete() is awaitable because it may need to load the
    # related articles in order to cascade the delete to them.
    await db.delete(user)
    # Emit the DELETE statements now so database errors surface to the caller.
    await db.flush()
    return True

路由集成(完整用户 API)

# app/routers/users.py
from typing import Annotated

from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession

import app.services.user_service as svc
from app.database import get_db

# Every route in this module is served under the /users prefix.
router = APIRouter(
    prefix="/users",
    tags=["用户"],
)

# Shorthand for "an AsyncSession injected via the get_db dependency".
DB = Annotated[AsyncSession, Depends(get_db)]

# Request body accepted by the user-creation route (POST /users/).
class UserCreate(BaseModel):
    username: str
    email: str
    password: str  # plain-text password as submitted; the create_user route hashes it before passing it on

# Response shape used as response_model by the user routes.
# It has no password field, so hashed_password is not part of the declared response.
class UserResponse(BaseModel):
    id: int
    username: str
    email: str
    is_active: bool
    # The routes return the objects produced by the user service rather than dicts;
    # from_attributes is presumably what lets Pydantic read these fields from object
    # attributes. NOTE(review): confirm against the Pydantic version in use.
    model_config = {"from_attributes": True}

# GET /users/ — return one page of users from the user service.
# skip and limit are validated at the API boundary: a negative value would
# otherwise reach the database as an invalid OFFSET/LIMIT, and an unbounded
# limit would let a single request read the whole table. Out-of-range values
# are rejected with 422 before any query runs.
@router.get("/", response_model=list[UserResponse])
async def list_users(
    db: DB,
    skip: Annotated[int, Query(ge=0)] = 0,
    limit: Annotated[int, Query(ge=1, le=100)] = 20,
):
    return await svc.get_users(db, skip, limit)

# POST /users/ — hash the submitted password and create the user.
# Responds 201 with the new user, or 409 when the username or e-mail is
# already taken (both columns are unique in the users table).
@router.post("/", response_model=UserResponse, status_code=201)
async def create_user(user: UserCreate, db: DB):
    from passlib.context import CryptContext
    from sqlalchemy.exc import IntegrityError

    pwd_context = CryptContext(schemes=["bcrypt"])
    hashed = pwd_context.hash(user.password)
    try:
        return await svc.create_user(db, user.username, user.email, hashed)
    except IntegrityError as exc:
        # The failed INSERT leaves the session's transaction unusable until it
        # is rolled back; report the conflict instead of a generic 500.
        await db.rollback()
        raise HTTPException(status_code=409, detail="用户名或邮箱已存在") from exc

# GET /users/{user_id} — return a single user, or 404 when the id is unknown.
@router.get("/{user_id}", response_model=UserResponse)
async def get_user(user_id: int, db: DB):
    found = await svc.get_user_by_id(db, user_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="用户不存在")

# DELETE /users/{user_id} — remove a user; 204 on success, 404 when the id is unknown.
@router.delete("/{user_id}", status_code=204)
async def delete_user(user_id: int, db: DB):
    if await svc.delete_user(db, user_id):
        return
    raise HTTPException(status_code=404, detail="用户不存在")

Alembic 数据库迁移

# Initialise Alembic (creates alembic.ini and the alembic/ directory)
alembic init alembic

# Set the database URL in alembic.ini. The default env.py template runs
# migrations through a synchronous engine, so use a sync driver URL here
# (no "+aiosqlite" / "+asyncpg"):
# sqlalchemy.url = sqlite:///./app.db

# Edit alembic/env.py so autogenerate can see the models. Importing Base alone
# is not enough: a table is only registered on Base.metadata once the module
# defining its model has been imported, otherwise the generated migration is empty.
# from app.database import Base
# import app.models.user  # noqa: F401  (registers the users and articles tables)
# target_metadata = Base.metadata

# Generate a migration script (model changes are detected automatically)
alembic revision --autogenerate -m "create users and articles tables"

# Apply migrations (upgrade to the latest revision)
alembic upgrade head

# Roll back one revision
alembic downgrade -1

# Show the current revision
alembic current

# Show the migration history
alembic history --verbose
生产环境迁移注意事项:永远不要在生产数据库上直接运行 Base.metadata.create_all(),只用 Alembic 管理 schema 变更。每次模型变更都生成独立的迁移脚本,迁移脚本提交到 Git,部署时通过 CI/CD 自动执行 alembic upgrade head。
本章小结:异步 SQLAlchemy 2.0 是 FastAPI 数据库集成的最佳选择:create_async_engine + AsyncSession 实现全异步数据库操作,Mapped[] 类型注解让 ORM 模型更清晰,Alembic 管理数据库版本迁移。下一章实现完整的 JWT 认证系统。