Chapter 09

Ecto 数据库深度

从关联建模到复杂查询 DSL——掌握 Elixir 生态中最优雅的数据库操作层

博客系统完整数据建模

以一个博客系统为例,设计包含用户、文章、评论、标签的完整数据模型:

# lib/blog/accounts/user.ex
defmodule Blog.Accounts.User do
  use Ecto.Schema
  import Ecto.Changeset

  schema "users" do
    field :email,            :string
    field :username,         :string
    field :password,         :string, virtual: true  # 虚拟字段
    field :hashed_password,  :string
    field :role,             Ecto.Enum, values: [:user, :admin], default: :user

    has_many :posts,     Blog.Posts.Post
    has_many :comments,  Blog.Posts.Comment

    timestamps()
  end

  def registration_changeset(user, attrs) do
    user
    |> cast(attrs, [:email, :username, :password])
    |> validate_required([:email, :username, :password])
    |> validate_format(:email, ~r/^[^\s]+@[^\s]+$/)
    |> validate_length(:password, min: 8)
    |> unique_constraint(:email)
    |> unique_constraint(:username)
    |> hash_password()
  end

  defp hash_password(changeset) do
    case changeset do
      %Ecto.Changeset{valid?: true, changes: %{password: pwd}} ->
        put_change(changeset, :hashed_password, Bcrypt.hash_pwd_salt(pwd))
      _ -> changeset
    end
  end
end

关联:belongs_to / has_many / many_to_many

# lib/blog/posts/post.ex
schema "posts" do
  field :title,        :string
  field :body,         :text
  field :slug,         :string
  field :published_at, :utc_datetime

  belongs_to :author, Blog.Accounts.User
  has_many   :comments, Blog.Posts.Comment, on_delete: :delete_all

  # 多对多:文章与标签
  many_to_many :tags, Blog.Posts.Tag,
    join_through: "posts_tags",
    on_replace: :delete

  timestamps()
end

# lib/blog/posts/tag.ex
schema "tags" do
  field :name, :string
  many_to_many :posts, Blog.Posts.Post, join_through: "posts_tags"
  timestamps()
end

Ecto.Query DSL:复杂查询

import Ecto.Query

# 基本查询
from(p in Post, where: p.published_at != ^nil)
|> Repo.all()

# 管道风格(更常用)
Post
|> where([p], p.published_at != ^nil)
|> where([p], ilike(p.title, "%#{search}%"))
|> order_by([p], desc: p.published_at)
|> limit(20)
|> offset(40)
|> Repo.all()

# JOIN 关联查询
Post
|> join(:inner, [p], a in assoc(p, :author))
|> where([p, a], a.role == :admin)
|> select([p, a], {p, a.username})
|> Repo.all()

# 聚合查询
Post
|> group_by([p], p.author_id)
|> select([p], %{author_id: p.author_id, count: count(p.id)})
|> order_by([p], desc: count(p.id))
|> having([p], count(p.id) > 5)
|> Repo.all()

N+1 问题与 preload

N+1 问题是 ORM 中常见陷阱:查询 N 篇文章,又为每篇文章发一次查询获取作者,共 N+1 次查询。

# ❌ N+1 问题:每篇文章触发一次额外查询
posts = Repo.all(Post)
Enum.each(posts, fn post ->
  IO.puts(post.author.name)   # 触发 N 次 SELECT
end)

# ✅ 方案1:preload(两次查询,推荐)
posts = Post
  |> preload(:author)
  |> Repo.all()
# 执行:SELECT * FROM posts; SELECT * FROM users WHERE id IN (...);

# 嵌套 preload
Post
|> preload([author: :profile, comments: :author])
|> Repo.all()

# ✅ 方案2:join + preload(单次查询,复杂场景)
Post
|> join(:left, [p], a in assoc(p, :author))
|> preload([p, a], author: a)
|> Repo.all()

事务

# 原子性操作:创建用户同时发送欢迎邮件积分
Repo.transaction(fn ->
  with {:ok, user}    <- Repo.insert(%User{} |> User.changeset(attrs)),
       {:ok, _points} <- award_welcome_points(user) do
    user
  else
    {:error, reason} -> Repo.rollback(reason)
  end
end)

# Ecto.Multi:更结构化的事务
Ecto.Multi.new()
|> Ecto.Multi.insert(:user, User.changeset(%User{}, attrs))
|> Ecto.Multi.run(:points, fn _repo, %{user: user} ->
  award_welcome_points(user)
end)
|> Ecto.Multi.insert(:log, fn %{user: user} ->
  %AuditLog{user_id: user.id, action: "register"}
end)
|> Repo.transaction()
# 返回 {:ok, %{user: user, points: points, log: log}}
# 或 {:error, :user, changeset, %{}} 精确标识失败步骤

虚拟字段与自定义类型

# 虚拟字段:不持久化到数据库,用于表单处理
schema "users" do
  field :password,              :string, virtual: true
  field :password_confirmation,  :string, virtual: true
  field :hashed_password,       :string
end

# 验证两次密码一致
def changeset(user, attrs) do
  user
  |> cast(attrs, [:password, :password_confirmation])
  |> validate_confirmation(:password, message: "两次密码不一致")
end

# 自定义 Ecto 类型:Money
defmodule Blog.Types.Money do
  use Ecto.Type

  def type, do: :integer  # 数据库存储整数(分)

  def cast(amount) when is_float(amount), do: {:ok, round(amount * 100)}
  def cast(amount) when is_integer(amount), do: {:ok, amount}
  def cast(_), do: :error

  def load(cents), do: {:ok, cents / 100.0}  # 读出时转为元
  def dump(amount), do: {:ok, round(amount * 100)}
end

本章小结:Ecto 是 Elixir 生态中最优秀的数据库层之一。Schema 清晰描述数据结构;Changeset 集中处理验证逻辑;Query DSL 将 SQL 表达为组合式函数管道;preload 优雅解决 N+1 问题;Ecto.Multi 让复杂事务有清晰的错误追踪。最后一章我们聚焦测试与生产部署。