Chapter 10

CI/CD and Productionization

Wire your dbt project into an automated pipeline: run tests automatically on every PR, execute builds on a schedule, and continuously monitor data quality.

10.1 GitHub Actions: A Basic CI/CD Pipeline

Run dbt build automatically on every Pull Request to ensure code changes do not break data quality:

# .github/workflows/dbt-ci.yml
name: dbt CI

on:
  pull_request:
    branches: [main]
    paths:
      - 'models/**'
      - 'tests/**'
      - 'macros/**'
      - 'packages.yml'
      - 'dbt_project.yml'

jobs:
  dbt-build:
    runs-on: ubuntu-latest
    env:
      DBT_PROFILES_DIR: .
      BIGQUERY_KEYFILE: ${{ secrets.BIGQUERY_KEYFILE }}

    steps:
      - uses: actions/checkout@v4

      - name: Setup Python 3.11
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
          cache: pip

      - name: Install dbt
        run: pip install dbt-core dbt-bigquery   # dbt-expectations is a dbt package, installed below by dbt deps

      - name: Write GCP keyfile
        run: |
          echo '${{ secrets.BIGQUERY_KEYFILE }}' > /tmp/keyfile.json

      - name: Install dbt packages
        run: dbt deps

      - name: Check source freshness
        run: dbt source freshness --target ci
        continue-on-error: true   # freshness warnings do not block CI

      - name: Run dbt build (run + test)
        run: dbt build --target ci
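The workflow above sets DBT_PROFILES_DIR: . and builds with --target ci, so it assumes a profiles.yml checked into the repo root. A minimal sketch under those assumptions (the profile name, GCP project, and dataset below are placeholders):

```yaml
# profiles.yml — committed to the repo; credentials come from the CI secret
# written to /tmp/keyfile.json by the "Write GCP keyfile" step
my_project:            # must match the profile name in dbt_project.yml
  target: ci
  outputs:
    ci:
      type: bigquery
      method: service-account
      keyfile: /tmp/keyfile.json
      project: my-gcp-project    # placeholder GCP project
      dataset: dbt_ci            # isolated dataset so CI never touches prod
      threads: 4
```

Keeping CI builds in their own dataset is what makes it safe to run dbt build on every PR.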

10.2 Slim CI: Test Only What Changed

A full dbt build can take hours on a large project. Slim CI uses dbt artifacts (state comparison) to run only the models changed in the current PR plus their downstream dependents, dramatically shortening CI time:

# .github/workflows/dbt-slim-ci.yml
name: dbt Slim CI

on:
  pull_request:
    branches: [main]

jobs:
  slim-ci:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dbt
        run: pip install dbt-core dbt-bigquery

      - name: Install packages
        run: dbt deps

      - name: Download production manifest
        run: |
          # Fetch the latest production manifest.json for state comparison.
          # dbt's --state flag expects a *directory* containing manifest.json,
          # so keep the downloaded file named manifest.json.
          gsutil cp gs://my-dbt-artifacts/prod/manifest.json ./manifest.json

      - name: dbt Slim CI build
        run: |
          dbt build \
            --target ci \
            --select "state:modified+"          `# changed models plus all downstream` \
            --defer                              `# unbuilt upstream refs resolve to prod relations` \
            --state ./                           `# directory holding the production manifest`

      - name: Upload CI manifest
        run: gsutil cp ./target/manifest.json gs://my-dbt-artifacts/ci/${{ github.run_id }}/manifest.json
💡

What --defer does: --defer tells dbt that upstream models not built in the CI environment (because Slim CI skipped them) should be read directly from the existing production tables. Even though only the changed models were run, downstream models still have data to JOIN against.
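Under the hood, state:modified works by comparing node checksums between the current project and the manifest passed via --state. A simplified sketch (the manifests below are hypothetical, trimmed-down stand-ins for real manifest.json contents):

```python
# How `state:modified` selection works, in miniature: a node is "modified"
# when its checksum differs from (or is absent in) the production manifest.

def modified_nodes(current: dict, prod: dict) -> set[str]:
    """Return unique_ids whose checksum changed relative to prod."""
    changed = set()
    for uid, node in current["nodes"].items():
        prod_node = prod["nodes"].get(uid)
        if prod_node is None or node["checksum"] != prod_node["checksum"]:
            changed.add(uid)
    return changed

# Hypothetical, heavily simplified manifests
prod = {"nodes": {
    "model.shop.stg_orders": {"checksum": "aaa"},
    "model.shop.fct_orders": {"checksum": "bbb"},
}}
current = {"nodes": {
    "model.shop.stg_orders": {"checksum": "aaa"},   # unchanged
    "model.shop.fct_orders": {"checksum": "ccc"},   # edited in this PR
    "model.shop.dim_users":  {"checksum": "ddd"},   # new model
}}

print(sorted(modified_nodes(current, prod)))
# → ['model.shop.dim_users', 'model.shop.fct_orders']
```

The trailing + in state:modified+ then expands this set to include everything downstream of each changed node.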

10.3 Production Deployment Pipeline

# .github/workflows/dbt-prod.yml
name: dbt Production Deploy

on:
  push:
    branches: [main]
  schedule:
    - cron: '0 3 * * *'   # daily at 03:00 UTC (11:00 Beijing time)

jobs:
  dbt-prod:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dbt
        run: pip install dbt-core dbt-bigquery

      - name: Install packages
        run: dbt deps

      - name: Source freshness check
        run: dbt source freshness --target prod

      - name: Run full build
        run: dbt build --target prod

      - name: Generate and upload docs
        run: |
          dbt docs generate --target prod
          gsutil -m cp -r ./target gs://my-dbt-docs/latest/

      - name: Upload artifacts
        run: |
          gsutil cp ./target/manifest.json    gs://my-dbt-artifacts/prod/manifest.json
          gsutil cp ./target/run_results.json gs://my-dbt-artifacts/prod/run_results.json
          gsutil cp ./target/catalog.json     gs://my-dbt-artifacts/prod/catalog.json

      - name: Notify on failure
        if: failure()
        uses: slackapi/slack-github-action@v1
        with:
          channel-id: 'data-alerts'
          slack-message: "dbt production build failed: ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
        env:
          SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}

10.4 dbt Artifacts: Metadata Files

After every dbt run, the target/ directory contains these metadata files:

| File | Contents | Used for |
| --- | --- | --- |
| manifest.json | Full project structure: model/test/macro metadata, dependency DAG | Slim CI state comparison, lineage analysis |
| run_results.json | Per-node execution status, timing, and row counts for the run | Performance analysis, failure alerting |
| catalog.json | Actual warehouse schema of tables/views (column names, types, statistics) | Column details in dbt docs |
| sources.json | Results of source freshness checks | Pipeline monitoring |

Analyzing slow models and failure patterns

# parse_run_results.py — analyze run_results.json
import json

with open('target/run_results.json') as f:
    results = json.load(f)

nodes = results['results']

# Top 5 slowest successful models
slow_models = sorted(
    [n for n in nodes if n['status'] == 'success'],
    key=lambda x: x.get('execution_time', 0),
    reverse=True
)[:5]

print("Slowest models:")
for m in slow_models:
    name = m['unique_id'].split('.')[-1]
    elapsed = m.get('execution_time', 0)
    print(f"  {name}: {elapsed:.1f}s")

# Failed tests
failures = [n for n in nodes if n['status'] == 'fail']
if failures:
    print(f"\nFailed tests ({len(failures)}):")
    for node in failures:
        print(f"  {node['unique_id']}")
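manifest.json also powers lineage analysis: its parent_map maps every node to its direct upstream dependencies, so a transitive walk recovers full ancestry. A sketch with a hypothetical, trimmed-down manifest:

```python
# Walk manifest.json's parent_map to collect all transitive upstream
# dependencies (models and sources) of a given node.

def upstream(manifest: dict, node: str) -> set[str]:
    """All transitive ancestors of `node` via parent_map."""
    seen: set[str] = set()
    stack = [node]
    while stack:
        for parent in manifest["parent_map"].get(stack.pop(), []):
            if parent not in seen:
                seen.add(parent)
                stack.append(parent)
    return seen

# Hypothetical, heavily simplified manifest
manifest = {"parent_map": {
    "model.shop.fct_orders":   ["model.shop.stg_orders", "model.shop.stg_payments"],
    "model.shop.stg_orders":   ["source.shop.raw.orders"],
    "model.shop.stg_payments": ["source.shop.raw.payments"],
}}

print(sorted(upstream(manifest, "model.shop.fct_orders")))
# → ['model.shop.stg_orders', 'model.shop.stg_payments',
#    'source.shop.raw.orders', 'source.shop.raw.payments']
```

The same walk over child_map (also in manifest.json) answers the inverse question: which models break if a given source changes.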

10.5 Scheduling dbt with Apache Airflow

Large data teams typically orchestrate complex pipelines with Airflow (or Prefect/Dagster), running dbt as one step inside an Airflow DAG:

# dags/daily_analytics.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'data-team',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': True,
    'email': ['data-team@company.com'],
}

with DAG(
    'daily_analytics_dbt',
    default_args=default_args,
    description='Daily dbt transformation run',
    schedule_interval='0 3 * * *',   # UTC 03:00
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['dbt', 'analytics'],
) as dag:

    dbt_source_freshness = BashOperator(
        task_id='dbt_source_freshness',
        bash_command='cd /opt/dbt && dbt source freshness --target prod',
    )

    dbt_run = BashOperator(
        task_id='dbt_run',
        bash_command='cd /opt/dbt && dbt run --target prod',
    )

    dbt_test = BashOperator(
        task_id='dbt_test',
        bash_command='cd /opt/dbt && dbt test --target prod',
    )

    dbt_docs_generate = BashOperator(
        task_id='dbt_docs_generate',
        bash_command='cd /opt/dbt && dbt docs generate --target prod',
    )

    # Define task dependencies
    dbt_source_freshness >> dbt_run >> dbt_test >> dbt_docs_generate
🔧

Consider astronomer-cosmos: Astronomer's Cosmos library parses a dbt project directly into an Airflow DAG, mapping each dbt model to its own Airflow task for finer-grained scheduling, retries, and dependency management.

10.6 Data Quality Monitoring Integrations

Integrating with Great Expectations

# Install great_expectations
pip install great_expectations

# Persist failing test rows to audit tables for detailed follow-up reporting
dbt test --store-failures
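Freshness results from dbt source freshness land in target/sources.json and can feed the same kind of monitoring. A sketch that flags stale sources, assuming the results/status layout of recent dbt versions (the sample data is hypothetical):

```python
# Scan sources.json for freshness checks that did not pass.
# Note: the exact artifact schema can vary between dbt versions.

def stale_sources(freshness: dict) -> list[str]:
    """unique_ids of sources whose freshness status is not 'pass'."""
    return [r["unique_id"] for r in freshness["results"] if r["status"] != "pass"]

# Hypothetical sample of target/sources.json contents
freshness = {"results": [
    {"unique_id": "source.shop.raw.orders",   "status": "pass"},
    {"unique_id": "source.shop.raw.payments", "status": "error"},
]}

print(stale_sources(freshness))
# → ['source.shop.raw.payments']
```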

Statistical anomaly detection with dbt-expectations

# Schema YAML using dbt-expectations for statistical anomaly checks
models:
  - name: fct_orders
    tests:
      # Row-count anomaly: must not drop more than 20% below yesterday's count
      - dbt_expectations.expect_table_row_count_to_be_between:
          min_value: "{{ (var('yesterday_row_count', 0) * 0.8) | int }}"
    columns:
      - name: amount_dollars
        tests:
          # The column mean must stay within the expected band
          - dbt_expectations.expect_column_mean_to_be_between:
              min_value: 10.0
              max_value: 5000.0
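The dbt_expectations tests above require the package to be declared in packages.yml and installed with dbt deps (which is why the CI workflow watches packages.yml). A sketch; the version range shown is an example pin, not a recommendation:

```yaml
# packages.yml
packages:
  - package: calogica/dbt-expectations
    version: [">=0.10.0", "<0.11.0"]   # example pin; check compatibility with your dbt-core version
```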

10.7 Productionization Checklist

- Environment isolation: separate dev / ci / prod targets and datasets
- Automated tests in CI on every PR (Slim CI for large projects)
- Scheduled production runs with failure alerting
- Artifacts archived after every production run
- Docs generated and published automatically
- Source freshness monitored continuously

📌

Chapter Summary
GitHub Actions automates the full dbt CI/CD loop: a PR triggers Slim CI (state:modified+ with --defer) to test only what changed, and a merge to main triggers the full production build.

dbt artifacts (manifest.json / run_results.json / catalog.json) are a metadata goldmine, powering Slim CI state comparison, performance analysis, and failure-pattern tracking. Airflow plus astronomer-cosmos enables fine-grained production scheduling and dependency management.

A productionized dbt project needs environment isolation, automated CI tests, scheduling with alerting, artifact archiving, docs publishing, and freshness monitoring. That is what makes data engineering as reliable, auditable, and maintainable as software engineering.