BEAM 进程基础:spawn / send / receive
BEAM 进程是并发的基本单元,极度轻量(约 2KB 初始堆),百万级并发不是神话。
# spawn 创建新进程
pid = spawn(fn ->
IO.puts("Hello from process #{inspect(self())}")
end)
# self() 获取当前进程 PID
IO.puts("Main PID: #{inspect(self())}")
# send 发送消息
send(pid, {:hello, "world"})
# receive 接收消息(阻塞直到匹配)
receive do
{:hello, msg} ->
IO.puts("收到消息: #{msg}")
:stop ->
IO.puts("停止")
after
5000 -> # 超时 5 秒
IO.puts("超时")
end
# Process.alive? 检查进程是否存活
Process.alive?(pid) # false(进程执行完毕后退出)
Task:异步任务
Task 是对进程的高层封装,专为"运行一个异步任务,等待结果"而设计。
# 并发执行多个 HTTP 请求
urls = ["https://api1.com", "https://api2.com", "https://api3.com"]
tasks = Enum.map(urls, fn url ->
Task.async(fn -> fetch(url) end)
end)
# 等待所有任务完成,收集结果
results = Task.await_many(tasks, 5000) # 最多等 5 秒
# Task.async_stream:并发处理列表(自动限制并发数)
[1, 2, 3, 4, 5]
|> Task.async_stream(&process_item/1,
max_concurrency: 3, # 同时最多 3 个
timeout: 10_000)
|> Enum.to_list()
GenServer:有状态服务器
GenServer(Generic Server)是 OTP 最核心的行为模块。它封装了进程的消息循环,让你专注于业务逻辑:定义初始状态和如何处理各种消息。
defmodule ShoppingCart do
use GenServer
# ── 客户端 API(公开函数)──
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, [], opts)
end
def add_item(pid, item) do
GenServer.cast(pid, {:add, item}) # cast:异步,不等返回值
end
def remove_item(pid, item_id) do
GenServer.cast(pid, {:remove, item_id})
end
def get_cart(pid) do
GenServer.call(pid, :get) # call:同步,等待返回值
end
def total(pid) do
GenServer.call(pid, :total)
end
# ── 服务器回调(实现细节)──
@impl true
def init(_opts) do
{:ok, []} # 初始状态:空列表
end
@impl true
def handle_cast({:add, item}, state) do
{:noreply, [item | state]}
end
@impl true
def handle_cast({:remove, id}, state) do
new_state = Enum.reject(state, &(&1.id == id))
{:noreply, new_state}
end
@impl true
def handle_call(:get, _from, state) do
{:reply, state, state} # {:reply, 返回值, 新状态}
end
@impl true
def handle_call(:total, _from, state) do
total = Enum.sum(Enum.map(state, &(&1.price)))
{:reply, total, state}
end
end
# 使用
{:ok, cart} = ShoppingCart.start_link()
ShoppingCart.add_item(cart, %{id: 1, name: "键盘", price: 299})
ShoppingCart.add_item(cart, %{id: 2, name: "鼠标", price: 199})
ShoppingCart.total(cart) # 498
Supervisor:监督树与容错
Supervisor 监视子进程,当子进程崩溃时按策略自动重启,这就是"Let it crash"哲学的实现基础。
defmodule MyApp.Supervisor do
use Supervisor
def start_link(opts) do
Supervisor.start_link(__MODULE__, :ok, opts)
end
@impl true
def init(:ok) do
children = [
MyApp.Repo, # Ecto 数据库
MyApp.Cache, # 缓存 GenServer
{ShoppingCart, []}, # 购物车服务
MyAppWeb.Endpoint # Phoenix HTTP 端点
]
Supervisor.init(children, strategy: :one_for_one)
# 策略::one_for_one — 哪个子进程挂了重启哪个
# :one_for_all — 任意一个挂了,全部重启
# :rest_for_one — 挂了的及其之后的子进程都重启
end
end
Agent:简单状态
Agent 是比 GenServer 更简单的状态封装,适合只需要存取状态而无复杂逻辑的场景:
# 创建计数器
{:ok, counter} = Agent.start_link(fn -> 0 end)
Agent.get(counter, &(&1)) # 0
Agent.update(counter, &(&1 + 1)) # 加1
Agent.get(counter, &(&1)) # 1
# 原子性 get_and_update
Agent.get_and_update(counter, fn n -> {n, n + 1} end)
# 返回当前值并更新
实战:并发 Web 爬虫
defmodule Crawler do
@doc "并发抓取多个 URL,返回 {url, status, title} 列表"
def crawl_all(urls, max_concurrency \\ 10) do
urls
|> Task.async_stream(&crawl_url/1,
max_concurrency: max_concurrency,
timeout: 15_000,
on_timeout: :kill_task)
|> Enum.map(fn
{:ok, result} -> result
{:exit, reason} -> %{error: reason}
end)
end
defp crawl_url(url) do
case HTTPoison.get(url, [], follow_redirect: true) do
{:ok, %{status_code: code, body: body}} ->
%{url: url, status: code, title: extract_title(body)}
{:error, reason} ->
%{url: url, error: inspect(reason)}
end
end
defp extract_title(html) do
case Regex.run(~r/<title>(.*?)<\/title>/si, html) do
[_, title] -> String.trim(title)
_ -> "无标题"
end
end
end
# 使用:10 个并发抓取 100 个 URL,比顺序快约 10 倍
urls = File.read!("urls.txt") |> String.split("\n", trim: true)
results = Crawler.crawl_all(urls)
本章小结:BEAM 进程是并发的基本单元,极轻量且完全隔离。Task 用于一次性异步任务;GenServer 封装有状态服务,是 OTP 的核心;Supervisor 构建容错监督树,子进程崩溃自动重启;Agent 简化纯状态存取。这些工具共同构成了 Elixir 在高并发领域的核心竞争力。下一章进入 Phoenix Web 框架。