Chapter 10

数据工程实战

dbt + DuckDB 本地开发、MotherDuck 云版、本地数据湖、完整 ELT 流水线

10.1 dbt + DuckDB:零成本本地开发

dbt(data build tool)是现代数据工程的核心工具,用于在数据仓库中进行数据转换(T in ELT)。dbt + DuckDB 的组合意味着你可以在本地笔记本上开发和测试所有数据模型,不花一分钱云数仓费用,准备好后再切换到生产的 Snowflake/BigQuery。

安装配置

BASH# 安装 dbt-duckdb
pip install dbt-duckdb

# 初始化 dbt 项目
dbt init my_analytics
cd my_analytics

配置 profiles.yml

YAML# ~/.dbt/profiles.yml
my_analytics:
  target: dev
  outputs:
    dev:
      type: duckdb
      path: './dev.duckdb'           # 本地 DuckDB 文件
      threads: 4

    prod:
      type: duckdb
      path: 'md:my_database'          # MotherDuck 云版
      token: '{{ env_var("MOTHERDUCK_TOKEN") }}'
      threads: 8

dbt 模型示例

SQL-- models/staging/stg_orders.sql
-- 从原始文件读取并标准化
{{ config(materialized='view') }}

SELECT
    order_id,
    user_id,
    amount,
    lower(status) AS status,
    date_trunc('day', created_at) AS order_date
FROM {{ source('raw', 'orders') }}
WHERE amount > 0
SQL-- models/marts/fct_daily_revenue.sql
-- 汇总每日收入
{{ config(materialized='table') }}

SELECT
    order_date,
    COUNT(*) AS order_count,
    SUM(amount) AS daily_revenue,
    AVG(amount) AS avg_order_value
FROM {{ ref('stg_orders') }}
WHERE status = 'completed'
GROUP BY order_date
ORDER BY order_date
BASH# 运行所有模型
dbt run

# 运行指定模型
dbt run --select fct_daily_revenue

# 运行测试
dbt test

# 生成文档
dbt docs generate && dbt docs serve
💡

dbt + DuckDB 的核心价值 本地开发:无需联网,无需信用卡,无需 Docker。开发速度极快(模型运行毫秒级)。准备上生产时,仅修改 profiles.yml 中的 target,指向 MotherDuck 或 Snowflake 即可。代码零修改。

10.2 MotherDuck:DuckDB 的云版本

MotherDuck 是 DuckDB 的官方云托管服务,将 DuckDB 的能力扩展到云端,支持:

PYTHONimport duckdb

# 连接到 MotherDuck
con = duckdb.connect('md:?motherduck_token=your_token')

# 或通过环境变量
# export MOTHERDUCK_TOKEN=your_token
con = duckdb.connect('md:')

# 创建云数据库
con.execute("CREATE DATABASE my_cloud_db")

# 混合查询:本地文件 + 云数据库
result = con.execute("""
    SELECT c.customer_name, o.order_total
    FROM 'local_customers.parquet' c   -- 本地文件
    JOIN my_cloud_db.orders o ON c.id = o.customer_id  -- 云数据
""").df()

10.3 Polars vs DuckDB:如何选择

Polars 和 DuckDB 都是 Python 数据处理的现代化工具,经常被拿来对比。它们各有专长:

维度DuckDBPolars
接口风格SQL 优先Python API 优先(链式操作)
数据格式多格式(CSV/Parquet/JSON/S3)多格式,对 Arrow 原生支持更好
JOIN 性能优秀(查询优化器更成熟)
GroupBy 性能优秀非常好
惰性求值有(Relation API)核心特性(LazyFrame)
超内存处理自动溢写磁盘有限支持
SQL 支持完整 SQL 支持有限 SQL(sql() 方法)
学习曲线会 SQL 即可上手需要学习 Polars API
PYTHON# 实践中两者可以混用!
import polars as pl
import duckdb

# Polars 读取和初步过滤
df = (
    pl.scan_parquet('large_dataset.parquet')
    .filter(pl.col('status') == 'active')
    .select(['user_id', 'amount', 'date'])
    .collect()
)

# DuckDB 做复杂 SQL 分析(直接引用 Polars DataFrame!)
result = duckdb.sql("""
    SELECT
        user_id,
        SUM(amount) as total,
        ROW_NUMBER() OVER (ORDER BY SUM(amount) DESC) as rank
    FROM df
    GROUP BY user_id
    QUALIFY rank <= 100
""").pl()  # 返回 Polars DataFrame

10.4 构建本地数据湖

用 DuckDB + Parquet + Delta Lake 构建不花一分云费用的本地数据湖:

PYTHONimport duckdb
from pathlib import Path

# 数据湖目录结构
# data_lake/
#   raw/          # 原始数据(不可修改)
#   staging/      # 清洗后的数据
#   marts/        # 聚合的数据集市
#   exports/      # 导出报表

Path("data_lake/raw").mkdir(parents=True, exist_ok=True)
Path("data_lake/staging").mkdir(parents=True, exist_ok=True)
Path("data_lake/marts").mkdir(parents=True, exist_ok=True)

con = duckdb.connect("analytics.duckdb")

# Stage 1: 原始数据 → Parquet(CSV 转换为高效格式)
con.execute("""
    COPY (SELECT * FROM 'data_lake/raw/**/*.csv')
    TO 'data_lake/staging/orders/'
    (FORMAT PARQUET, PARTITION_BY (year, month), COMPRESSION ZSTD)
""")

# Stage 2: 清洗和转换
con.execute("""
    COPY (
        SELECT
            order_id,
            user_id,
            TRIM(LOWER(product_name)) AS product,
            CASE WHEN amount < 0 THEN 0 ELSE amount END AS amount,
            TRY_CAST(order_date AS DATE) AS order_date
        FROM 'data_lake/staging/orders/**/*.parquet'
        WHERE order_id IS NOT NULL
    )
    TO 'data_lake/marts/clean_orders.parquet'
    (FORMAT PARQUET, COMPRESSION ZSTD)
""")

# Stage 3: 聚合生成数据集市
con.execute("""
    COPY (
        SELECT
            DATE_TRUNC('month', order_date) AS month,
            product,
            COUNT(*) AS orders,
            SUM(amount) AS revenue,
            AVG(amount) AS avg_order_value
        FROM 'data_lake/marts/clean_orders.parquet'
        GROUP BY ALL
        ORDER BY month, revenue DESC
    )
    TO 'data_lake/marts/monthly_product_revenue.parquet'
    (FORMAT PARQUET, COMPRESSION ZSTD)
""")

10.5 完整 ELT 流水线:API → Parquet → DuckDB → 报表

PYTHONimport duckdb
import requests
import json
from datetime import date, timedelta
from pathlib import Path

## ── EXTRACT:从 API 拉取数据 ──────────────────────────
def extract_orders(start_date: str, end_date: str) -> list:
    """从业务 API 拉取订单数据"""
    all_orders = []
    page = 1
    while True:
        resp = requests.get(
            "https://api.example.com/orders",
            params={"start": start_date, "end": end_date, "page": page, "size": 1000},
            headers={"Authorization": "Bearer TOKEN"}
        )
        data = resp.json()
        all_orders.extend(data["items"])
        if not data["has_next"]: break
        page += 1
    return all_orders

## ── LOAD:保存为 Parquet ──────────────────────────────
def load_to_parquet(data: list, output_path: str):
    """将数据保存为 Parquet 格式(via DuckDB)"""
    con = duckdb.connect()
    # 通过 JSON 字符串中间格式
    json_str = json.dumps(data)
    con.execute(f"""
        COPY (SELECT * FROM read_json_auto('{json_str}'))
        TO '{output_path}'
        (FORMAT PARQUET, COMPRESSION ZSTD)
    """)

## ── TRANSFORM:DuckDB 数据转换 ───────────────────────
def transform():
    con = duckdb.connect("analytics.duckdb")

    # 清洗并物化为数据库表
    con.execute("""
        CREATE OR REPLACE TABLE orders_clean AS
        SELECT
            order_id::BIGINT             AS order_id,
            user_id::INTEGER             AS user_id,
            TRIM(product_name)           AS product,
            amount::DECIMAL(12,2)        AS amount,
            TRY_CAST(created_at AS TIMESTAMP) AS created_at,
            CURRENT_TIMESTAMP            AS etl_loaded_at
        FROM read_parquet('data/raw/**/*.parquet')
        WHERE order_id IS NOT NULL AND amount > 0
    """)

    # 生成报表
    report = con.execute("""
        SELECT
            DATE_TRUNC('day', created_at) AS date,
            COUNT(*)                       AS orders,
            SUM(amount)                    AS revenue,
            COUNT(DISTINCT user_id)        AS unique_customers
        FROM orders_clean
        WHERE created_at >= CURRENT_DATE - INTERVAL '30 days'
        GROUP BY ALL
        ORDER BY date DESC
    """).df()

    return report

## ── 主流程 ────────────────────────────────────────────
if __name__ == "__main__":
    today = date.today()
    yesterday = today - timedelta(days=1)

    print("[1/3] 拉取数据...")
    orders = extract_orders(str(yesterday), str(today))

    print(f"[2/3] 保存 {len(orders)} 条记录到 Parquet...")
    Path("data/raw").mkdir(parents=True, exist_ok=True)
    load_to_parquet(orders, f"data/raw/{yesterday}.parquet")

    print("[3/3] 转换并生成报表...")
    report = transform()
    report.to_csv("exports/daily_report.csv", index=False)

    print("ELT 完成!")
    print(report.head())

10.6 最终架构总结

🎯

场景1:数据探索

Jupyter + duckdb.sql() + pandas/polars。零配置,毫秒级查询,最适合 ad-hoc 分析。

🔧

场景2:ETL 脚本

Python + DuckDB + Parquet。替代 pandas 复杂转换脚本,处理 GB 级数据游刃有余。

🏗️

场景3:本地数据仓库

dbt + DuckDB 文件。结构化 SQL 转换,版本控制,数据质量测试,与生产无缝切换。

☁️

场景4:云数据分析

DuckDB + httpfs + S3。直查 S3 文件,无需 ETL 落地,按需分析,成本极低。

🌐

场景5:协作与共享

MotherDuck 云版。基于 DuckDB 的云数仓,团队协作,从本地 dev 无缝切换生产。

🗄️

场景6:本地数据湖

DuckDB + Parquet/Delta Lake。分区 Parquet 文件 + DuckDB 查询引擎,自建本地数据湖。

🦆

恭喜你完成 DuckDB 完整教程! 你现在掌握了:直接查询 CSV/Parquet/JSON 文件、强大的分析 SQL 扩展、Python 零拷贝集成、Parquet 列式存储原理、扩展系统(httpfs/spatial/fts/excel)、性能调优技巧,以及 dbt + DuckDB 数据工程实战。DuckDB 是数据工程师工具箱中不可或缺的利器——像 SQLite 一样简单,像数仓一样强大。