6.1 Jinja 模板基础
dbt 在 SQL 文件中嵌入了 Jinja2 模板引擎,使 SQL 可以包含变量、条件判断和循环。dbt 编译时先将 Jinja 渲染为纯 SQL,再提交到数据仓库执行。
| 语法 | 用途 | 示例 |
|---|---|---|
{{ ... }} | 输出表达式 | {{ ref('model') }} |
{% ... %} | 控制语句(if/for/set) | {% if condition %} |
{# ... #} | 注释(不输出到 SQL) | {# 这是注释 #} |
条件语句 {% if %}
SQL-- 根据目标环境选择不同逻辑
SELECT
order_id,
customer_id,
amount_dollars
{% if target.name == 'prod' %}
, sensitive_data -- 生产环境包含敏感字段
{% endif %}
FROM {{ ref('stg_orders') }}
{% if target.name != 'prod' %}
LIMIT 1000 -- 开发环境限制行数,加快查询速度
{% endif %}
循环语句 {% for %}
SQL-- 动态 UNION ALL 多张表(按月分区表)
{% set months = ['2024_01', '2024_02', '2024_03', '2024_04'] %}
{% for month in months %}
SELECT * FROM raw.events_{{ month }}
{% if not loop.last %}UNION ALL{% endif %}
{% endfor %}
6.2 宏(Macros)的定义与调用
宏是 Jinja 函数,定义在 macros/ 目录的 .sql 文件中,用于封装可复用的 SQL 片段:
SQL-- macros/clean_string.sql
{% macro clean_string(column_name) %}
LOWER(TRIM({{ column_name }}))
{% endmacro %}
-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name, precision=2) %}
ROUND({{ column_name }} / 100.0, {{ precision }})
{% endmacro %}
-- macros/safe_divide.sql
{% macro safe_divide(numerator, denominator) %}
CASE
WHEN {{ denominator }} = 0 OR {{ denominator }} IS NULL
THEN NULL
ELSE {{ numerator }} * 1.0 / {{ denominator }}
END
{% endmacro %}
SQL-- 在模型中调用宏
SELECT
order_id,
{{ clean_string('customer_name') }} AS customer_name,
{{ cents_to_dollars('amount_cents') }} AS amount_dollars,
{{ safe_divide('revenue', 'orders') }} AS avg_order_value
FROM {{ ref('stg_orders') }}
生成代码的宏(generate_schema_name)
SQL-- macros/generate_schema_name.sql
-- 覆盖 dbt 默认 schema 命名逻辑
{% macro generate_schema_name(custom_schema_name, node) -%}
{%- set default_schema = target.schema -%}
{%- if custom_schema_name is none -%}
{{ default_schema }}
{%- else -%}
{{ default_schema }}_{{ custom_schema_name | trim }}
{%- endif -%}
{%- endmacro %}
6.3 dbt_utils 包——常用工具宏
dbt_utils 是官方维护的工具宏包,提供大量开箱即用的实用函数:
YAML# packages.yml
packages:
- package: dbt-labs/dbt_utils
version: [">=1.2.0", "<2.0.0"]
BASH">dbt deps # 安装
surrogate_key — 生成代理键
SQLSELECT
{{ dbt_utils.generate_surrogate_key(['order_id', 'product_id']) }} AS order_item_key,
order_id,
product_id,
quantity
FROM {{ ref('stg_order_items') }}
-- 编译为: MD5(CAST(order_id AS VARCHAR) || '-' || CAST(product_id AS VARCHAR))
date_spine — 生成日期序列
SQL-- 生成从 2024-01-01 到今天的所有日期
{{ dbt_utils.date_spine(
datepart="day",
start_date="cast('2024-01-01' as date)",
end_date="current_date"
) }}
pivot — 行转列
SQLSELECT
customer_id,
{{ dbt_utils.pivot(
column = 'order_channel',
values = ['web', 'mobile', 'retail'],
agg = 'SUM',
then_value= 'amount_dollars'
) }}
FROM {{ ref('stg_orders') }}
GROUP BY customer_id
-- 编译为:
-- SUM(CASE WHEN order_channel = 'web' THEN amount_dollars ELSE 0 END) AS web,
-- SUM(CASE WHEN order_channel = 'mobile' THEN amount_dollars ELSE 0 END) AS mobile,
-- ...
get_column_values — 动态获取枚举值
SQL-- 自动获取所有 channel 值,无需硬编码
{% set channels = dbt_utils.get_column_values(
table = ref('stg_orders'),
column = 'order_channel'
) %}
SELECT
customer_id,
{% for channel in channels %}
SUM(CASE WHEN order_channel = '{{ channel }}'
THEN amount_dollars ELSE 0 END) AS {{ channel }}_revenue
{% if not loop.last %},{% endif %}
{% endfor %}
FROM {{ ref('stg_orders') }}
GROUP BY customer_id
6.4 var() — 项目变量
项目变量(Variables)允许从外部传入配置,让模型行为可配置:
YAML# dbt_project.yml
vars:
start_date: '2024-01-01'
payment_methods: ['credit_card', 'bank_transfer', 'paypal']
event_min_count: 10
SQL-- 在模型中使用变量
SELECT *
FROM {{ ref('stg_orders') }}
WHERE order_date >= '{{ var("start_date") }}'
-- 命令行覆盖变量
-- dbt run --vars '{"start_date": "2024-06-01", "event_min_count": 5}'
6.5 env_var() — 环境变量
敏感配置(密钥、连接字符串)不应硬编码在代码中,使用 env_var() 从环境变量读取:
YAML# profiles.yml — 使用环境变量
my_analytics:
target: prod
outputs:
prod:
type: snowflake
account: "{{ env_var('SNOWFLAKE_ACCOUNT') }}"
user: "{{ env_var('SNOWFLAKE_USER') }}"
password: "{{ env_var('SNOWFLAKE_PASSWORD') }}"
role: TRANSFORMER
database: ANALYTICS
warehouse: TRANSFORMING
schema: dbt_prod
BASH# 设置环境变量后运行
export SNOWFLAKE_ACCOUNT=xy12345.us-east-1
export SNOWFLAKE_USER=dbt_user
export SNOWFLAKE_PASSWORD=super_secret_password
dbt run --target prod
6.6 run_query() — 动态 SQL 执行
run_query() 允许在宏中执行 SQL 并获取结果,用于动态生成代码:
SQL-- macros/get_max_date.sql
{% macro get_max_loaded_date(table_ref) %}
{% set query %}
SELECT MAX(_loaded_at) FROM {{ table_ref }}
{% endset %}
{% set results = run_query(query) %}
{% if execute %}
{% set max_date = results.columns[0].values()[0] %}
{{ return(max_date) }}
{% endif %}
{% endmacro %}
run_query() 在编译阶段执行
run_query() 在 dbt 编译(而非运行)阶段执行,所以被查询的表必须已经存在。通常需要用 {% if execute %} 包裹,避免在 dbt compile 时报错。
本章小结
Jinja 模板让 SQL 拥有条件、循环等编程能力,解决了跨环境差异(dev/prod)、动态列、重复代码等问题。宏(macro)定义在 macros/ 目录,是可复用的 SQL 函数。
dbt_utils 包(dbt-labs/dbt_utils)提供 surrogate_key、date_spine、pivot、get_column_values 等常用工具宏,大幅减少样板代码。var() 读取项目变量,env_var() 读取系统环境变量(适合存放密钥)。