Chapter 03

Sources 数据源管理

在 schema.yml 中声明原始数据源,追踪数据新鲜度,让数据血缘从源头开始

3.1 为什么需要 Sources

在第2章我们用 source('ecommerce', 'orders') 引用了原始表。但 dbt 怎么知道这个表在哪里?这需要在 schema.yml 中声明 Source。

Source 声明带来以下好处:

3.2 在 schema.yml 中定义 Sources

YAML# models/staging/schema.yml
version: 2

sources:
  - name: ecommerce                    # source 的引用名称(在 source() 函数中使用)
    description: "电商系统的原始数据,由 Fivetran 从 PostgreSQL 同步"
    database: raw_db                   # 数据库名(可选,默认用 profiles.yml 中的配置)
    schema: ecommerce_raw              # Schema 名称(实际数据库中的 schema)

    tables:
      - name: orders
        description: "订单原始数据表,每行代表一笔订单"
        loaded_at_field: _loaded_at    # 用于 freshness 检查的时间戳字段
        freshness:
          warn_after: {count: 12, period: hour}   # 12小时未更新警告
          error_after: {count: 24, period: hour}  # 24小时未更新报错
        columns:
          - name: order_id
            description: "订单唯一 ID"
            tests:
              - unique
              - not_null
          - name: customer_id
            description: "下单用户 ID,关联 customers 表"
            tests:
              - not_null
          - name: status
            description: "订单状态"
            tests:
              - accepted_values:
                  values: ['pending', 'completed', 'cancelled', 'refunded']

      - name: customers
        description: "用户基础信息表"
        loaded_at_field: _loaded_at
        freshness:
          warn_after: {count: 24, period: hour}
        columns:
          - name: customer_id
            tests:
              - unique
              - not_null
          - name: email
            tests:
              - unique
              - not_null

      - name: products
        description: "商品信息表"
        loaded_at_field: updated_at
        columns:
          - name: product_id
            tests:
              - unique
              - not_null

3.3 source() 函数引用

在模型 SQL 中使用 {{ source('source_name', 'table_name') }} 引用已声明的数据源:

SQL-- models/staging/stg_orders.sql
{{ config(materialized='view') }}

SELECT
    order_id,
    customer_id,
    order_date::date                  AS order_date,
    UPPER(status)                    AS status,
    amount / 100.0                   AS amount_dollars,
    _loaded_at
FROM {{ source('ecommerce', 'orders') }}
-- dbt 会将此编译为: raw_db.ecommerce_raw.orders

3.4 数据新鲜度检查:dbt source freshness

Source freshness(数据新鲜度)用来检查原始数据是否按预期频率更新。如果上游数据管道(Fivetran/Airbyte/自定义 ETL)出现故障,dbt 能提前发出警告。

BASH# 检查所有 sources 的数据新鲜度
dbt source freshness

# 只检查特定 source
dbt source freshness --select source:ecommerce

# 检查特定表
dbt source freshness --select source:ecommerce.orders

执行后输出类似:

OUTPUTFound 3 sources to check

ecommerce.orders          PASS   [last loaded 2h 15m ago, warn_after=12h]
ecommerce.customers       PASS   [last loaded 6h 30m ago, warn_after=24h]
ecommerce.products        WARN   [last loaded 13h 20m ago, warn_after=12h]
⚠️

freshness 检查的前提 Source 表必须有 loaded_at_field 字段(时间戳),且已在 schema.yml 中配置 freshness 阈值。freshness 检查只看最新记录的时间戳,不计算实际数据更新量。

3.5 多数据源管理实战

真实项目通常有来自多个系统的数据。以电商平台为例,数据可能来自:订单系统(PostgreSQL)、用户行为(Segment/Amplitude)、广告投放(Google Ads API):

YAML# models/staging/schema.yml — 多源配置
version: 2

sources:
  # 来源 1:电商核心业务系统
  - name: ecommerce
    schema: raw_ecommerce
    description: "电商核心业务数据(Fivetran from PostgreSQL)"
    tables:
      - name: orders
        loaded_at_field: _fivetran_synced
        freshness: {warn_after: {count: 6, period: hour}}
      - name: customers
        loaded_at_field: _fivetran_synced
        freshness: {warn_after: {count: 6, period: hour}}
      - name: order_items
        loaded_at_field: _fivetran_synced

  # 来源 2:用户行为事件(Segment)
  - name: segment
    schema: raw_segment
    description: "用户行为事件数据(Segment Source)"
    tables:
      - name: tracks
        loaded_at_field: received_at
        freshness: {warn_after: {count: 1, period: hour}}
        columns:
          - name: id
            tests: [unique, not_null]
      - name: pages
        loaded_at_field: received_at
        freshness: {warn_after: {count: 1, period: hour}}

  # 来源 3:广告投放(Google Ads,每日同步)
  - name: google_ads
    schema: raw_google_ads
    description: "Google Ads 广告数据(每日同步)"
    tables:
      - name: campaigns
        loaded_at_field: _loaded_at
        freshness: {warn_after: {count: 25, period: hour}}
      - name: ad_performance
        loaded_at_field: _loaded_at
        freshness: {warn_after: {count: 25, period: hour}}

3.6 在 CI/CD 中集成 freshness 检查

生产环境中,建议在 dbt run 之前先检查数据新鲜度,避免在陈旧数据上运行转换:

BASH# 推荐的生产运行顺序
dbt source freshness   # 1. 检查数据新鲜度
dbt run                # 2. 运行所有模型
dbt test               # 3. 运行所有测试
dbt docs generate      # 4. 更新文档
📌

本章小结
Source 是 dbt 对原始数据表的声明,在 schema.ymlsources: 块中定义。声明包括:source 名称、database/schema 路径、每张表的描述、列描述及测试。

{{ source('name', 'table') }} 在模型中引用已声明的原始数据表。dbt source freshness 根据 loaded_at_field 检查数据是否按预期更新,对监控上游数据管道健康状况至关重要。