Chapter 08

事务与锁

让一组读写"要么全部成功,要么全部撤销"——并理解隔离级别和行锁,避免脏数据与竞争。

8.1 事务的意义

事务(Transaction)是数据库保证原子性的手段——把多条 SQL 打包成一个单元,要么全部 commit、要么全部 rollback。

经典场景:转账

BEGIN;
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
UPDATE accounts SET balance = balance + 100 WHERE id = 2;
COMMIT;

如果只有第一条执行完程序崩了,就会丢钱——事务保证不会出现这种中间状态。

8.2 db.transaction

await db.transaction(async (tx) => {
  await tx.update(accounts)
    .set({ balance: sql`${accounts.balance} - 100` })
    .where(eq(accounts.id, 1));

  await tx.update(accounts)
    .set({ balance: sql`${accounts.balance} + 100` })
    .where(eq(accounts.id, 2));
});
// 回调内抛任何异常 → 自动 ROLLBACK;正常完成 → COMMIT
回调内必须用 tx,不是 db

事务边界只在 tx 上生效。用了 db 的那条 SQL 会走另一条连接——不在事务里,会破坏原子性。

8.3 手动 rollback

import { TransactionRollbackError } from "drizzle-orm";

await db.transaction(async (tx) => {
  const [acc] = await tx.select().from(accounts).where(eq(accounts.id, 1));
  if (acc.balance < 100) {
    tx.rollback();    // 直接抛 TransactionRollbackError 退出事务
  }
  // ...
});

8.4 隔离级别 isolation level

事务的隔离级别决定"并发事务之间可见什么"——四档(由低到高,代价也越大):

级别能避免典型用途
read uncommitted什么都不保证基本不用
read committed脏读(PG 默认)大多数业务
repeatable read+ 不可重复读(MySQL 默认)统计类读
serializable+ 幻读(最强,像串行执行)金融、账单
await db.transaction(async (tx) => {
  // ...
}, {
  isolationLevel: "serializable",
  accessMode: "read write",     // 或 "read only"
  deferrable: true,              // 仅 PG serializable + read only
});

8.5 行级锁:for update

读一行并阻止其他事务修改它直到当前事务结束——经典"账户余额"模式:

await db.transaction(async (tx) => {
  const [acc] = await tx.select()
    .from(accounts)
    .where(eq(accounts.id, 1))
    .for("update");     // 锁定该行

  if (acc.balance < 100) throw new Error("余额不足");

  await tx.update(accounts)
    .set({ balance: acc.balance - 100 })
    .where(eq(accounts.id, 1));
});

锁的变体

8.6 乐观锁:版本号

不愿用行级锁(担心死锁/性能)时,走乐观锁:表里加 version 列,update 时带条件:

const r = await db.update(accounts)
  .set({ balance: newBalance, version: sql`${accounts.version} + 1` })
  .where(and(
    eq(accounts.id, 1),
    eq(accounts.version, expectedVersion),
  ));

if (r.rowCount === 0) {
  throw new Error("被别人先改了,重试");
}

8.7 嵌套事务 / savepoint

Drizzle 支持嵌套事务——用 SAVEPOINT 实现"局部可撤销":

await db.transaction(async (tx) => {
  await tx.insert(logs).values({ msg: "start" });

  try {
    await tx.transaction(async (sub) => {
      await sub.insert(logs).values({ msg: "inner" });
      throw new Error("出错");
    });
  } catch {
    // 内层回滚,外层照样继续
  }

  await tx.insert(logs).values({ msg: "end" });
});

8.8 batch(Edge 批执行)

某些 Edge driver(Turso、Planetscale、Neon)支持一次 HTTP 发多条 SQL——Drizzle 的 db.batch

const [u, p] = await db.batch([
  db.insert(users).values({ name: "Eve" }).returning(),
  db.insert(posts).values({ title: "Hello", authorId: 1 }).returning(),
]);
// 一次 RTT 完成两次插入。不是事务,但通常是原子(driver 级别保证)

若 driver 要求事务语义,第一条传 sql`BEGIN`、最后 sql`COMMIT`

8.9 事务 + 业务服务层模式

async function transfer(fromId: number, toId: number, amount: number) {
  return db.transaction(async (tx) => {
    const [from] = await tx.select().from(accounts).where(eq(accounts.id, fromId)).for("update");
    if (from.balance < amount) throw new Error("INSUFFICIENT_FUNDS");

    await tx.update(accounts).set({ balance: from.balance - amount }).where(eq(accounts.id, fromId));
    await tx.update(accounts).set({ balance: sql`${accounts.balance} + ${amount}` }).where(eq(accounts.id, toId));
    await tx.insert(ledger).values({ fromId, toId, amount });
  });
}

8.10 常见陷阱

8.11 小结