7.1 为什么需要增量模型
对于小型数据集,materialized='table' 每次全量重建没有问题。但当表有数十亿行时,每次运行重建整张表不仅耗时,还会消耗大量计算成本(在 BigQuery/Snowflake 上直接影响账单)。
增量模型(Incremental Model)解决这一问题:第一次运行时全量构建,后续运行时只处理"新的"或"变更的"数据,并将其追加或合并到已存在的表中。
增量模型的适用场景
事件日志表(页面浏览、点击、API 调用)、订单流水表、传感器数据——这类数据只增不改或历史记录不会变更的场景最适合增量模型。如果历史数据会被更新(如订单状态变更),需要配合 unique_key 做 MERGE。
7.2 基础增量模型
SQL-- models/marts/fct_events.sql
{{ config(
materialized = 'incremental',
unique_key = 'event_id' -- 可选:有此配置则做 MERGE,否则只 APPEND
) }}
SELECT
event_id,
user_id,
event_type,
event_at,
properties
FROM {{ source('segment', 'tracks') }}
{% if is_incremental() %}
-- 只处理上次运行后的新数据
WHERE event_at > (SELECT MAX(event_at) FROM {{ this }})
{% endif %}
理解 is_incremental() 和 this
is_incremental() 在非首次运行且不是 --full-refresh 时返回 true。{{ this }} 指向当前模型在数据仓库中的完整表名(如 analytics.dbt_prod.fct_events),用于引用已存在的表。
7.3 增量策略(Incremental Strategy)
不同数据仓库支持不同的增量策略,每种策略有不同的语义:
| 策略 | 语义 | 支持的仓库 | 适用场景 |
|---|---|---|---|
append | 只追加新行,不更新旧行 | 所有 | 不可变事件日志 |
merge | 有 unique_key 则 UPDATE,否则 INSERT | BigQuery, Snowflake, Redshift | 有状态变更的数据 |
delete+insert | 先删除匹配分区,再插入 | BigQuery, Redshift, Databricks | 按分区键批量替换 |
insert_overwrite | 覆盖指定分区 | BigQuery, Spark/Databricks | 分区表全量替换 |
SQL-- 使用 merge 策略(Snowflake)
{{ config(
materialized = 'incremental',
unique_key = 'order_id',
incremental_strategy = 'merge',
merge_update_columns = ['status', 'updated_at', 'amount_dollars']
) }}
SELECT
order_id,
customer_id,
status,
amount_dollars,
updated_at
FROM {{ source('ecommerce', 'orders') }}
{% if is_incremental() %}
WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}
7.4 unique_key — 多列组合键
当主键是多列的组合时,unique_key 可以是列表:
SQL{{ config(
materialized = 'incremental',
unique_key = ['date_day', 'campaign_id'], -- 复合主键
incremental_strategy = 'merge'
) }}
SELECT
date_day,
campaign_id,
impressions,
clicks,
spend
FROM {{ source('google_ads', 'ad_performance') }}
{% if is_incremental() %}
WHERE date_day > (SELECT MAX(date_day) FROM {{ this }})
-- Google Ads 数据可能回填,多取几天保证数据完整
OR date_day >= CURRENT_DATE - INTERVAL '3 days'
{% endif %}
7.5 增量过滤最佳实践
基于时间戳过滤
SQL{% if is_incremental() %}
-- 方式 1:基于已有数据的最大时间戳
WHERE _loaded_at > (SELECT COALESCE(MAX(_loaded_at), '1900-01-01') FROM {{ this }})
-- 方式 2:固定回溯窗口(处理乱序、延迟数据)
WHERE _loaded_at >= DATEADD(day, -3, CURRENT_DATE)
-- 方式 3:使用 var 控制回溯天数(推荐,可调整)
WHERE _loaded_at >= DATEADD(day, -{{ var('lookback_days', 3) }}, CURRENT_DATE)
{% endif %}
注意延迟数据(Late Arriving Data) 上游数据管道可能因为网络延迟、重试等原因导致数据晚于预期到达。严格基于 MAX 时间戳会漏掉这些迟到数据。建议使用固定回溯窗口(如 -3 天),配合 unique_key MERGE 去重,是更健壮的方案。
7.6 全量刷新:--full-refresh
BASH# 增量模型的正常运行(只处理新数据)
dbt run --select fct_events
# 全量刷新:删除表并从头重建(等同于 table 物化)
dbt run --select fct_events --full-refresh
# 全量刷新所有增量模型
dbt run --full-refresh
什么时候需要全量刷新:
- 模型逻辑(SELECT 语句)发生了变更,历史数据需要重新计算
- Schema 变更(新增列、修改列类型)
- 发现历史数据有问题,需要从源头重跑
- 首次部署新增量模型
7.7 BigQuery 增量模型:分区表
SQL-- BigQuery 专属:分区 + 聚簇增量模型
{{ config(
materialized = 'incremental',
incremental_strategy = 'insert_overwrite',
partition_by = {
"field": "event_date",
"data_type": "date",
"granularity": "day"
},
cluster_by = ["user_id", "event_type"]
) }}
SELECT
DATE(event_at) AS event_date,
event_id,
user_id,
event_type,
event_at
FROM {{ source('segment', 'tracks') }}
{% if is_incremental() %}
WHERE DATE(event_at) >= DATE_SUB(CURRENT_DATE(), INTERVAL 3 DAY)
{% endif %}
本章小结
增量模型(materialized='incremental')在首次运行后只处理新数据,大幅降低大表的运行成本。核心机制:is_incremental() 判断是否增量运行,{{ this }} 引用已存在的目标表。
增量策略:append(只插入)、merge(INSERT+UPDATE)、delete+insert(分区替换)。unique_key 配合 merge 策略实现 upsert 语义。延迟数据处理推荐使用固定回溯窗口(-3 天)而非严格 MAX 时间戳。--full-refresh 强制全量重建。