Chapter 09

Integrating with Data Warehouses

Best-practice configuration and warehouse-specific optimization techniques for BigQuery, Snowflake, Redshift, and DuckDB

9.1 BigQuery

profiles.yml configuration

```yaml
my_project:
  target: dev
  outputs:
    dev:
      type: bigquery
      method: oauth               # dev: authenticate via gcloud auth
      project: my-gcp-project-id
      dataset: dbt_dev            # dev schema
      threads: 4
      timeout_seconds: 300
      location: US                # dataset location
    prod:
      type: bigquery
      method: service-account
      project: my-gcp-project-id
      dataset: analytics_prod
      keyfile: /secrets/bq-sa.json
      threads: 8
```

BigQuery-specific: partitioned and clustered tables

```sql
{{ config(
    materialized = 'table',
    partition_by = {
        "field": "order_date",
        "data_type": "date",
        "granularity": "day"           -- "month" and "year" are also supported
    },
    cluster_by = ["customer_id", "status"],
    -- automatically delete partitions older than 365 days
    partition_expiration_days = 365
) }}

SELECT
    order_id,
    customer_id,
    DATE(order_at) AS order_date,
    status,
    amount_dollars
FROM {{ ref('stg_orders') }}
```
💰

Key to BigQuery cost optimization: BigQuery bills on-demand queries by data scanned ($5/TB). Partitioned tables combined with a WHERE order_date BETWEEN ... filter can cut query cost by 90%+. Clustering sorts rows within each partition on the specified columns, further reducing scan volume. Incremental models with the insert_overwrite strategy replace whole partitions, which is both faster and cheaper than MERGE.
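The insert_overwrite strategy mentioned above can be sketched as follows, reusing the column names from the model above; the 3-day reprocessing window is an assumption for illustration:

```sql
-- Sketch: incremental model using insert_overwrite, which replaces whole
-- partitions instead of merging row by row (cheaper on BigQuery).
{{ config(
    materialized = 'incremental',
    incremental_strategy = 'insert_overwrite',
    partition_by = {"field": "order_date", "data_type": "date"}
) }}

SELECT
    order_id,
    customer_id,
    DATE(order_at) AS order_date,
    status,
    amount_dollars
FROM {{ ref('stg_orders') }}
{% if is_incremental() %}
-- only reprocess recent partitions; BigQuery swaps them in atomically
WHERE DATE(order_at) >= DATE_SUB(CURRENT_DATE(), INTERVAL 3 DAY)
{% endif %}
```

Because only the scanned source rows and the replaced partitions are billed, reruns stay cheap even as the table grows.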

9.2 Snowflake

profiles.yml configuration

```yaml
my_project:
  target: dev
  outputs:
    dev:
      type: snowflake
      account: "xy12345.us-east-1"
      user: "{{ env_var('SNOWFLAKE_USER') }}"
      password: "{{ env_var('SNOWFLAKE_PASSWORD') }}"
      role: TRANSFORMER
      database: ANALYTICS
      warehouse: TRANSFORMING_XS     # small warehouse for development
      schema: DBT_DEV
      threads: 4
      client_session_keep_alive: False
    prod:
      type: snowflake
      account: "xy12345.us-east-1"
      user: "{{ env_var('SNOWFLAKE_USER') }}"
      private_key_path: /secrets/rsa_key.p8  # key-pair auth recommended
      role: TRANSFORMER
      database: ANALYTICS
      warehouse: TRANSFORMING_M
      schema: MARTS
      threads: 16
```

Snowflake-specific: dynamic warehouse sizing

```sql
{{ config(
    materialized = 'incremental',
    unique_key   = 'event_id',
    snowflake_warehouse = 'TRANSFORMING_L'   -- larger warehouse for a heavy model
) }}

SELECT * FROM {{ source('segment', 'tracks') }}
{% if is_incremental() %}
WHERE received_at > (SELECT MAX(received_at) FROM {{ this }})
{% endif %}
```
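Since config() is rendered with Jinja, the warehouse can also scale with the environment; a sketch, where the warehouse names are assumptions matching the profile above:

```sql
-- Sketch: pick a larger warehouse in prod, a small one everywhere else.
{{ config(
    materialized = 'incremental',
    unique_key   = 'event_id',
    snowflake_warehouse = ('TRANSFORMING_L' if target.name == 'prod'
                           else 'TRANSFORMING_XS')
) }}

SELECT * FROM {{ source('segment', 'tracks') }}
{% if is_incremental() %}
WHERE received_at > (SELECT MAX(received_at) FROM {{ this }})
{% endif %}
```

This keeps dev runs on cheap compute while production backfills get the horsepower they need, without maintaining two model files.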

9.3 Redshift

```yaml
my_project:
  target: dev
  outputs:
    dev:
      type: redshift
      host: my-cluster.xxx.us-east-1.redshift.amazonaws.com
      user: "{{ env_var('REDSHIFT_USER') }}"
      password: "{{ env_var('REDSHIFT_PASSWORD') }}"
      port: 5439
      dbname: analytics
      schema: dbt_dev
      threads: 4
      ra3_node: true    # RA3 nodes (managed storage)
```

Redshift-specific: distkey / sortkey

```sql
{{ config(
    materialized = 'table',
    dist         = 'customer_id',           -- distribution key (rows hashed across nodes by this column)
    sort         = ['order_date', 'status'] -- sort key (speeds up range scans)
) }}

SELECT
    order_id,
    customer_id,
    order_date,
    status,
    amount_dollars
FROM {{ ref('stg_orders') }}
```
📌

Choosing a distkey: pick the column used most often in JOINs (e.g. customer_id, order_id) so that rows sharing a JOIN key land on the same node, minimizing cross-node data transfer (shuffling). High-cardinality columns (many distinct values) make good distkeys.
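Besides a column distkey, dbt-redshift also accepts the special dist styles 'all' and 'even'. For a small dimension table, replicating it to every node removes shuffling entirely; a sketch, where the model and column names are assumptions:

```sql
-- Sketch: small dimension table replicated to every node (diststyle ALL),
-- so joins against it never move data across nodes.
{{ config(
    materialized = 'table',
    dist = 'all',
    sort = ['payment_method_id']
) }}

SELECT
    payment_method_id,
    payment_method_name
FROM {{ ref('stg_payment_methods') }}
```

Use 'all' only for small, rarely updated tables; for large fact tables with no dominant join key, 'even' spreads rows round-robin across nodes.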

9.4 DuckDB: a local development powerhouse

```yaml
my_project:
  target: dev
  outputs:
    dev:
      type: duckdb
      path: ./analytics.duckdb   # local file path
      threads: 4
      extensions:                 # DuckDB extensions
        - httpfs                  # query S3/GCS files directly
        - json
        - parquet
```
```sql
-- DuckDB can query local Parquet/CSV files directly
{{ config(materialized='view') }}

SELECT *
FROM read_parquet('./data/raw_orders_*.parquet')
WHERE order_date >= '2024-01-01'
```
🦆

DuckDB as a local development environment: DuckDB can query Parquet/CSV files on S3 or GCS directly, so you can sample real cloud data down to your machine for development and testing. Compared with firing frequent test queries at BigQuery/Snowflake, local DuckDB development costs nothing and responds almost instantly.
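With the httpfs extension loaded (as in the profile above), the same read_parquet call works against cloud storage. A sketch of the sampling workflow just described; the bucket name is hypothetical and S3 credentials are assumed to be configured:

```sql
-- Sketch: sample recent cloud data into a local dev table via httpfs.
-- Bucket and path are hypothetical.
{{ config(materialized='table') }}

SELECT *
FROM read_parquet('s3://my-company-datalake/raw_orders/*.parquet')
WHERE order_date >= '2024-01-01'
USING SAMPLE 10 PERCENT (bernoulli)
```

Materializing the sample as a local table means every downstream model in the dev run reads from disk, never from the cloud.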

9.5 dbt Cloud

dbt Cloud is the managed service from dbt Labs. It provides a web IDE, job scheduling, hosted documentation, and more, making it well suited to team collaboration:

| Feature | Description |
| --- | --- |
| Cloud IDE | Write and debug dbt code in the browser, no local setup required |
| Job scheduling | Built-in cron scheduler for run frequency and notifications |
| Slim CI | Run only the models changed in a PR (via `state:` comparison) |
| Docs hosting | Automatically generate and host the dbt docs site |
| Metadata API | Query run history, model status, test results, and other metadata |
| Environment management | Separate, securely isolated dev/test/prod environments |
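The `state:` comparison behind Slim CI is also available in dbt Core; a sketch, where the artifacts directory is an assumption:

```shell
# Sketch: build only models changed relative to the last production run,
# deferring unchanged upstream refs to production relations.
# ./prod-artifacts is assumed to hold the production manifest.json.
dbt build --select state:modified+ --defer --state ./prod-artifacts
```

The `+` suffix also rebuilds everything downstream of a modified model, so a PR's blast radius is tested without running the whole project.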

9.6 Hands-on: a complete e-commerce data model on BigQuery

```sql
-- models/marts/core/fct_orders.sql
{{ config(
    materialized = 'incremental',
    unique_key   = 'order_id',
    incremental_strategy = 'merge',
    partition_by = {"field": "order_date", "data_type": "date"},
    cluster_by   = ["customer_id", "status"]
) }}

WITH orders AS (
    SELECT * FROM {{ ref('stg_orders') }}
),
customers AS (
    SELECT * FROM {{ ref('stg_customers') }}
),
order_items AS (
    SELECT
        order_id,
        COUNT(*)         AS item_count,
        SUM(quantity)    AS total_quantity
    FROM {{ ref('stg_order_items') }}
    GROUP BY order_id
),
payments AS (
    SELECT
        order_id,
        payment_method,
        payment_status
    FROM {{ ref('stg_payments') }}
)

SELECT
    o.order_id,
    o.customer_id,
    o.order_date,
    o.status,
    o.amount_dollars,
    o.updated_at,      -- required by the incremental filter below
    c.email        AS customer_email,
    c.customer_tier,
    oi.item_count,
    oi.total_quantity,
    p.payment_method,
    p.payment_status
FROM orders o
LEFT JOIN customers  c  USING (customer_id)
LEFT JOIN order_items oi USING (order_id)
LEFT JOIN payments    p  USING (order_id)

{% if is_incremental() %}
WHERE o.updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}
```
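A matching schema file would guard the grain and join keys of fct_orders; a sketch, where the test choices are assumptions rather than part of the original model:

```yaml
# models/marts/core/fct_orders.yml (sketch)
version: 2

models:
  - name: fct_orders
    columns:
      - name: order_id
        tests:
          - unique      # one row per order: the model's grain
          - not_null
      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('stg_customers')
              field: customer_id
```

Because the fan-out CTEs pre-aggregate order_items to one row per order_id, the unique test on order_id doubles as a regression check against accidental join duplication.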
📌

Chapter summary
Each warehouse connection is configured in profiles.yml, with adapters handling SQL dialect differences. BigQuery essentials: date-partitioned tables plus clustering on frequently filtered columns, combined with incremental models, dramatically reduce billed scan volume. Snowflake supports dynamic warehouse sizing per model. Redshift requires well-chosen distkey/sortkey settings to optimize JOIN performance.

DuckDB is a zero-cost local development environment that reads Parquet files directly, making it ideal for the development phase. dbt Cloud provides the IDE, scheduling, CI, and docs hosting that team collaboration requires.