10.1 dbt + DuckDB:零成本本地开发
dbt(data build tool)是现代数据工程的核心工具,用于在数据仓库中进行数据转换(T in ELT)。dbt + DuckDB 的组合意味着你可以在本地笔记本上开发和测试所有数据模型,不花一分钱云数仓费用,准备好后再切换到生产的 Snowflake/BigQuery。
安装配置
BASH# 安装 dbt-duckdb
pip install dbt-duckdb
# 初始化 dbt 项目
dbt init my_analytics
cd my_analytics
配置 profiles.yml
YAML# ~/.dbt/profiles.yml
my_analytics:
target: dev
outputs:
dev:
type: duckdb
path: './dev.duckdb' # 本地 DuckDB 文件
threads: 4
prod:
type: duckdb
path: 'md:my_database' # MotherDuck 云版
token: '{{ env_var("MOTHERDUCK_TOKEN") }}'
threads: 8
dbt 模型示例
SQL-- models/staging/stg_orders.sql
-- 从原始文件读取并标准化
{{ config(materialized='view') }}
SELECT
order_id,
user_id,
amount,
lower(status) AS status,
date_trunc('day', created_at) AS order_date
FROM {{ source('raw', 'orders') }}
WHERE amount > 0
SQL-- models/marts/fct_daily_revenue.sql
-- 汇总每日收入
{{ config(materialized='table') }}
SELECT
order_date,
COUNT(*) AS order_count,
SUM(amount) AS daily_revenue,
AVG(amount) AS avg_order_value
FROM {{ ref('stg_orders') }}
WHERE status = 'completed'
GROUP BY order_date
ORDER BY order_date
BASH# 运行所有模型
dbt run
# 运行指定模型
dbt run --select fct_daily_revenue
# 运行测试
dbt test
# 生成文档
dbt docs generate && dbt docs serve
dbt + DuckDB 的核心价值 本地开发:无需联网,无需信用卡,无需 Docker。开发速度极快(模型运行毫秒级)。准备上生产时,仅修改 profiles.yml 中的 target,指向 MotherDuck 或 Snowflake 即可。代码零修改。
10.2 MotherDuck:DuckDB 的云版本
MotherDuck 是 DuckDB 的官方云托管服务,将 DuckDB 的能力扩展到云端,支持:
- 无限弹性存储(不受本地磁盘限制)
- 多用户协作(共享数据库)
- 混合执行(部分在本地,部分在云端)
- 与本地 DuckDB 无缝切换
PYTHONimport duckdb
# 连接到 MotherDuck
con = duckdb.connect('md:?motherduck_token=your_token')
# 或通过环境变量
# export MOTHERDUCK_TOKEN=your_token
con = duckdb.connect('md:')
# 创建云数据库
con.execute("CREATE DATABASE my_cloud_db")
# 混合查询:本地文件 + 云数据库
result = con.execute("""
SELECT c.customer_name, o.order_total
FROM 'local_customers.parquet' c -- 本地文件
JOIN my_cloud_db.orders o ON c.id = o.customer_id -- 云数据
""").df()
10.3 Polars vs DuckDB:如何选择
Polars 和 DuckDB 都是 Python 数据处理的现代化工具,经常被拿来对比。它们各有专长:
| 维度 | DuckDB | Polars |
|---|---|---|
| 接口风格 | SQL 优先 | Python API 优先(链式操作) |
| 数据格式 | 多格式(CSV/Parquet/JSON/S3) | 多格式,对 Arrow 原生支持更好 |
| JOIN 性能 | 优秀(查询优化器更成熟) | 好 |
| GroupBy 性能 | 优秀 | 非常好 |
| 惰性求值 | 有(Relation API) | 核心特性(LazyFrame) |
| 超内存处理 | 自动溢写磁盘 | 有限支持 |
| SQL 支持 | 完整 SQL 支持 | 有限 SQL(sql() 方法) |
| 学习曲线 | 会 SQL 即可上手 | 需要学习 Polars API |
PYTHON# 实践中两者可以混用!
import polars as pl
import duckdb
# Polars 读取和初步过滤
df = (
pl.scan_parquet('large_dataset.parquet')
.filter(pl.col('status') == 'active')
.select(['user_id', 'amount', 'date'])
.collect()
)
# DuckDB 做复杂 SQL 分析(直接引用 Polars DataFrame!)
result = duckdb.sql("""
SELECT
user_id,
SUM(amount) as total,
ROW_NUMBER() OVER (ORDER BY SUM(amount) DESC) as rank
FROM df
GROUP BY user_id
QUALIFY rank <= 100
""").pl() # 返回 Polars DataFrame
10.4 构建本地数据湖
用 DuckDB + Parquet + Delta Lake 构建不花一分云费用的本地数据湖:
PYTHONimport duckdb
from pathlib import Path
# 数据湖目录结构
# data_lake/
# raw/ # 原始数据(不可修改)
# staging/ # 清洗后的数据
# marts/ # 聚合的数据集市
# exports/ # 导出报表
Path("data_lake/raw").mkdir(parents=True, exist_ok=True)
Path("data_lake/staging").mkdir(parents=True, exist_ok=True)
Path("data_lake/marts").mkdir(parents=True, exist_ok=True)
con = duckdb.connect("analytics.duckdb")
# Stage 1: 原始数据 → Parquet(CSV 转换为高效格式)
con.execute("""
COPY (SELECT * FROM 'data_lake/raw/**/*.csv')
TO 'data_lake/staging/orders/'
(FORMAT PARQUET, PARTITION_BY (year, month), COMPRESSION ZSTD)
""")
# Stage 2: 清洗和转换
con.execute("""
COPY (
SELECT
order_id,
user_id,
TRIM(LOWER(product_name)) AS product,
CASE WHEN amount < 0 THEN 0 ELSE amount END AS amount,
TRY_CAST(order_date AS DATE) AS order_date
FROM 'data_lake/staging/orders/**/*.parquet'
WHERE order_id IS NOT NULL
)
TO 'data_lake/marts/clean_orders.parquet'
(FORMAT PARQUET, COMPRESSION ZSTD)
""")
# Stage 3: 聚合生成数据集市
con.execute("""
COPY (
SELECT
DATE_TRUNC('month', order_date) AS month,
product,
COUNT(*) AS orders,
SUM(amount) AS revenue,
AVG(amount) AS avg_order_value
FROM 'data_lake/marts/clean_orders.parquet'
GROUP BY ALL
ORDER BY month, revenue DESC
)
TO 'data_lake/marts/monthly_product_revenue.parquet'
(FORMAT PARQUET, COMPRESSION ZSTD)
""")
10.5 完整 ELT 流水线:API → Parquet → DuckDB → 报表
PYTHONimport duckdb
import requests
import json
from datetime import date, timedelta
from pathlib import Path
## ── EXTRACT:从 API 拉取数据 ──────────────────────────
def extract_orders(start_date: str, end_date: str) -> list:
"""从业务 API 拉取订单数据"""
all_orders = []
page = 1
while True:
resp = requests.get(
"https://api.example.com/orders",
params={"start": start_date, "end": end_date, "page": page, "size": 1000},
headers={"Authorization": "Bearer TOKEN"}
)
data = resp.json()
all_orders.extend(data["items"])
if not data["has_next"]: break
page += 1
return all_orders
## ── LOAD:保存为 Parquet ──────────────────────────────
def load_to_parquet(data: list, output_path: str):
"""将数据保存为 Parquet 格式(via DuckDB)"""
con = duckdb.connect()
# 通过 JSON 字符串中间格式
json_str = json.dumps(data)
con.execute(f"""
COPY (SELECT * FROM read_json_auto('{json_str}'))
TO '{output_path}'
(FORMAT PARQUET, COMPRESSION ZSTD)
""")
## ── TRANSFORM:DuckDB 数据转换 ───────────────────────
def transform():
con = duckdb.connect("analytics.duckdb")
# 清洗并物化为数据库表
con.execute("""
CREATE OR REPLACE TABLE orders_clean AS
SELECT
order_id::BIGINT AS order_id,
user_id::INTEGER AS user_id,
TRIM(product_name) AS product,
amount::DECIMAL(12,2) AS amount,
TRY_CAST(created_at AS TIMESTAMP) AS created_at,
CURRENT_TIMESTAMP AS etl_loaded_at
FROM read_parquet('data/raw/**/*.parquet')
WHERE order_id IS NOT NULL AND amount > 0
""")
# 生成报表
report = con.execute("""
SELECT
DATE_TRUNC('day', created_at) AS date,
COUNT(*) AS orders,
SUM(amount) AS revenue,
COUNT(DISTINCT user_id) AS unique_customers
FROM orders_clean
WHERE created_at >= CURRENT_DATE - INTERVAL '30 days'
GROUP BY ALL
ORDER BY date DESC
""").df()
return report
## ── 主流程 ────────────────────────────────────────────
if __name__ == "__main__":
today = date.today()
yesterday = today - timedelta(days=1)
print("[1/3] 拉取数据...")
orders = extract_orders(str(yesterday), str(today))
print(f"[2/3] 保存 {len(orders)} 条记录到 Parquet...")
Path("data/raw").mkdir(parents=True, exist_ok=True)
load_to_parquet(orders, f"data/raw/{yesterday}.parquet")
print("[3/3] 转换并生成报表...")
report = transform()
report.to_csv("exports/daily_report.csv", index=False)
print("ELT 完成!")
print(report.head())
10.6 最终架构总结
场景1:数据探索
Jupyter + duckdb.sql() + pandas/polars。零配置,毫秒级查询,最适合 ad-hoc 分析。
场景2:ETL 脚本
Python + DuckDB + Parquet。替代 pandas 复杂转换脚本,处理 GB 级数据游刃有余。
场景3:本地数据仓库
dbt + DuckDB 文件。结构化 SQL 转换,版本控制,数据质量测试,与生产无缝切换。
场景4:云数据分析
DuckDB + httpfs + S3。直查 S3 文件,无需 ETL 落地,按需分析,成本极低。
场景5:协作与共享
MotherDuck 云版。基于 DuckDB 的云数仓,团队协作,从本地 dev 无缝切换生产。
场景6:本地数据湖
DuckDB + Parquet/Delta Lake。分区 Parquet 文件 + DuckDB 查询引擎,自建本地数据湖。
恭喜你完成 DuckDB 完整教程! 你现在掌握了:直接查询 CSV/Parquet/JSON 文件、强大的分析 SQL 扩展、Python 零拷贝集成、Parquet 列式存储原理、扩展系统(httpfs/spatial/fts/excel)、性能调优技巧,以及 dbt + DuckDB 数据工程实战。DuckDB 是数据工程师工具箱中不可或缺的利器——像 SQLite 一样简单,像数仓一样强大。