安装依赖
# SQLAlchemy 异步支持(aiosqlite 用于 SQLite,asyncpg 用于 PostgreSQL)
pip install "sqlalchemy[asyncio]" aiosqlite asyncpg
# Alembic 数据库迁移工具
pip install alembic
# PostgreSQL 生产环境完整安装
pip install "fastapi[standard]" "sqlalchemy[asyncio]" asyncpg alembic pydantic-settings
数据库连接配置
# app/database.py
from collections.abc import AsyncIterator

from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)
from sqlalchemy.orm import DeclarativeBase
# Development/test default: a local SQLite file accessed through aiosqlite.
# For production, point this at PostgreSQL through asyncpg instead, e.g.
#   "postgresql+asyncpg://user:password@localhost:5432/mydb"
DATABASE_URL = "sqlite+aiosqlite:///./app.db"

# Async engine built from DATABASE_URL.
engine = create_async_engine(
    DATABASE_URL,
    pool_size=5,      # size of the connection pool
    max_overflow=10,  # extra connections allowed beyond pool_size
    echo=True,        # print SQL statements; meant for development
)
# Session factory: every call returns a new AsyncSession bound to `engine`.
AsyncSessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    # Do not expire attributes on commit, so ORM objects can still be read
    # (e.g. serialized into a response) after the transaction has ended.
    expire_on_commit=False,
)


async def get_db() -> AsyncIterator[AsyncSession]:
    """Yield one session per request, for use as a FastAPI dependency.

    The session is committed when the code that received it finishes
    without raising, and rolled back (with the exception re-raised)
    otherwise. The session is always closed on exit.

    Yields:
        An ``AsyncSession`` created by ``AsyncSessionLocal``.
    """
    async with AsyncSessionLocal() as session:
        try:
            yield session
            # Persist whatever the caller added or flushed.
            await session.commit()
        except Exception:
            await session.rollback()
            raise
class Base(DeclarativeBase):
    """Declarative base class shared by all ORM models."""
ORM 模型定义
# app/models/user.py
from sqlalchemy import String, Boolean, DateTime, ForeignKey, Integer, Text
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql import func
from datetime import datetime
from app.database import Base
class User(Base):
    """ORM model for the ``users`` table."""

    __tablename__ = "users"

    # SQLAlchemy 2.0 typed mappings: when no type is passed to mapped_column,
    # the column type and nullability come from the Mapped[...] annotation.
    id: Mapped[int] = mapped_column(primary_key=True, index=True)
    username: Mapped[str] = mapped_column(String(50), unique=True, index=True)
    email: Mapped[str] = mapped_column(String(100), unique=True, index=True)
    hashed_password: Mapped[str] = mapped_column(String(255))
    full_name: Mapped[str | None] = mapped_column(String(100))
    is_active: Mapped[bool] = mapped_column(default=True)
    is_admin: Mapped[bool] = mapped_column(default=False)

    # Timestamps are filled in by the database.
    created_at: Mapped[datetime] = mapped_column(server_default=func.now())
    updated_at: Mapped[datetime] = mapped_column(
        server_default=func.now(),
        onupdate=func.now(),
    )

    # One-to-many: a user owns any number of articles.
    articles: Mapped[list["Article"]] = relationship(
        back_populates="author",
        cascade="all, delete-orphan",
    )
class Article(Base):
    """ORM model for the ``articles`` table."""

    __tablename__ = "articles"

    # Column types not given explicitly are derived from the annotations.
    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(200))
    content: Mapped[str] = mapped_column(Text)
    published: Mapped[bool] = mapped_column(default=False)
    created_at: Mapped[datetime] = mapped_column(server_default=func.now())

    # Foreign key to users.id, plus the many-to-one side of User.articles.
    author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    author: Mapped[User] = relationship(back_populates="articles")
完整 CRUD 操作
# app/services/user_service.py
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update, delete
from sqlalchemy.orm import selectinload
from app.models.user import User
# ── Create ────────────────────────────────────────────────
async def create_user(db: AsyncSession, username: str, email: str, hashed_pw: str) -> User:
    """Add a new user and return the persisted instance.

    The INSERT is flushed but not committed; the transaction stays open
    for the caller.

    Args:
        db: Session to add the user to.
        username: Value for the ``username`` column.
        email: Value for the ``email`` column.
        hashed_pw: Already-hashed password, stored in ``hashed_password``.

    Returns:
        The new ``User`` after being refreshed from the database.
    """
    new_user = User(
        username=username,
        email=email,
        hashed_password=hashed_pw,
    )
    db.add(new_user)
    # Send the INSERT now so the id is generated, without committing.
    await db.flush()
    # Reload from the database to pick up server-default columns.
    await db.refresh(new_user)
    return new_user
# ── Read ──────────────────────────────────────────────────
async def get_user_by_id(db: AsyncSession, user_id: int) -> User | None:
    """Return the user whose id equals ``user_id``, or ``None`` if absent."""
    stmt = select(User).where(User.id == user_id)
    rows = await db.execute(stmt)
    return rows.scalar_one_or_none()
async def get_users(db: AsyncSession, skip: int = 0, limit: int = 20) -> list[User]:
    """Return one page of active users, newest first.

    Args:
        db: Session used to run the query.
        skip: Number of rows to skip (OFFSET).
        limit: Maximum number of rows to return (LIMIT).

    Returns:
        The matching ``User`` objects as a list.
    """
    stmt = (
        select(User)
        .where(User.is_active.is_(True))
        # created_at is not unique, so rows can tie on it. The id
        # tie-breaker gives a total order, which keeps OFFSET/LIMIT pages
        # from repeating or skipping rows.
        .order_by(User.created_at.desc(), User.id.desc())
        .offset(skip)
        .limit(limit)
    )
    result = await db.execute(stmt)
    return list(result.scalars().all())
# Eager-load the relationship up front to avoid the N+1 query problem.
async def get_user_with_articles(db: AsyncSession, user_id: int) -> User | None:
    """Return the user with ``user_id`` and its articles loaded, or ``None``."""
    stmt = (
        select(User)
        .where(User.id == user_id)
        # selectinload fetches the articles with one additional
        # SELECT ... IN query, instead of a lazy load on attribute access.
        .options(selectinload(User.articles))
    )
    rows = await db.execute(stmt)
    return rows.scalar_one_or_none()
# ── Update ────────────────────────────────────────────────
async def update_user(db: AsyncSession, user_id: int, **kwargs) -> User | None:
    """Update columns of one user and return the row as it is afterwards.

    Args:
        db: Session used to run the statements.
        user_id: Id of the user to update.
        **kwargs: Column names and their new values, passed unchanged to
            ``UPDATE ... SET``. NOTE(review): nothing here restricts which
            columns may be set, so callers should whitelist fields before
            forwarding client input (e.g. ``is_admin``).

    Returns:
        The user re-read after the update, or ``None`` when no user has
        that id.
    """
    # With nothing to set there is no point in sending an UPDATE.
    if kwargs:
        stmt = update(User).where(User.id == user_id).values(**kwargs)
        await db.execute(stmt)
    return await get_user_by_id(db, user_id)
# ── Delete ────────────────────────────────────────────────
async def delete_user(db: AsyncSession, user_id: int) -> bool:
    """Delete one user through the ORM and report whether it existed.

    The user is loaded and removed with ``Session.delete`` instead of a
    bulk ``DELETE`` statement. A bulk DELETE does not apply relationship
    cascades, so the user's articles would be left behind (or the
    statement rejected by the foreign key) even though ``User.articles``
    is declared with ``cascade="all, delete-orphan"``.

    Args:
        db: Session used for the lookup and the delete.
        user_id: Id of the user to delete.

    Returns:
        ``True`` if a user was deleted, ``False`` if no user has that id.
    """
    user = await db.get(User, user_id)
    if user is None:
        return False
    await db.delete(user)
    # Emit the DELETEs now so constraint errors surface here, not at commit.
    await db.flush()
    return True
路由集成(完整用户 API)
# app/routers/users.py
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from pydantic import BaseModel
from typing import Annotated
from app.database import get_db
import app.services.user_service as svc
# Router for the endpoints below; all paths are prefixed with /users.
router = APIRouter(
    prefix="/users",
    tags=["用户"],
)

# Shorthand annotation: a parameter typed `DB` is an AsyncSession that is
# resolved through the get_db dependency.
DB = Annotated[
    AsyncSession,
    Depends(get_db),
]
# Request body for creating a user.
class UserCreate(BaseModel):
    # Login name.
    username: str
    # Contact e-mail address (accepted as a plain string).
    email: str
    # Plain-text password as submitted by the client.
    password: str
# Response body describing a user; the password hash is deliberately absent.
class UserResponse(BaseModel):
    # Allow building the model from attribute-bearing objects (ORM rows),
    # not only from dicts.
    model_config = {"from_attributes": True}

    id: int
    username: str
    email: str
    is_active: bool
@router.get("/", response_model=list[UserResponse])
async def list_users(db: DB, skip: int = 0, limit: int = 20):
    # List active users, one page at a time.
    #
    # Negative paging values are never meaningful, and database backends
    # disagree on how to treat them (SQLite ignores a negative LIMIT,
    # PostgreSQL rejects it), so fail fast with a client error instead of
    # forwarding them to the query.
    # NOTE(review): consider adding an upper bound on `limit` as well.
    if skip < 0 or limit < 0:
        raise HTTPException(status_code=422, detail="skip 和 limit 不能为负数")
    return await svc.get_users(db, skip, limit)
@router.post("/", response_model=UserResponse, status_code=201)
async def create_user(user: UserCreate, db: DB):
    # Create a user from the request body and return it with status 201.
    # Responds with 409 when the username or e-mail is already taken.
    import asyncio

    from passlib.context import CryptContext
    from sqlalchemy.exc import IntegrityError

    pwd_context = CryptContext(schemes=["bcrypt"])
    # bcrypt is intentionally slow and CPU-bound; hashing in a worker thread
    # keeps the event loop free to serve other requests meanwhile.
    hashed = await asyncio.to_thread(pwd_context.hash, user.password)

    try:
        return await svc.create_user(db, user.username, user.email, hashed)
    except IntegrityError as exc:
        # username and email are unique columns; a duplicate is a client
        # conflict, not a server error. Roll back so the session is usable.
        await db.rollback()
        raise HTTPException(status_code=409, detail="用户名或邮箱已存在") from exc
@router.get("/{user_id}", response_model=UserResponse)
async def get_user(user_id: int, db: DB):
    # Return the user with the given id, or respond 404 when there is none.
    found = await svc.get_user_by_id(db, user_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="用户不存在")
@router.delete("/{user_id}", status_code=204)
async def delete_user(user_id: int, db: DB):
    # Delete the user with the given id; respond 404 when there is none.
    # On success the handler returns nothing (204 No Content).
    deleted = await svc.delete_user(db, user_id)
    if deleted:
        return None
    raise HTTPException(status_code=404, detail="用户不存在")
Alembic 数据库迁移
# 初始化 Alembic
alembic init alembic
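# 修改 alembic.ini 中的数据库 URL(alembic init 默认生成的 env.py 是同步的,因此这里使用同步驱动的 URL)
# sqlalchemy.url = sqlite:///./app.db
# 修改 alembic/env.py,导入 ORM Base 以及所有模型模块
# (模型模块必须被导入,否则 Base.metadata 为空,--autogenerate 检测不到任何表):
# from app.database import Base
# import app.models.user  # noqa: F401
# target_metadata = Base.metadata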
# 生成迁移脚本(自动检测模型变化)
alembic revision --autogenerate -m "create users and articles tables"
# 应用迁移(升级到最新版本)
alembic upgrade head
# 回滚一步
alembic downgrade -1
# 查看当前版本
alembic current
# 查看迁移历史
alembic history --verbose
生产环境迁移注意事项
永远不要在生产数据库上直接运行
Base.metadata.create_all(),只用 Alembic 管理 schema 变更。每次模型变更都生成独立的迁移脚本,迁移脚本提交到 Git,部署时通过 CI/CD 自动执行 alembic upgrade head。
本章小结
异步 SQLAlchemy 2.0 是 FastAPI 数据库集成的最佳选择:create_async_engine + AsyncSession 实现全异步数据库操作,Mapped[] 类型注解让 ORM 模型更清晰,Alembic 管理数据库版本迁移。下一章实现完整的 JWT 认证系统。