Chapter 10

与应用集成最佳实践

Python / Go / Node.js 连接池配置,分区表设计,逻辑复制 CDC,多租户架构与云托管选型

10.1 Python — asyncpg + SQLAlchemy 2.0

Python
# asyncpg:最高性能的原生异步 PostgreSQL 驱动
# pip install asyncpg

import asyncio
import asyncpg
from contextlib import asynccontextmanager

# 连接池配置(生产关键参数)
async def create_pool():
    """Create an asyncpg connection pool with production-oriented limits."""
    pool_options = dict(
        dsn="postgresql://myuser:password@localhost:5432/mydb",
        min_size=5,          # connections kept open at all times
        max_size=20,         # upper bound; keep below server max_connections
        max_inactive_connection_lifetime=300,  # close connections idle > 5 min
        command_timeout=30,  # per-statement timeout, in seconds
        server_settings={
            "application_name": "myapp",
            "search_path": "public",
        },
    )
    return await asyncpg.create_pool(**pool_options)

# 基础查询
async def get_user(pool, user_id: int):
    """Fetch one user row by primary key; return it as a dict, or None."""
    query = "SELECT id, username, email, created_at FROM users WHERE id = $1"
    async with pool.acquire() as conn:
        record = await conn.fetchrow(query, user_id)
    if record is None:
        return None
    return dict(record)

# 批量插入(高性能)
async def batch_insert_products(pool, products: list[dict]):
    """Bulk-upsert product rows; existing SKUs get price/stock refreshed."""
    rows = [
        (item["sku"], item["name"], item["price"], item["stock"], item["category_id"])
        for item in products
    ]
    async with pool.acquire() as conn:
        # executemany sends the whole batch over one prepared statement.
        await conn.executemany(
            """
            INSERT INTO products (sku, name, price, stock, category_id)
            VALUES ($1, $2, $3, $4, $5)
            ON CONFLICT (sku) DO UPDATE SET
                price = EXCLUDED.price,
                stock = EXCLUDED.stock,
                updated_at = NOW()
            """,
            rows,
        )

# 事务
async def transfer(pool, from_id: int, to_id: int, amount: float):
    """Atomically move *amount* from account `from_id` to account `to_id`.

    Raises:
        ValueError: if the source account does not exist, or if its
            balance is smaller than *amount*.
    """
    async with pool.acquire() as conn:
        async with conn.transaction():
            # FOR UPDATE row-locks the source account so two concurrent
            # transfers cannot both read the same stale balance.
            balance = await conn.fetchval(
                "SELECT balance FROM accounts WHERE id = $1 FOR UPDATE",
                from_id
            )
            if balance is None:
                # Fix: fetchval returns None when no row matches; the
                # original code then raised TypeError on `None < amount`.
                raise ValueError(f"账户不存在: {from_id}")
            if balance < amount:
                raise ValueError(f"余额不足: {balance}")
            await conn.execute(
                "UPDATE accounts SET balance = balance - $1 WHERE id = $2",
                amount, from_id
            )
            await conn.execute(
                "UPDATE accounts SET balance = balance + $1 WHERE id = $2",
                amount, to_id
            )
Python
# SQLAlchemy 2.0 异步 ORM(推荐用于大型项目)
# pip install sqlalchemy[asyncio] asyncpg

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, sessionmaker
from sqlalchemy import select, String
from datetime import datetime

# Async engine backed by the asyncpg driver.
engine = create_async_engine(
    "postgresql+asyncpg://myuser:password@localhost:5432/mydb",
    pool_size=10,
    max_overflow=20,
    pool_pre_ping=True,   # validate a connection with a ping before handing it out
    echo=False            # keep per-statement SQL logging off in production
)

# Session factory; expire_on_commit=False keeps ORM objects readable after commit.
# NOTE(review): SQLAlchemy 2.0 also provides async_sessionmaker as the dedicated
# async factory; sessionmaker(class_=AsyncSession) works but is the older spelling.
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

class Base(DeclarativeBase):
    """Shared declarative base for the ORM models in this example."""
    pass

class User(Base):
    """ORM model mapped to the ``users`` table."""
    __tablename__ = "users"
    id: Mapped[int] = mapped_column(primary_key=True)
    username: Mapped[str] = mapped_column(String(50), unique=True)
    # No explicit length: str maps to an unbounded VARCHAR on PostgreSQL.
    email: Mapped[str] = mapped_column(unique=True)
    created_at: Mapped[datetime]

async def get_users_by_email_domain(domain: str) -> list[User]:
    """Return every User whose email address ends with ``@<domain>``."""
    stmt = select(User).where(User.email.like(f"%@{domain}"))
    async with async_session() as session:
        result = await session.execute(stmt)
    return result.scalars().all()

10.2 Go — pgx v5

Go
// pgx v5:Go 生态最佳 PostgreSQL 驱动
// go get github.com/jackc/pgx/v5

package db

import (
    "context"
    "fmt"
    "time"

    "github.com/jackc/pgx/v5"
    "github.com/jackc/pgx/v5/pgxpool"
)

// 连接池配置
// NewPool parses connStr into a pgxpool config, applies explicit
// pool-sizing and health-check settings, and returns the constructed pool.
// The time.* durations rely on the standard "time" package.
func NewPool(ctx context.Context, connStr string) (*pgxpool.Pool, error) {
    config, err := pgxpool.ParseConfig(connStr)
    if err != nil {
        return nil, fmt.Errorf("parse config: %w", err)
    }

    config.MinConns = 5                         // connections kept warm
    config.MaxConns = 25                        // hard upper bound for the pool
    config.MaxConnIdleTime = 5 * time.Minute    // recycle idle connections
    config.MaxConnLifetime = 1 * time.Hour      // cap total connection age
    config.HealthCheckPeriod = 30 * time.Second // background liveness checks

    return pgxpool.NewWithConfig(ctx, config)
}

// 批量插入:使用 COPY 协议(比 INSERT 快 5-10 倍)
// BulkInsertOrders streams order line items into order_items using the
// PostgreSQL COPY protocol, which is far faster than row-by-row INSERTs.
func BulkInsertOrders(ctx context.Context, pool *pgxpool.Pool, orders []Order) error {
    conn, acquireErr := pool.Acquire(ctx)
    if acquireErr != nil {
        return acquireErr
    }
    defer conn.Release()

    columns := []string{"order_id", "product_id", "quantity", "unit_price"}
    source := pgx.CopyFromSlice(len(orders), func(idx int) ([]any, error) {
        item := orders[idx]
        return []any{item.OrderID, item.ProductID, item.Quantity, item.UnitPrice}, nil
    })

    _, copyErr := conn.CopyFrom(ctx, pgx.Identifier{"order_items"}, columns, source)
    return copyErr
}

// 使用 pgx.Batch 批量执行多条语句
// BatchUpdatePrices pipelines one UPDATE per price change through a single
// network round trip via pgx.Batch, then verifies each statement's result.
func BatchUpdatePrices(ctx context.Context, pool *pgxpool.Pool, updates []PriceUpdate) error {
    conn, err := pool.Acquire(ctx)
    if err != nil {
        return err
    }
    defer conn.Release()

    queue := &pgx.Batch{}
    for _, change := range updates {
        queue.Queue(
            "UPDATE products SET price = $1, updated_at = NOW() WHERE sku = $2",
            change.Price, change.SKU,
        )
    }

    br := conn.SendBatch(ctx, queue)
    defer br.Close()

    for idx := range updates {
        if _, execErr := br.Exec(); execErr != nil {
            return fmt.Errorf("update %d: %w", idx, execErr)
        }
    }
    return nil
}

10.3 Node.js — postgres.js / Prisma

TypeScript
// postgres.js:最简洁的 Node.js PostgreSQL 客户端
// npm install postgres

import postgres from 'postgres'

// Shared postgres.js client; it manages its own connection pool internally.
const sql = postgres({
  host: 'localhost',
  port: 5432,
  database: 'mydb',
  username: 'myuser',
  password: 'mypassword',
  max: 20,              // pool size (max concurrent connections)
  idle_timeout: 30,     // close idle connections after 30 s
  connect_timeout: 10,  // fail connection attempts after 10 s
  prepare: true,        // use automatic prepared statements
})

// 类型安全查询
// Row shape for the users table.
// NOTE(review): postgres.js returns int8 columns as strings by default;
// typing id as bigint assumes a custom type parser is configured — confirm.
interface User {
  id: bigint
  username: string
  email: string
  created_at: Date
}

// Fetch the ten most recent orders for a user, each with its item count.
// Fix: the original read `await sql>` — a stray `>` (likely a mangled
// generic parameter) that is a TypeScript syntax error.
// GROUP BY o.id is sufficient: o.total / o.status / o.created_at are
// functionally dependent on the primary key, which PostgreSQL permits.
async function getUserOrders(userId: number) {
  const orders = await sql`
    SELECT
      o.id AS order_id,
      o.total,
      o.status,
      COUNT(oi.id) AS item_count
    FROM orders o
    LEFT JOIN order_items oi ON o.id = oi.order_id
    WHERE o.user_id = ${userId}
    GROUP BY o.id
    ORDER BY o.created_at DESC
    LIMIT 10
  `
  return orders
}

// 事务
// Create an order and its line items atomically, then compute the stored
// total from the rows just inserted, all inside one transaction.
//
// Fix: guard against an empty items array — postgres.js cannot expand an
// empty array into an INSERT, and SUM() over zero rows would set total
// to NULL instead of 0.
async function createOrder(userId: number, items: OrderItem[]) {
  if (items.length === 0) {
    throw new Error('createOrder requires at least one item')
  }
  return await sql.begin(async (tx) => {
    // Insert the order shell first so its id can be referenced below.
    const [order] = await tx<[{id: bigint}]>`
      INSERT INTO orders (user_id, total) VALUES (${userId}, 0)
      RETURNING id
    `
    await tx`
      INSERT INTO order_items ${tx(items.map(i => ({
        order_id: order.id,
        product_id: i.productId,
        quantity: i.quantity,
        unit_price: i.price
      })))}
    `
    // Derive the total inside the same transaction for consistency.
    const [updated] = await tx`
      UPDATE orders SET total = (
        SELECT SUM(quantity * unit_price) FROM order_items WHERE order_id = ${order.id}
      )
      WHERE id = ${order.id}
      RETURNING total
    `
    return { orderId: order.id, total: updated.total }
  })
}

10.4 分区表

当单张表数据量超过数千万行时,分区表可以显著提升查询性能和数据管理效率。

SQL
-- RANGE 分区:按时间范围分区(最常用)
CREATE TABLE events_log (
  id         BIGINT GENERATED ALWAYS AS IDENTITY,
  event_type TEXT NOT NULL,
  user_id    BIGINT,
  payload    JSONB,
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
) PARTITION BY RANGE (created_at);

-- Create partitions manually
CREATE TABLE events_log_2024_q1 PARTITION OF events_log
  FOR VALUES FROM ('2024-01-01') TO ('2024-04-01');

CREATE TABLE events_log_2024_q2 PARTITION OF events_log
  FOR VALUES FROM ('2024-04-01') TO ('2024-07-01');

-- Default partition (catches rows that match no other partition)
CREATE TABLE events_log_default PARTITION OF events_log DEFAULT;

-- Indexes on the partitioned table propagate to every partition
CREATE INDEX ON events_log (user_id, created_at);
CREATE INDEX ON events_log (created_at DESC);

-- Partition pruning happens automatically at query time
EXPLAIN
SELECT * FROM events_log
WHERE created_at BETWEEN '2024-01-01' AND '2024-03-31';
-- The planner scans only events_log_2024_q1 and skips the other partitions

-- Removing old data: dropping a partition is ~1000x faster than DELETE
ALTER TABLE events_log DETACH PARTITION events_log_2024_q1;
DROP TABLE events_log_2024_q1;   -- frees space immediately, no VACUUM needed

-- LIST partitioning: partition by enumerated values
CREATE TABLE orders_by_status (
  id     BIGINT,
  status TEXT NOT NULL,
  total  NUMERIC
) PARTITION BY LIST (status);

CREATE TABLE orders_active PARTITION OF orders_by_status
  FOR VALUES IN ('pending', 'confirmed', 'shipped');
CREATE TABLE orders_done PARTITION OF orders_by_status
  FOR VALUES IN ('delivered', 'cancelled');

-- HASH partitioning: spread rows evenly across partitions
CREATE TABLE users_sharded (
  id       BIGINT,
  username TEXT
) PARTITION BY HASH (id);

CREATE TABLE users_sharded_0 PARTITION OF users_sharded
  FOR VALUES WITH (MODULUS 4, REMAINDER 0);
CREATE TABLE users_sharded_1 PARTITION OF users_sharded
  FOR VALUES WITH (MODULUS 4, REMAINDER 1);
-- ...and likewise for REMAINDER 2 and 3

10.5 逻辑复制与 CDC

SQL
-- 逻辑复制:将 PG 的数据变更流式发布给订阅者
-- Requires wal_level = logical (a higher setting than replica)

-- On the source database: create a publication
CREATE PUBLICATION myapp_pub
  FOR TABLE users, orders, products;    -- a specific list of tables
  -- or FOR ALL TABLES;                  -- every table
  -- or FOR TABLE users WHERE (created_at > '2024-01-01');  -- row filter (PG 15+)

-- Inspect existing publications
SELECT * FROM pg_publication;
SELECT * FROM pg_publication_tables;

-- On the target database: create a subscription
CREATE SUBSCRIPTION myapp_sub
  CONNECTION 'postgresql://myuser:password@source-host:5432/mydb'
  PUBLICATION myapp_pub;

-- Monitor replication-slot status; wal_retained grows when a
-- subscriber stops consuming (see the warning below)
SELECT
  slot_name,
  plugin,
  slot_type,
  active,
  pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS wal_retained
FROM pg_replication_slots;
⚠️

复制槽积压风险:如果订阅者停止消费,复制槽会阻止 WAL 清理,可能导致磁盘耗尽。监控上文查询计算出的 WAL 保留量(wal_retained 列),设置告警阈值(如 > 10 GB)。不再使用的复制槽必须手动删除:SELECT pg_drop_replication_slot('slot_name');

10.6 多租户架构

| 方案 | 隔离方式 | 优点 | 缺点 | 适用场景 |
| --- | --- | --- | --- | --- |
| Schema 隔离 | 每个租户一个 Schema | 数据完全隔离;每租户可独立备份 | Schema 数量多时管理复杂;跨租户查询困难 | 企业级 SaaS,<1000 租户 |
| 行级隔离 | 所有租户共享表,tenant_id 列区分 | 运维简单;跨租户查询容易 | 数据隔离弱;大租户影响小租户 | 消费级 SaaS,租户量大 |
| 独立数据库 | 每个租户一个 Database | 最强隔离;完全独立 | 资源消耗大;连接管理复杂 | 金融/医疗等强合规要求 |
SQL
-- 方案一:行级隔离 + 行级安全策略(RLS)
ALTER TABLE orders ENABLE ROW LEVEL SECURITY;

-- Policy: a session can only see rows belonging to its own tenant
CREATE POLICY orders_tenant_isolation ON orders
  USING (tenant_id = current_setting('app.current_tenant_id')::bigint);

-- The application sets the current tenant at the start of each request
SET app.current_tenant_id = '42';
SELECT * FROM orders;  -- only rows with tenant_id = 42 come back, filtered automatically

-- Table owners normally bypass RLS on their own tables; FORCE applies the
-- policies to the owner as well. (Superusers and roles with BYPASSRLS
-- still bypass RLS regardless of FORCE.)
ALTER TABLE orders FORCE ROW LEVEL SECURITY;

-- Option 2: one schema per tenant
CREATE SCHEMA tenant_42;
SET search_path = tenant_42;

CREATE TABLE orders (/* same structure as every other tenant */);

-- The application switches schemas dynamically per request
SET search_path = tenant_42, public;

10.7 云托管 PostgreSQL 对比

| 服务 | 特点 | 计费模式 | 适用场景 |
| --- | --- | --- | --- |
| AWS RDS PostgreSQL | 托管主从复制、自动备份、Multi-AZ 高可用 | 按实例类型+存储计费 | 企业级,需要完整 RDS 生态 |
| AWS Aurora PostgreSQL | 存储计算分离,6 副本自动复制,读写吞吐高 | 比 RDS 贵约 20% | 高并发读写,需要极高可用 |
| Supabase | PG + 实时订阅 + Auth + Storage + REST API | 免费层+按使用量 | 全栈 SaaS 快速开发 |
| Neon | 无服务器 PG,存算分离,按请求计费,自动暂停 | 按计算秒数+存储 | 开发环境、低频访问、Serverless |
| PlanetScale | MySQL 兼容(注:2024 年停止了 PG 支持) | — | 不推荐 PG 场景 |
| Fly.io Postgres | 运行在 Fly.io,靠近应用部署,Patroni 托管 | 按实例+存储 | Fly.io 应用的配套 DB |
💡

Neon 的无服务器架构:Neon 将 PostgreSQL 的存储层(Pageserver + Safekeepers)与计算层(Compute 节点)分离。Compute 节点在无流量时自动暂停(冷启动约 500ms),存储层持久化在对象存储(S3)。非常适合开发环境和低频访问场景,免费层有 0.5 GB 存储。

10.8 连接配置最佳实践总结

SQL
-- 关键 postgresql.conf 参数(小型生产环境参考,8 vCPU / 32 GB RAM)
-- max_connections = 200              # max client connections; pair with PgBouncer
-- shared_buffers = 8GB               # ~25% of RAM
-- effective_cache_size = 24GB        # ~75% of RAM (planner hint only, no allocation)
-- maintenance_work_mem = 2GB         # memory for VACUUM / CREATE INDEX
-- work_mem = 64MB                    # memory per sort/hash operation
-- wal_buffers = 64MB
-- checkpoint_completion_target = 0.9
-- max_wal_size = 4GB
-- default_statistics_target = 100    # higher = more accurate planner statistics
-- random_page_cost = 1.1             # 1.1 for SSDs (use 4 for spinning disks)
-- effective_io_concurrency = 200     # 200 for SSDs
-- max_parallel_workers_per_gather = 4
-- max_parallel_workers = 8

-- Quick check of the settings that most often need tuning
SELECT name, setting, unit, short_desc
FROM pg_settings
WHERE name IN (
  'max_connections','shared_buffers','effective_cache_size',
  'work_mem','maintenance_work_mem','random_page_cost',
  'effective_io_concurrency','wal_level'
)
ORDER BY name;
📌

全教程总结:PostgreSQL 是功能最完整的开源 RDBMS:丰富的类型系统(JSONB/数组/枚举)、强大的 SQL(窗口函数/递归 CTE)、完善的事务(MVCC/4 级隔离)、多样化的索引(B-Tree/GIN/GiST/BRIN),以及成熟的高可用方案(流复制/Patroni/PgBouncer)。掌握这些特性,PostgreSQL 可以承担从小型 SaaS 到大型互联网平台的核心数据库角色,是后端工程师最值得深入学习的数据库。