10.1 Python — asyncpg + SQLAlchemy 2.0
```python
# asyncpg: the highest-performance native async PostgreSQL driver
# pip install asyncpg
import asyncio
import asyncpg
from contextlib import asynccontextmanager

# Connection pool configuration (key production parameters)
async def create_pool():
    return await asyncpg.create_pool(
        dsn="postgresql://myuser:password@localhost:5432/mydb",
        min_size=5,                             # minimum number of connections to keep open
        max_size=20,                            # maximum connections (size against max_connections)
        max_inactive_connection_lifetime=300,   # close connections idle for 5 minutes
        command_timeout=30,                     # per-statement timeout of 30 seconds
        server_settings={
            "application_name": "myapp",
            "search_path": "public",
        },
    )

# Basic query
async def get_user(pool, user_id: int):
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            "SELECT id, username, email, created_at FROM users WHERE id = $1",
            user_id,
        )
        return dict(row) if row else None

# Bulk insert (high throughput)
async def batch_insert_products(pool, products: list[dict]):
    async with pool.acquire() as conn:
        await conn.executemany(
            """
            INSERT INTO products (sku, name, price, stock, category_id)
            VALUES ($1, $2, $3, $4, $5)
            ON CONFLICT (sku) DO UPDATE SET
                price = EXCLUDED.price,
                stock = EXCLUDED.stock,
                updated_at = NOW()
            """,
            [(p["sku"], p["name"], p["price"], p["stock"], p["category_id"])
             for p in products],
        )

# Transactions
async def transfer(pool, from_id: int, to_id: int, amount: float):
    async with pool.acquire() as conn:
        async with conn.transaction():
            balance = await conn.fetchval(
                "SELECT balance FROM accounts WHERE id = $1 FOR UPDATE",
                from_id,
            )
            if balance < amount:
                raise ValueError(f"Insufficient balance: {balance}")
            await conn.execute(
                "UPDATE accounts SET balance = balance - $1 WHERE id = $2",
                amount, from_id,
            )
            await conn.execute(
                "UPDATE accounts SET balance = balance + $1 WHERE id = $2",
                amount, to_id,
            )
```
```python
# SQLAlchemy 2.0 async ORM (recommended for larger projects)
# pip install "sqlalchemy[asyncio]" asyncpg
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, sessionmaker
from sqlalchemy import select, String
from datetime import datetime

engine = create_async_engine(
    "postgresql+asyncpg://myuser:password@localhost:5432/mydb",
    pool_size=10,
    max_overflow=20,
    pool_pre_ping=True,   # check that a connection is alive before handing it out
    echo=False,           # disable SQL logging in production
)
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    username: Mapped[str] = mapped_column(String(50), unique=True)
    email: Mapped[str] = mapped_column(unique=True)
    created_at: Mapped[datetime]

async def get_users_by_email_domain(domain: str) -> list[User]:
    async with async_session() as session:
        result = await session.execute(
            select(User).where(User.email.like(f"%@{domain}"))
        )
        return list(result.scalars().all())
```
10.2 Go — pgx v5
```go
// pgx v5: the best PostgreSQL driver in the Go ecosystem
// go get github.com/jackc/pgx/v5
package db

import (
	"context"
	"fmt"
	"time"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
)

// Order and PriceUpdate mirror the columns used in the examples below.
type Order struct {
	OrderID   int64
	ProductID int64
	Quantity  int
	UnitPrice float64
}

type PriceUpdate struct {
	SKU   string
	Price float64
}

// Connection pool configuration
func NewPool(ctx context.Context, connStr string) (*pgxpool.Pool, error) {
	config, err := pgxpool.ParseConfig(connStr)
	if err != nil {
		return nil, fmt.Errorf("parse config: %w", err)
	}
	config.MinConns = 5
	config.MaxConns = 25
	config.MaxConnIdleTime = 5 * time.Minute
	config.MaxConnLifetime = 1 * time.Hour
	config.HealthCheckPeriod = 30 * time.Second
	return pgxpool.NewWithConfig(ctx, config)
}

// Bulk insert via the COPY protocol (5-10x faster than row-by-row INSERT)
func BulkInsertOrders(ctx context.Context, pool *pgxpool.Pool, orders []Order) error {
	conn, err := pool.Acquire(ctx)
	if err != nil {
		return err
	}
	defer conn.Release()
	_, err = conn.CopyFrom(
		ctx,
		pgx.Identifier{"order_items"},
		[]string{"order_id", "product_id", "quantity", "unit_price"},
		pgx.CopyFromSlice(len(orders), func(i int) ([]any, error) {
			o := orders[i]
			return []any{o.OrderID, o.ProductID, o.Quantity, o.UnitPrice}, nil
		}),
	)
	return err
}

// Use pgx.Batch to send many statements in a single round trip
func BatchUpdatePrices(ctx context.Context, pool *pgxpool.Pool, updates []PriceUpdate) error {
	batch := &pgx.Batch{}
	for _, u := range updates {
		batch.Queue(
			"UPDATE products SET price = $1, updated_at = NOW() WHERE sku = $2",
			u.Price, u.SKU,
		)
	}
	conn, err := pool.Acquire(ctx)
	if err != nil {
		return err
	}
	defer conn.Release()
	results := conn.SendBatch(ctx, batch)
	defer results.Close()
	for i := 0; i < len(updates); i++ {
		if _, err := results.Exec(); err != nil {
			return fmt.Errorf("update %d: %w", i, err)
		}
	}
	return nil
}
```
10.3 Node.js — postgres.js / Prisma
```typescript
// postgres.js: the most concise Node.js PostgreSQL client
// npm install postgres
import postgres from 'postgres'

const sql = postgres({
  host: 'localhost',
  port: 5432,
  database: 'mydb',
  username: 'myuser',
  password: 'mypassword',
  max: 20,              // connection pool size
  idle_timeout: 30,     // idle timeout (seconds)
  connect_timeout: 10,  // connect timeout (seconds)
  prepare: true,        // automatic prepared statements
})

// Typed queries
interface User {
  id: bigint
  username: string
  email: string
  created_at: Date
}

async function getUserOrders(userId: number) {
  const orders = await sql<{ order_id: bigint; total: string; status: string; item_count: bigint }[]>`
    SELECT
      o.id AS order_id,
      o.total,
      o.status,
      COUNT(oi.id) AS item_count
    FROM orders o
    LEFT JOIN order_items oi ON o.id = oi.order_id
    WHERE o.user_id = ${userId}
    GROUP BY o.id
    ORDER BY o.created_at DESC
    LIMIT 10
  `
  return orders
}

// Shape assumed from the insert below
interface OrderItem {
  productId: bigint
  quantity: number
  price: number
}

// Transactions
async function createOrder(userId: number, items: OrderItem[]) {
  return await sql.begin(async (tx) => {
    const [order] = await tx<[{ id: bigint }]>`
      INSERT INTO orders (user_id, total) VALUES (${userId}, 0)
      RETURNING id
    `
    await tx`
      INSERT INTO order_items ${tx(items.map(i => ({
        order_id: order.id,
        product_id: i.productId,
        quantity: i.quantity,
        unit_price: i.price
      })))}
    `
    const [updated] = await tx`
      UPDATE orders SET total = (
        SELECT SUM(quantity * unit_price) FROM order_items WHERE order_id = ${order.id}
      )
      WHERE id = ${order.id}
      RETURNING total
    `
    return { orderId: order.id, total: updated.total }
  })
}
```
10.4 Partitioned Tables
Once a single table grows past tens of millions of rows, partitioning can significantly improve query performance and make data management far more efficient.
```sql
-- RANGE partitioning: by time range (the most common case)
CREATE TABLE events_log (
    id BIGINT GENERATED ALWAYS AS IDENTITY,
    event_type TEXT NOT NULL,
    user_id BIGINT,
    payload JSONB,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
) PARTITION BY RANGE (created_at);

-- Create partitions manually
CREATE TABLE events_log_2024_q1 PARTITION OF events_log
    FOR VALUES FROM ('2024-01-01') TO ('2024-04-01');
CREATE TABLE events_log_2024_q2 PARTITION OF events_log
    FOR VALUES FROM ('2024-04-01') TO ('2024-07-01');

-- Default partition (catches rows that match no other partition)
CREATE TABLE events_log_default PARTITION OF events_log DEFAULT;

-- Indexes on the partitioned table are propagated to every partition automatically
CREATE INDEX ON events_log (user_id, created_at);
CREATE INDEX ON events_log (created_at DESC);

-- Partition pruning happens automatically at query time
EXPLAIN
SELECT * FROM events_log
WHERE created_at BETWEEN '2024-01-01' AND '2024-03-31';
-- The planner scans only events_log_2024_q1 and skips the other partitions

-- Removing old data: dropping a partition is ~1000x faster than DELETE
ALTER TABLE events_log DETACH PARTITION events_log_2024_q1;
DROP TABLE events_log_2024_q1;  -- space is released immediately, no VACUUM needed

-- LIST partitioning: by enumerated values
CREATE TABLE orders_by_status (
    id BIGINT,
    status TEXT NOT NULL,
    total NUMERIC
) PARTITION BY LIST (status);

CREATE TABLE orders_active PARTITION OF orders_by_status
    FOR VALUES IN ('pending', 'confirmed', 'shipped');
CREATE TABLE orders_done PARTITION OF orders_by_status
    FOR VALUES IN ('delivered', 'cancelled');

-- HASH partitioning: spread data evenly
CREATE TABLE users_sharded (
    id BIGINT,
    username TEXT
) PARTITION BY HASH (id);

CREATE TABLE users_sharded_0 PARTITION OF users_sharded
    FOR VALUES WITH (MODULUS 4, REMAINDER 0);
CREATE TABLE users_sharded_1 PARTITION OF users_sharded
    FOR VALUES WITH (MODULUS 4, REMAINDER 1);
-- ...and so on up to REMAINDER 3
```
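Range partitions must exist before rows arrive, so partition creation is usually automated. Below is a minimal sketch that pre-creates the next three monthly partitions with a DO block; it assumes the events_log table above, assumes you are partitioning new data by month (overlapping ranges with the existing quarterly partitions would be rejected), and the events_log_YYYY_MM naming is illustrative. In production this is typically a cron job or handled by an extension such as pg_partman.

```sql
-- Pre-create the next three monthly partitions of events_log (sketch)
DO $$
DECLARE
    start_month timestamptz := date_trunc('month', now());
BEGIN
    FOR i IN 0..2 LOOP
        EXECUTE format(
            'CREATE TABLE IF NOT EXISTS events_log_%s PARTITION OF events_log
                 FOR VALUES FROM (%L) TO (%L)',
            to_char(start_month + make_interval(months => i), 'YYYY_MM'),
            start_month + make_interval(months => i),
            start_month + make_interval(months => i + 1)
        );
    END LOOP;
END $$;
```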
10.5 Logical Replication and CDC
```sql
-- Logical replication: stream data changes from PG to subscribers
-- Requires wal_level = logical (a level above replica)

-- Create a publication on the source database
CREATE PUBLICATION myapp_pub
    FOR TABLE users, orders, products;                        -- specific tables
-- or FOR ALL TABLES;                                          -- every table
-- or FOR TABLE users WHERE (created_at > '2024-01-01');       -- with a row filter

-- Inspect publications
SELECT * FROM pg_publication;
SELECT * FROM pg_publication_tables;

-- Create a subscription on the target database
CREATE SUBSCRIPTION myapp_sub
    CONNECTION 'postgresql://myuser:password@source-host:5432/mydb'
    PUBLICATION myapp_pub;

-- Monitor replication slot status
SELECT
    slot_name,
    plugin,
    slot_type,
    active,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS wal_retained
FROM pg_replication_slots;
```
Replication slot backlog risk: if a subscriber stops consuming, its replication slot prevents WAL from being recycled and can eventually fill the disk. Monitor wal_retained from pg_replication_slots and alert above a threshold (e.g. > 10 GB). Slots that are no longer needed must be dropped manually: SELECT pg_drop_replication_slot('slot_name');
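For lightweight CDC experiments you can also read a logical slot directly from SQL, without setting up a subscriber. A minimal sketch using the built-in test_decoding output plugin follows; the slot name cdc_demo is illustrative, and the same backlog warning applies to any slot you create this way.

```sql
-- Create a logical slot decoded with test_decoding (text output of each change)
SELECT pg_create_logical_replication_slot('cdc_demo', 'test_decoding');

-- Peek at pending changes without consuming them
SELECT * FROM pg_logical_slot_peek_changes('cdc_demo', NULL, NULL);

-- Consume the changes and advance the slot
SELECT * FROM pg_logical_slot_get_changes('cdc_demo', NULL, NULL);

-- Drop the slot when done, or retained WAL will accumulate
SELECT pg_drop_replication_slot('cdc_demo');
```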
10.6 Multi-Tenant Architectures
| Approach | Isolation | Pros | Cons | Best for |
|---|---|---|---|---|
| Schema per tenant | One schema per tenant | Full data separation; per-tenant backups | Hard to manage with many schemas; cross-tenant queries are awkward | Enterprise SaaS, <1000 tenants |
| Row-level isolation | Shared tables, distinguished by a tenant_id column | Simple operations; easy cross-tenant queries | Weaker isolation; a large tenant can impact small ones | Consumer SaaS with many tenants |
| Database per tenant | One database per tenant | Strongest isolation; fully independent | Resource-heavy; connection management is complex | Finance, healthcare, and other strict-compliance workloads |
```sql
-- Option 1: row-level isolation + Row-Level Security (RLS)
ALTER TABLE orders ENABLE ROW LEVEL SECURITY;

-- Policy: a session can only see rows belonging to its own tenant
CREATE POLICY orders_tenant_isolation ON orders
    USING (tenant_id = current_setting('app.current_tenant_id')::bigint);

-- The application sets the current tenant at the start of every request
SET app.current_tenant_id = '42';
SELECT * FROM orders;  -- only rows with tenant_id = 42 are returned

-- Superusers and roles with BYPASSRLS always bypass RLS;
-- FORCE additionally applies the policies to the table owner
ALTER TABLE orders FORCE ROW LEVEL SECURITY;

-- Option 2: schema per tenant
CREATE SCHEMA tenant_42;
SET search_path = tenant_42;
CREATE TABLE orders (/* same structure as every other tenant */);

-- The application switches search_path per tenant
SET search_path = tenant_42, public;
```
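With a connection pooler in front, a plain SET can leak from one request to the next when a connection is reused. A minimal sketch, assuming the policy above, that scopes the tenant setting to a single transaction with set_config(..., is_local => true):

```sql
BEGIN;
-- third argument true = local to this transaction; reset at COMMIT/ROLLBACK
SELECT set_config('app.current_tenant_id', '42', true);
SELECT * FROM orders;   -- RLS filters to tenant_id = 42
COMMIT;                 -- the setting is discarded automatically
```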
10.7 Managed PostgreSQL Services Compared
| Service | Highlights | Pricing model | Best for |
|---|---|---|---|
| AWS RDS PostgreSQL | Managed replication, automated backups, Multi-AZ high availability | Per instance type + storage | Enterprise workloads that want the full RDS ecosystem |
| AWS Aurora PostgreSQL | Storage/compute separation, six-way replicated storage, high read/write throughput | Roughly 20% more than RDS | High-concurrency workloads needing very high availability |
| Supabase | PG + realtime subscriptions + Auth + Storage + REST API | Free tier + usage-based | Rapid full-stack SaaS development |
| Neon | Serverless PG, storage/compute separation, usage-based billing, auto-suspend | Compute seconds + storage | Dev environments, low-traffic and serverless workloads |
| PlanetScale | MySQL-compatible (note: discontinued PG support in 2024) | — | Not recommended for PG |
| Fly.io Postgres | Runs on Fly.io close to your app, managed with Patroni | Per instance + storage | Databases alongside Fly.io apps |
Neon's serverless architecture: Neon separates PostgreSQL's storage layer (Pageservers for pages, Safekeepers for WAL durability) from the compute layer. Compute nodes suspend automatically when idle (cold start around 500 ms), and storage is persisted to object storage (S3). This makes it a good fit for dev environments and low-traffic workloads; the free tier includes 0.5 GB of storage.
10.8 Connection and Configuration Best Practices
```sql
-- Key postgresql.conf parameters (reference for a small production box: 8 vCPU / 32 GB RAM)
-- max_connections = 200                  # max connections; pair with PgBouncer
-- shared_buffers = 8GB                   # ~25% of RAM
-- effective_cache_size = 24GB            # ~75% of RAM (a hint for the planner)
-- maintenance_work_mem = 2GB             # memory for VACUUM, CREATE INDEX
-- work_mem = 64MB                        # memory per sort/hash operation
-- wal_buffers = 64MB
-- checkpoint_completion_target = 0.9
-- max_wal_size = 4GB
-- default_statistics_target = 100        # higher statistics accuracy
-- random_page_cost = 1.1                 # 1.1 for SSD (4 for spinning disks)
-- effective_io_concurrency = 200         # 200 for SSD
-- max_parallel_workers_per_gather = 4
-- max_parallel_workers = 8

-- Quick check of whether the current settings need tuning
SELECT name, setting, unit, short_desc
FROM pg_settings
WHERE name IN (
    'max_connections', 'shared_buffers', 'effective_cache_size',
    'work_mem', 'maintenance_work_mem', 'random_page_cost',
    'effective_io_concurrency', 'wal_level'
)
ORDER BY name;
```
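Instead of hand-editing postgresql.conf, the same parameters can be applied with ALTER SYSTEM, which writes them to postgresql.auto.conf. A minimal sketch using a few of the values from the reference list above:

```sql
ALTER SYSTEM SET work_mem = '64MB';
ALTER SYSTEM SET random_page_cost = 1.1;
ALTER SYSTEM SET shared_buffers = '8GB';   -- takes effect only after a restart
SELECT pg_reload_conf();                   -- reloads parameters that don't need a restart

-- Show parameters that were changed but are still waiting for a restart
SELECT name, setting, pending_restart
FROM pg_settings
WHERE pending_restart;
```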
Tutorial wrap-up: PostgreSQL is the most feature-complete open-source RDBMS: a rich type system (JSONB, arrays, enums), powerful SQL (window functions, recursive CTEs), solid transactions (MVCC, four isolation levels), a wide range of index types (B-Tree, GIN, GiST, BRIN), and mature high-availability tooling (streaming replication, Patroni, PgBouncer). With these features mastered, PostgreSQL can serve as the core database for anything from a small SaaS to a large internet platform, and it remains the database most worth studying in depth for backend engineers.