Chapter 07

增量模型(Incremental Models)

处理大规模数据时,每次全量重建代价高昂——增量模型只处理新增和变更的数据

7.1 为什么需要增量模型

对于小型数据集,materialized='table' 每次全量重建没有问题。但当表有数十亿行时,每次运行重建整张表不仅耗时,还会消耗大量计算成本(在 BigQuery/Snowflake 上直接影响账单)。

增量模型(Incremental Model)解决这一问题:第一次运行时全量构建,后续运行时只处理"新的"或"变更的"数据,并将其追加或合并到已存在的表中。

💡

增量模型的适用场景 事件日志表(页面浏览、点击、API 调用)、订单流水表、传感器数据——这类数据只增不改或历史记录不会变更的场景最适合增量模型。如果历史数据会被更新(如订单状态变更),需要配合 unique_key 做 MERGE。

7.2 基础增量模型

SQL-- models/marts/fct_events.sql
{{ config(
    materialized = 'incremental',
    unique_key   = 'event_id'        -- 可选:有此配置则做 MERGE,否则只 APPEND
) }}

SELECT
    event_id,
    user_id,
    event_type,
    event_at,
    properties
FROM {{ source('segment', 'tracks') }}

{% if is_incremental() %}
-- 只处理上次运行后的新数据
WHERE event_at > (SELECT MAX(event_at) FROM {{ this }})
{% endif %}
📌

理解 is_incremental() 和 this is_incremental() 在非首次运行且不是 --full-refresh 时返回 true。{{ this }} 指向当前模型在数据仓库中的完整表名(如 analytics.dbt_prod.fct_events),用于引用已存在的表。

7.3 增量策略(Incremental Strategy)

不同数据仓库支持不同的增量策略,每种策略有不同的语义:

策略语义支持的仓库适用场景
append只追加新行,不更新旧行所有不可变事件日志
merge有 unique_key 则 UPDATE,否则 INSERTBigQuery, Snowflake, Redshift有状态变更的数据
delete+insert先删除匹配分区,再插入BigQuery, Redshift, Databricks按分区键批量替换
insert_overwrite覆盖指定分区BigQuery, Spark/Databricks分区表全量替换
SQL-- 使用 merge 策略(Snowflake)
{{ config(
    materialized       = 'incremental',
    unique_key         = 'order_id',
    incremental_strategy = 'merge',
    merge_update_columns = ['status', 'updated_at', 'amount_dollars']
) }}

SELECT
    order_id,
    customer_id,
    status,
    amount_dollars,
    updated_at
FROM {{ source('ecommerce', 'orders') }}

{% if is_incremental() %}
WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}

7.4 unique_key — 多列组合键

当主键是多列的组合时,unique_key 可以是列表:

SQL{{ config(
    materialized       = 'incremental',
    unique_key         = ['date_day', 'campaign_id'],  -- 复合主键
    incremental_strategy = 'merge'
) }}

SELECT
    date_day,
    campaign_id,
    impressions,
    clicks,
    spend
FROM {{ source('google_ads', 'ad_performance') }}

{% if is_incremental() %}
WHERE date_day > (SELECT MAX(date_day) FROM {{ this }})
   -- Google Ads 数据可能回填,多取几天保证数据完整
   OR date_day >= CURRENT_DATE - INTERVAL '3 days'
{% endif %}

7.5 增量过滤最佳实践

基于时间戳过滤

SQL{% if is_incremental() %}
-- 方式 1:基于已有数据的最大时间戳
WHERE _loaded_at > (SELECT COALESCE(MAX(_loaded_at), '1900-01-01') FROM {{ this }})

-- 方式 2:固定回溯窗口(处理乱序、延迟数据)
WHERE _loaded_at >= DATEADD(day, -3, CURRENT_DATE)

-- 方式 3:使用 var 控制回溯天数(推荐,可调整)
WHERE _loaded_at >= DATEADD(day, -{{ var('lookback_days', 3) }}, CURRENT_DATE)
{% endif %}
⚠️

注意延迟数据(Late Arriving Data) 上游数据管道可能因为网络延迟、重试等原因导致数据晚于预期到达。严格基于 MAX 时间戳会漏掉这些迟到数据。建议使用固定回溯窗口(如 -3 天),配合 unique_key MERGE 去重,是更健壮的方案。

7.6 全量刷新:--full-refresh

BASH# 增量模型的正常运行(只处理新数据)
dbt run --select fct_events

# 全量刷新:删除表并从头重建(等同于 table 物化)
dbt run --select fct_events --full-refresh

# 全量刷新所有增量模型
dbt run --full-refresh

什么时候需要全量刷新:

7.7 BigQuery 增量模型:分区表

SQL-- BigQuery 专属:分区 + 聚簇增量模型
{{ config(
    materialized             = 'incremental',
    incremental_strategy     = 'insert_overwrite',
    partition_by             = {
        "field": "event_date",
        "data_type": "date",
        "granularity": "day"
    },
    cluster_by               = ["user_id", "event_type"]
) }}

SELECT
    DATE(event_at)            AS event_date,
    event_id,
    user_id,
    event_type,
    event_at
FROM {{ source('segment', 'tracks') }}

{% if is_incremental() %}
WHERE DATE(event_at) >= DATE_SUB(CURRENT_DATE(), INTERVAL 3 DAY)
{% endif %}
📌

本章小结
增量模型(materialized='incremental')在首次运行后只处理新数据,大幅降低大表的运行成本。核心机制:is_incremental() 判断是否增量运行,{{ this }} 引用已存在的目标表。

增量策略:append(只插入)、merge(INSERT+UPDATE)、delete+insert(分区替换)。unique_key 配合 merge 策略实现 upsert 语义。延迟数据处理推荐使用固定回溯窗口(-3 天)而非严格 MAX 时间戳。--full-refresh 强制全量重建。