8.1 事务的意义
事务(Transaction)是数据库保证原子性的手段——把多条 SQL 打包成一个单元,要么全部 commit、要么全部 rollback。
经典场景:转账
BEGIN;
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
UPDATE accounts SET balance = balance + 100 WHERE id = 2;
COMMIT;
如果只有第一条执行完程序崩了,就会丢钱——事务保证不会出现这种中间状态。
8.2 db.transaction
await db.transaction(async (tx) => {
await tx.update(accounts)
.set({ balance: sql`${accounts.balance} - 100` })
.where(eq(accounts.id, 1));
await tx.update(accounts)
.set({ balance: sql`${accounts.balance} + 100` })
.where(eq(accounts.id, 2));
});
// 回调内抛任何异常 → 自动 ROLLBACK;正常完成 → COMMIT
回调内必须用 tx,不是 db
事务边界只在 tx 上生效。用了 db 的那条 SQL 会走另一条连接——不在事务里,会破坏原子性。
8.3 手动 rollback
import { TransactionRollbackError } from "drizzle-orm";
await db.transaction(async (tx) => {
const [acc] = await tx.select().from(accounts).where(eq(accounts.id, 1));
if (acc.balance < 100) {
tx.rollback(); // 直接抛 TransactionRollbackError 退出事务
}
// ...
});
8.4 隔离级别 isolation level
事务的隔离级别决定"并发事务之间可见什么"——四档(由低到高,代价也越大):
| 级别 | 能避免 | 典型用途 |
|---|---|---|
| read uncommitted | 什么都不保证 | 基本不用 |
| read committed | 脏读(PG 默认) | 大多数业务 |
| repeatable read | + 不可重复读(MySQL 默认) | 统计类读 |
| serializable | + 幻读(最强,像串行执行) | 金融、账单 |
await db.transaction(async (tx) => {
// ...
}, {
isolationLevel: "serializable",
accessMode: "read write", // 或 "read only"
deferrable: true, // 仅 PG serializable + read only
});
8.5 行级锁:for update
读一行并阻止其他事务修改它直到当前事务结束——经典"账户余额"模式:
await db.transaction(async (tx) => {
const [acc] = await tx.select()
.from(accounts)
.where(eq(accounts.id, 1))
.for("update"); // 锁定该行
if (acc.balance < 100) throw new Error("余额不足");
await tx.update(accounts)
.set({ balance: acc.balance - 100 })
.where(eq(accounts.id, 1));
});
锁的变体
.for("update")— 排他锁(其他事务不能读写该行).for("update", { noWait: true })— 锁不到就立刻报错(不等).for("update", { skipLocked: true })— 跳过被锁的行(队列消费模式).for("share")— 共享锁(其他事务能读但不能写)
8.6 乐观锁:版本号
不愿用行级锁(担心死锁/性能)时,走乐观锁:表里加 version 列,update 时带条件:
const r = await db.update(accounts)
.set({ balance: newBalance, version: sql`${accounts.version} + 1` })
.where(and(
eq(accounts.id, 1),
eq(accounts.version, expectedVersion),
));
if (r.rowCount === 0) {
throw new Error("被别人先改了,重试");
}
8.7 嵌套事务 / savepoint
Drizzle 支持嵌套事务——用 SAVEPOINT 实现"局部可撤销":
await db.transaction(async (tx) => {
await tx.insert(logs).values({ msg: "start" });
try {
await tx.transaction(async (sub) => {
await sub.insert(logs).values({ msg: "inner" });
throw new Error("出错");
});
} catch {
// 内层回滚,外层照样继续
}
await tx.insert(logs).values({ msg: "end" });
});
8.8 batch(Edge 批执行)
某些 Edge driver(Turso、Planetscale、Neon)支持一次 HTTP 发多条 SQL——Drizzle 的 db.batch:
const [u, p] = await db.batch([
db.insert(users).values({ name: "Eve" }).returning(),
db.insert(posts).values({ title: "Hello", authorId: 1 }).returning(),
]);
// 一次 RTT 完成两次插入。不是事务,但通常是原子(driver 级别保证)
若 driver 要求事务语义,第一条传 sql`BEGIN`、最后 sql`COMMIT`。
8.9 事务 + 业务服务层模式
async function transfer(fromId: number, toId: number, amount: number) {
return db.transaction(async (tx) => {
const [from] = await tx.select().from(accounts).where(eq(accounts.id, fromId)).for("update");
if (from.balance < amount) throw new Error("INSUFFICIENT_FUNDS");
await tx.update(accounts).set({ balance: from.balance - amount }).where(eq(accounts.id, fromId));
await tx.update(accounts).set({ balance: sql`${accounts.balance} + ${amount}` }).where(eq(accounts.id, toId));
await tx.insert(ledger).values({ fromId, toId, amount });
});
}
8.10 常见陷阱
- 事务里做 HTTP 调用——HTTP 可能卡几秒,数据库连接/锁被占用。事务尽量只做 DB 工作。
- 事务里 console.log 打印了敏感数据——审计日志也要考虑隔离。
- 忘记返回事务结果:
db.transaction(async tx => { ... })的返回值就是回调的返回值。 - MySQL 隔离级别默认 REPEATABLE READ——与 Postgres 的 READ COMMITTED 不同,行为会有差异。
8.11 小结
- 用
db.transaction(async tx => { ... })包住一组写操作。 - 回调里只用 tx——出错自动回滚。
- 重要业务用
.for("update")行锁或serializable隔离级。 - 乐观锁 = 版本号列 + 条件 update + 检查 rowCount。
- 嵌套事务走 savepoint;Edge 多条无事务用
db.batch。