Chapter 06

Jinja 与宏(Macros)

用 Jinja 模板让 SQL 动态化,用宏封装可复用的转换逻辑,告别 SQL 重复代码

6.1 Jinja 模板基础

dbt 在 SQL 文件中嵌入了 Jinja2 模板引擎,使 SQL 可以包含变量、条件判断和循环。dbt 编译时先将 Jinja 渲染为纯 SQL,再提交到数据仓库执行。

语法用途示例
{{ ... }}输出表达式{{ ref('model') }}
{% ... %}控制语句(if/for/set){% if condition %}
{# ... #}注释(不输出到 SQL){# 这是注释 #}

条件语句 {% if %}

SQL-- 根据目标环境选择不同逻辑
SELECT
    order_id,
    customer_id,
    amount_dollars
    {% if target.name == 'prod' %}
    , sensitive_data    -- 生产环境包含敏感字段
    {% endif %}
FROM {{ ref('stg_orders') }}

{% if target.name != 'prod' %}
LIMIT 1000    -- 开发环境限制行数,加快查询速度
{% endif %}

循环语句 {% for %}

SQL-- 动态 UNION ALL 多张表(按月分区表)
{% set months = ['2024_01', '2024_02', '2024_03', '2024_04'] %}

{% for month in months %}
SELECT * FROM raw.events_{{ month }}
{% if not loop.last %}UNION ALL{% endif %}
{% endfor %}

6.2 宏(Macros)的定义与调用

宏是 Jinja 函数,定义在 macros/ 目录的 .sql 文件中,用于封装可复用的 SQL 片段:

SQL-- macros/clean_string.sql
{% macro clean_string(column_name) %}
    LOWER(TRIM({{ column_name }}))
{% endmacro %}

-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name, precision=2) %}
    ROUND({{ column_name }} / 100.0, {{ precision }})
{% endmacro %}

-- macros/safe_divide.sql
{% macro safe_divide(numerator, denominator) %}
    CASE
        WHEN {{ denominator }} = 0 OR {{ denominator }} IS NULL
        THEN NULL
        ELSE {{ numerator }} * 1.0 / {{ denominator }}
    END
{% endmacro %}
SQL-- 在模型中调用宏
SELECT
    order_id,
    {{ clean_string('customer_name') }}    AS customer_name,
    {{ cents_to_dollars('amount_cents') }}   AS amount_dollars,
    {{ safe_divide('revenue', 'orders') }}    AS avg_order_value
FROM {{ ref('stg_orders') }}

生成代码的宏(generate_schema_name)

SQL-- macros/generate_schema_name.sql
-- 覆盖 dbt 默认 schema 命名逻辑
{% macro generate_schema_name(custom_schema_name, node) -%}
    {%- set default_schema = target.schema -%}
    {%- if custom_schema_name is none -%}
        {{ default_schema }}
    {%- else -%}
        {{ default_schema }}_{{ custom_schema_name | trim }}
    {%- endif -%}
{%- endmacro %}

6.3 dbt_utils 包——常用工具宏

dbt_utils 是官方维护的工具宏包,提供大量开箱即用的实用函数:

YAML# packages.yml
packages:
  - package: dbt-labs/dbt_utils
    version: [">=1.2.0", "<2.0.0"]
BASH">dbt deps   # 安装

surrogate_key — 生成代理键

SQLSELECT
    {{ dbt_utils.generate_surrogate_key(['order_id', 'product_id']) }} AS order_item_key,
    order_id,
    product_id,
    quantity
FROM {{ ref('stg_order_items') }}
-- 编译为: MD5(CAST(order_id AS VARCHAR) || '-' || CAST(product_id AS VARCHAR))

date_spine — 生成日期序列

SQL-- 生成从 2024-01-01 到今天的所有日期
{{ dbt_utils.date_spine(
    datepart="day",
    start_date="cast('2024-01-01' as date)",
    end_date="current_date"
) }}

pivot — 行转列

SQLSELECT
    customer_id,
    {{ dbt_utils.pivot(
        column    = 'order_channel',
        values    = ['web', 'mobile', 'retail'],
        agg       = 'SUM',
        then_value= 'amount_dollars'
    ) }}
FROM {{ ref('stg_orders') }}
GROUP BY customer_id
-- 编译为:
-- SUM(CASE WHEN order_channel = 'web' THEN amount_dollars ELSE 0 END) AS web,
-- SUM(CASE WHEN order_channel = 'mobile' THEN amount_dollars ELSE 0 END) AS mobile,
-- ...

get_column_values — 动态获取枚举值

SQL-- 自动获取所有 channel 值,无需硬编码
{% set channels = dbt_utils.get_column_values(
    table = ref('stg_orders'),
    column = 'order_channel'
) %}

SELECT
    customer_id,
    {% for channel in channels %}
    SUM(CASE WHEN order_channel = '{{ channel }}'
             THEN amount_dollars ELSE 0 END) AS {{ channel }}_revenue
    {% if not loop.last %},{% endif %}
    {% endfor %}
FROM {{ ref('stg_orders') }}
GROUP BY customer_id

6.4 var() — 项目变量

项目变量(Variables)允许从外部传入配置,让模型行为可配置:

YAML# dbt_project.yml
vars:
  start_date: '2024-01-01'
  payment_methods: ['credit_card', 'bank_transfer', 'paypal']
  event_min_count: 10
SQL-- 在模型中使用变量
SELECT *
FROM {{ ref('stg_orders') }}
WHERE order_date >= '{{ var("start_date") }}'

-- 命令行覆盖变量
-- dbt run --vars '{"start_date": "2024-06-01", "event_min_count": 5}'

6.5 env_var() — 环境变量

敏感配置(密钥、连接字符串)不应硬编码在代码中,使用 env_var() 从环境变量读取:

YAML# profiles.yml — 使用环境变量
my_analytics:
  target: prod
  outputs:
    prod:
      type: snowflake
      account: "{{ env_var('SNOWFLAKE_ACCOUNT') }}"
      user: "{{ env_var('SNOWFLAKE_USER') }}"
      password: "{{ env_var('SNOWFLAKE_PASSWORD') }}"
      role: TRANSFORMER
      database: ANALYTICS
      warehouse: TRANSFORMING
      schema: dbt_prod
BASH# 设置环境变量后运行
export SNOWFLAKE_ACCOUNT=xy12345.us-east-1
export SNOWFLAKE_USER=dbt_user
export SNOWFLAKE_PASSWORD=super_secret_password
dbt run --target prod

6.6 run_query() — 动态 SQL 执行

run_query() 允许在宏中执行 SQL 并获取结果,用于动态生成代码:

SQL-- macros/get_max_date.sql
{% macro get_max_loaded_date(table_ref) %}
    {% set query %}
        SELECT MAX(_loaded_at) FROM {{ table_ref }}
    {% endset %}

    {% set results = run_query(query) %}
    {% if execute %}
        {% set max_date = results.columns[0].values()[0] %}
        {{ return(max_date) }}
    {% endif %}
{% endmacro %}
⚠️

run_query() 在编译阶段执行 run_query() 在 dbt 编译(而非运行)阶段执行,所以被查询的表必须已经存在。通常需要用 {% if execute %} 包裹,避免在 dbt compile 时报错。

📌

本章小结
Jinja 模板让 SQL 拥有条件、循环等编程能力,解决了跨环境差异(dev/prod)、动态列、重复代码等问题。宏(macro)定义在 macros/ 目录,是可复用的 SQL 函数。

dbt_utils 包(dbt-labs/dbt_utils)提供 surrogate_keydate_spinepivotget_column_values 等常用工具宏,大幅减少样板代码。var() 读取项目变量,env_var() 读取系统环境变量(适合存放密钥)。