3.1 为什么需要 Sources
在第2章我们用 source('ecommerce', 'orders') 引用了原始表。但 dbt 怎么知道这个表在哪里?这需要在 schema.yml 中声明 Source。
Source 声明带来以下好处:
- 血缘图从数据源头延伸到最终 mart,完整追踪数据流向
- 可以对原始表定义测试(不重复就是 unique、不为空就是 not_null)
- 可以检查数据新鲜度(source freshness),发现上游管道延迟
- 统一管理所有外部数据依赖,团队成员可以清楚了解原始数据的位置
3.2 在 schema.yml 中定义 Sources
YAML# models/staging/schema.yml
version: 2
sources:
- name: ecommerce # source 的引用名称(在 source() 函数中使用)
description: "电商系统的原始数据,由 Fivetran 从 PostgreSQL 同步"
database: raw_db # 数据库名(可选,默认用 profiles.yml 中的配置)
schema: ecommerce_raw # Schema 名称(实际数据库中的 schema)
tables:
- name: orders
description: "订单原始数据表,每行代表一笔订单"
loaded_at_field: _loaded_at # 用于 freshness 检查的时间戳字段
freshness:
warn_after: {count: 12, period: hour} # 12小时未更新警告
error_after: {count: 24, period: hour} # 24小时未更新报错
columns:
- name: order_id
description: "订单唯一 ID"
tests:
- unique
- not_null
- name: customer_id
description: "下单用户 ID,关联 customers 表"
tests:
- not_null
- name: status
description: "订单状态"
tests:
- accepted_values:
values: ['pending', 'completed', 'cancelled', 'refunded']
- name: customers
description: "用户基础信息表"
loaded_at_field: _loaded_at
freshness:
warn_after: {count: 24, period: hour}
columns:
- name: customer_id
tests:
- unique
- not_null
- name: email
tests:
- unique
- not_null
- name: products
description: "商品信息表"
loaded_at_field: updated_at
columns:
- name: product_id
tests:
- unique
- not_null
3.3 source() 函数引用
在模型 SQL 中使用 {{ source('source_name', 'table_name') }} 引用已声明的数据源:
SQL-- models/staging/stg_orders.sql
{{ config(materialized='view') }}
SELECT
order_id,
customer_id,
order_date::date AS order_date,
UPPER(status) AS status,
amount / 100.0 AS amount_dollars,
_loaded_at
FROM {{ source('ecommerce', 'orders') }}
-- dbt 会将此编译为: raw_db.ecommerce_raw.orders
3.4 数据新鲜度检查:dbt source freshness
Source freshness(数据新鲜度)用来检查原始数据是否按预期频率更新。如果上游数据管道(Fivetran/Airbyte/自定义 ETL)出现故障,dbt 能提前发出警告。
BASH# 检查所有 sources 的数据新鲜度
dbt source freshness
# 只检查特定 source
dbt source freshness --select source:ecommerce
# 检查特定表
dbt source freshness --select source:ecommerce.orders
执行后输出类似:
OUTPUTFound 3 sources to check
ecommerce.orders PASS [last loaded 2h 15m ago, warn_after=12h]
ecommerce.customers PASS [last loaded 6h 30m ago, warn_after=24h]
ecommerce.products WARN [last loaded 13h 20m ago, warn_after=12h]
freshness 检查的前提
Source 表必须有 loaded_at_field 字段(时间戳),且已在 schema.yml 中配置 freshness 阈值。freshness 检查只看最新记录的时间戳,不计算实际数据更新量。
3.5 多数据源管理实战
真实项目通常有来自多个系统的数据。以电商平台为例,数据可能来自:订单系统(PostgreSQL)、用户行为(Segment/Amplitude)、广告投放(Google Ads API):
YAML# models/staging/schema.yml — 多源配置
version: 2
sources:
# 来源 1:电商核心业务系统
- name: ecommerce
schema: raw_ecommerce
description: "电商核心业务数据(Fivetran from PostgreSQL)"
tables:
- name: orders
loaded_at_field: _fivetran_synced
freshness: {warn_after: {count: 6, period: hour}}
- name: customers
loaded_at_field: _fivetran_synced
freshness: {warn_after: {count: 6, period: hour}}
- name: order_items
loaded_at_field: _fivetran_synced
# 来源 2:用户行为事件(Segment)
- name: segment
schema: raw_segment
description: "用户行为事件数据(Segment Source)"
tables:
- name: tracks
loaded_at_field: received_at
freshness: {warn_after: {count: 1, period: hour}}
columns:
- name: id
tests: [unique, not_null]
- name: pages
loaded_at_field: received_at
freshness: {warn_after: {count: 1, period: hour}}
# 来源 3:广告投放(Google Ads,每日同步)
- name: google_ads
schema: raw_google_ads
description: "Google Ads 广告数据(每日同步)"
tables:
- name: campaigns
loaded_at_field: _loaded_at
freshness: {warn_after: {count: 25, period: hour}}
- name: ad_performance
loaded_at_field: _loaded_at
freshness: {warn_after: {count: 25, period: hour}}
3.6 在 CI/CD 中集成 freshness 检查
生产环境中,建议在 dbt run 之前先检查数据新鲜度,避免在陈旧数据上运行转换:
BASH# 推荐的生产运行顺序
dbt source freshness # 1. 检查数据新鲜度
dbt run # 2. 运行所有模型
dbt test # 3. 运行所有测试
dbt docs generate # 4. 更新文档
本章小结
Source 是 dbt 对原始数据表的声明,在 schema.yml 的 sources: 块中定义。声明包括:source 名称、database/schema 路径、每张表的描述、列描述及测试。
{{ source('name', 'table') }} 在模型中引用已声明的原始数据表。dbt source freshness 根据 loaded_at_field 检查数据是否按预期更新,对监控上游数据管道健康状况至关重要。