Chapter 05

并发与 OTP

Actor 模型实战——从轻量进程到工业级容错系统,GenServer 与 Supervisor 让你的应用永不宕机

BEAM 进程基础:spawn / send / receive

BEAM 进程是并发的基本单元,极度轻量(约 2KB 初始堆),百万级并发不是神话。

# Remember the parent's PID so the child process can reply to it.
parent = self()

# spawn starts a new process. This one waits for a single {:hello, msg}
# message and echoes it back to the parent, then exits.
pid = spawn(fn ->
  IO.puts("Hello from process #{inspect(self())}")

  receive do
    {:hello, msg} -> send(parent, {:hello, msg})
  end
end)

# self() returns the PID of the current process.
IO.puts("Main PID: #{inspect(self())}")

# send puts a message into the mailbox of the target process.
send(pid, {:hello, "world"})

# receive blocks until a message in THIS process's mailbox matches a clause.
# The child's echo is what arrives here; without it the main process would
# only ever hit the timeout branch.
receive do
  {:hello, msg} ->
    IO.puts("收到消息: #{msg}")
  :stop ->
    IO.puts("停止")
after
  5000 ->   # give up after 5 seconds
    IO.puts("超时")
end

# Process.alive? reports whether a process is still running.
Process.alive?(pid)   # false once the child has exited (it exits right after replying)

Task:异步任务

Task 是对进程的高层封装,专为"运行一个异步任务,等待结果"而设计。

# Run several HTTP requests concurrently: one task per URL.
urls = ["https://api1.com", "https://api2.com", "https://api3.com"]

tasks = for url <- urls, do: Task.async(fn -> fetch(url) end)

# Wait for all tasks and collect their results (waits at most 5 seconds).
results = Task.await_many(tasks, 5000)

# Task.async_stream: process a list concurrently with a cap on parallelism.
item_stream =
  Task.async_stream([1, 2, 3, 4, 5], &process_item/1,
    # at most 3 items in flight at once
    max_concurrency: 3,
    timeout: 10_000
  )

Enum.to_list(item_stream)

GenServer:有状态服务器

GenServer(Generic Server)是 OTP 最核心的行为模块。它封装了进程的消息循环,让你专注于业务逻辑:定义初始状态和如何处理各种消息。

defmodule ShoppingCart do
  @moduledoc """
  A shopping cart kept in a GenServer process.

  The state is a plain list of items, most recently added first. Removal
  reads each item's `id` field and totalling reads each item's `price` field.
  """

  use GenServer

  # ── Client API (public functions) ──

  @doc "Starts a cart process with an empty item list; `opts` go to `GenServer.start_link/3`."
  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, [], opts)

  @doc "Adds `item` to the cart. Sent as a cast: asynchronous, no reply is awaited."
  def add_item(pid, item), do: GenServer.cast(pid, {:add, item})

  @doc "Removes every item whose `id` equals `item_id`. Sent as a cast (asynchronous)."
  def remove_item(pid, item_id), do: GenServer.cast(pid, {:remove, item_id})

  @doc "Returns the current list of items. Sent as a call: synchronous, waits for the reply."
  def get_cart(pid), do: GenServer.call(pid, :get)

  @doc "Returns the sum of the `price` of all items. Sent as a call (synchronous)."
  def total(pid), do: GenServer.call(pid, :total)

  # ── Server callbacks (implementation details) ──

  # The init argument is ignored; every cart starts out empty.
  @impl true
  def init(_opts), do: {:ok, []}

  @impl true
  def handle_cast({:add, item}, items), do: {:noreply, [item | items]}

  def handle_cast({:remove, id}, items) do
    remaining = Enum.reject(items, fn entry -> entry.id == id end)
    {:noreply, remaining}
  end

  # Reply tuples are {:reply, value_for_caller, new_state}; reads leave the state untouched.
  @impl true
  def handle_call(:get, _from, items), do: {:reply, items, items}

  def handle_call(:total, _from, items) do
    sum =
      items
      |> Enum.map(fn entry -> entry.price end)
      |> Enum.sum()

    {:reply, sum, items}
  end
end

# Usage: start a cart, add two items, then ask for the total.
{:ok, cart} = ShoppingCart.start_link()

for item <- [
      %{id: 1, name: "键盘", price: 299},
      %{id: 2, name: "鼠标", price: 199}
    ] do
  ShoppingCart.add_item(cart, item)
end

ShoppingCart.total(cart)   # 498

Supervisor:监督树与容错

Supervisor 监视子进程,当子进程崩溃时按策略自动重启,这就是"Let it crash"哲学的实现基础。

defmodule MyApp.Supervisor do
  @moduledoc """
  Supervisor for the application's child processes, using the
  `:one_for_one` restart strategy.
  """

  use Supervisor

  @doc "Starts the supervisor; `opts` are forwarded to `Supervisor.start_link/3`."
  def start_link(opts), do: Supervisor.start_link(__MODULE__, :ok, opts)

  # Available restart strategies:
  #   :one_for_one  - restart only the child that crashed
  #   :one_for_all  - restart all children when any one of them crashes
  #   :rest_for_one - restart the crashed child and every child listed after it
  @impl true
  def init(:ok) do
    [
      # Ecto database
      MyApp.Repo,
      # cache GenServer
      MyApp.Cache,
      # shopping cart service
      {ShoppingCart, []},
      # Phoenix HTTP endpoint
      MyAppWeb.Endpoint
    ]
    |> Supervisor.init(strategy: :one_for_one)
  end
end

Agent:简单状态

Agent 是比 GenServer 更简单的状态封装,适合只需要存取状态而无复杂逻辑的场景:

# Start an agent whose state is a counter beginning at 0.
{:ok, counter} = Agent.start_link(fn -> 0 end)

Agent.get(counter, fn n -> n end)          # 0
Agent.update(counter, fn n -> n + 1 end)   # increment by 1
Agent.get(counter, fn n -> n end)          # 1

# get_and_update reads and writes in a single step: the first tuple element
# is returned to the caller, the second becomes the new state.
Agent.get_and_update(counter, &{&1, &1 + 1})

实战:并发 Web 爬虫

defmodule Crawler do
  @moduledoc """
  Fetches a list of URLs concurrently and extracts each page's `<title>`.
  """

  @doc """
  Crawls `urls` concurrently, with at most `max_concurrency` requests in flight.

  Returns one map per input URL, in input order:

    * `%{url: url, status: code, title: title}` when a response was received
    * `%{url: url, error: reason}` when the request failed. `reason` is the
      inspected HTTP client error, or the raw exit reason when the task was
      killed for exceeding the 15 second timeout.
  """
  def crawl_all(urls, max_concurrency \\ 10) do
    outcomes =
      urls
      |> Task.async_stream(&crawl_url/1,
        max_concurrency: max_concurrency,
        ordered: true,
        timeout: 15_000,
        on_timeout: :kill_task
      )
      |> Enum.to_list()

    # Results are emitted in input order (ordered: true), so zipping them with
    # the inputs lets a killed task still report which URL it belonged to.
    urls
    |> Enum.zip(outcomes)
    |> Enum.map(fn
      {_url, {:ok, result}} -> result
      {url, {:exit, reason}} -> %{url: url, error: reason}
    end)
  end

  # Fetches one URL (following redirects) and turns the outcome into a result map.
  defp crawl_url(url) do
    case HTTPoison.get(url, [], follow_redirect: true) do
      {:ok, %{status_code: code, body: body}} ->
        %{url: url, status: code, title: extract_title(body)}

      {:error, reason} ->
        %{url: url, error: inspect(reason)}
    end
  end

  # Returns the trimmed text of the first <title> element, or a placeholder
  # when none is found. `[^>]*` tolerates attributes on the opening tag; the
  # `s` and `i` flags allow multi-line titles and any letter case.
  defp extract_title(html) do
    case Regex.run(~r/<title\b[^>]*>(.*?)<\/title>/si, html) do
      [_, title] -> String.trim(title)
      _ -> "无标题"
    end
  end
end

# Usage: crawl 100 URLs with 10 concurrent requests; per the original note this
# is roughly 10x faster than fetching them one after another.
urls =
  "urls.txt"
  |> File.read!()
  |> String.split("\n", trim: true)

results = Crawler.crawl_all(urls)

本章小结:BEAM 进程是并发的基本单元,极轻量且完全隔离。Task 用于一次性异步任务;GenServer 封装有状态服务,是 OTP 的核心;Supervisor 构建容错监督树,子进程崩溃自动重启;Agent 简化纯状态存取。这些工具共同构成了 Elixir 在高并发领域的核心竞争力。下一章进入 Phoenix Web 框架。