dbt 数据建模与指标治理¶
基于 olad_data_ap 项目实践,dbt 作为物理建模层与指标定义层 来源:olad_data_ap/ARCH-02/00-技术架构总纲.md §5
零基础:dbt 是什么?(从没用过也能懂)¶
一句话¶
dbt(data build tool)= 用 SQL 在数仓里「建表、洗数、分层」,并帮你管依赖顺序、测试和文档的一套工具。
它 不是 数据库,不是 调度系统(那是 Airflow),不是 Flink 那种流计算引擎。
你可能已经在做的事 vs dbt¶
| 你熟悉的方式 | dbt 里叫什么 |
|---|---|
写一堆 INSERT INTO dwd_xxx SELECT ... FROM ods_xxx |
一个 .sql 文件 = 一个 model(模型) |
先跑 ods 脚本,再跑 dwd,顺序靠 Airflow 配 |
ref('ods_xxx') 声明依赖,dbt 自动先跑上游 |
CREATE TABLE dws AS SELECT ... |
model 里只写 SELECT,dbt 决定建成 表还是视图 |
| 手工检查「今天分区有没有空」 | schema.yml 里写 test(not_null、unique 等) |
| SQL 文件扔 Git,但不知道谁依赖谁 | 血缘 lineage 自动从 ref() 生成 |
类比:若 Airflow 是 「几点钟开工」,dbt 是 「车间里每道工序的图纸 + 质检」——活还是 SQL,只是不再散落在十个脚本里硬排顺序。
跑起来时发生了什么?¶
你写的:models/dwd_bill.sql
SELECT ... FROM {{ ref('ods_bill') }}
dbt run 时:
1. 解析所有 model,画出 DAG(谁引用谁)
2. 按拓扑顺序,在 ClickHouse(或 Snowflake 等)里执行
3. 默认把 SELECT 结果物化成 表/视图(可配置 incremental 只追加新分区)
4. 跑 schema.yml 里的测试
数据始终在数仓里(你们文档里是 ClickHouse);dbt 只是 连上去执行 SQL 的客户端 + 工程化外壳。
和计费 / 数仓分层的关系¶
常见分层(你们文档里的 ODS → DWD → DWS → ADS):
ODS 原始入账/拉数落地 ← 往往由采集、Flink、批量导入完成
DWD 清洗、统一口径、去重 ← dbt 最常干这层
DWS 按天/账户/产品汇总 ← dbt
ADS 报表、对账、接口用的宽表 ← dbt
没用过 dbt 也可以:用纯 SQL + Airflow 同样能建这些表;dbt 的价值是 依赖、测试、文档、Git 协作 更省事,尤其模型多了以后。
需要会什么才能上手?¶
| 需要 | 不需要 |
|---|---|
| SQL(SELECT、JOIN、聚合) | Java / Scala |
| 知道表在哪(ClickHouse 等) | 先会 Airflow(可后学) |
| 基本概念:分区、增量 | 把它当成新数据库 |
本地试玩典型命令(了解即可,今天不必装):
dbt run # 按依赖执行所有 model
dbt test # 跑数据质量测试
dbt docs generate && dbt docs serve # 看血缘和文档站
和 DDIA / 你栈里其他组件¶
| 组件 | 分工 |
|---|---|
| Airflow | 定时:每天 2 点触发 dbt run |
| dbt | 里面 hundreds 条 SQL 怎么组织、测试 |
| ClickHouse | 表真正存哪 |
| Flink | 实时进 ODS 或实时汇总(dbt 不管实时) |
读 DDIA 框架 时看到「dbt」:记住它是 批处理派生数据 这条路上的 SQL 工程化工具 即可;不会 dbt 不影响你先读 Ch.10 批处理。
读完本节你应该能回答¶
- dbt 是数据库吗?→ 不是,连数仓跑 SQL 的工具
- 和 Airflow 区别?→ Airflow 调度,dbt 建模 + 测试
- 我没用过要紧吗?→ 你日常若写 数仓 SQL + 定时任务,概念上已经在做同类事
下文从 项目里怎么用 dbt 展开;若你司栈里没有 dbt,当 「数仓 SQL 最佳实践参考」 读即可。
一、dbt 在项目中的定位¶
1.1 六层架构中的位置¶
L1 接入层 → connector-svc
L2 数据层 → ClickHouse (ODS/DWD/DWS/ADS)
L3 计算层 → dbt-runner + Airflow ← dbt 在这里
L4 服务层 → metric-svc / attribution-svc / admin-svc / BFF
L5 AI 层 → ai-svc + Cube.js + 本地 LLM
L6 前端层 → Next.js
1.2 为什么选择 dbt?¶
| 优势 | 说明 |
|---|---|
| SQL 优先 | 数据团队直接用 SQL 建模,不需要学 Python/Java |
| 版本控制 | 模型代码可 Git 管理,变更可审计 |
| 数据血缘 | dbt 帮你追踪 lineage(dbt manifest) |
| 测试 | 内置 not_null / unique / relationships 测试 |
| 文档 | 自动生成 data catalog |
1.3 项目中的技术栈¶
# 项目结构
olad_data_ap/
├── dbt/
│ ├── models/
│ │ ├── staging/ # ODS → DWD(清洗)
│ │ │ └── stg_meta_ads.sql
│ │ └── marts/ # DWD → DWS(聚合)
│ │ └── dws_ad_perf_daily.sql
│ ├── schema.yml # 模型描述 + 测试
│ ├── dbt_project.yml # 项目配置
│ └── profiles.yml # 连接配置(ClickHouse)
二、数仓分层模型¶
2.1 ODS 层(原始层)¶
-- models/staging/stg_meta_ads.sql
-- 原始数据落 ODS,仅做字段映射,不做清洗
{{ config(
materialized = 'incremental',
unique_key = 'campaign_id',
on_schema_change = 'sync_all'
) }}
SELECT
account_id,
campaign_id,
campaign_name,
status,
dt,
ingested_at
FROM ods_meta_ads_campaign_raw
{% if is_incremental() %}
WHERE ingested_at > (SELECT max(ingested_at) FROM {{ this }})
{% endif %}
2.2 DWD 层(清洗层)¶
-- models/staging/stg_meta_ads.sql(续)
-- 标准化事件格式,统一字段名称
WITH source_enum AS (
SELECT 'meta' AS source, 1 AS source_id UNION ALL
SELECT 'google' AS source, 2 AS source_id UNION ALL
SELECT 'tiktok' AS source, 3 AS source_id
)
SELECT
{{ dbt_utils.generate_surrogate_key(['tenant_id', 'campaign_id']) }} AS event_key,
tenant_id,
event_date,
source_id AS source,
account_id,
campaign_id,
ad_id,
-- 标准化事件类型
CASE
WHEN event_type IN ('post_like', 'page_like') THEN 'conversion'
WHEN event_type IN ('video_view', 'page_view') THEN 'impression'
ELSE 'click'
END AS event_type,
event_value,
-- 维度标准化
country,
device
FROM ods_meta_ads_raw
JOIN source_enum USING (source)
2.3 DWS 层(汇总层)¶
-- models/marts/dws_ad_perf_daily.sql
-- 主题宽表,按天聚合
{{ config(
materialized = 'incremental',
unique_key = ['tenant_id', 'dt', 'source', 'account_id', 'country', 'device'],
partition_by = 'toYYYYMM(dt)',
order_by = 'tenant_id, dt, source, account_id'
) }}
SELECT
tenant_id,
dt,
source,
account_id,
country,
device,
-- 核心指标
sum(impressions) AS impressions,
sum(clicks) AS clicks,
sum(conversions) AS conversions,
sum(spend_usd) AS spend_usd,
sum(revenue_usd) AS revenue_usd,
-- 派生指标(这里只存原始值,计算在 ADS 层)
count(DISTINCT user_id) AS unique_users
FROM dwd_ad_event
WHERE dt >= '2024-01-01'
GROUP BY
tenant_id, dt, source, account_id, country, device
三、dbt 核心概念¶
3.1 Materialization 策略¶
| 策略 | 场景 | 项目中的应用 |
|---|---|---|
| table | 大表低频更新 | ADS 预聚合表 |
| incremental | 高频新增数据 | ODS/DWD/DWS 主流建模 |
| view | 小表高频访问 | 中间层视图 |
| ephemeral | CTEs 中间复用 | 不直接落表的中间逻辑 |
3.2 增量模型(Incremental)¶
-- 核心配置
{{ config(
materialized = 'incremental',
unique_key = 'id', -- 唯一键用于去重
partition_by = 'dt', -- 按日期分区
cluster_by = ['tenant_id'] -- 按租户聚簇
) }}
-- 增量逻辑
{% if is_incremental() %}
WHERE dt > (SELECT max(dt) FROM {{ this }})
{% endif %}
3.3 数据测试¶
# models/schema.yml
version: 2
models:
- name: dws_ad_perf_daily
description: 日度广告效果汇总宽表
columns:
- name: tenant_id
tests:
- not_null
- unique: false # 允许多条,按组合键唯一
- name: spend_usd
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_between:
min_value: 0
max_value: 1000000
- name: roas
tests:
- dbt_expectations.expect_column_values_to_be_between:
min_value: 0
max_value: 100
3.4 变量与宏¶
-- dbt_project.yml
vars:
effective_date: "2026-05-01"
lookback_days: 30
-- macros/ad_metrics.sql
{% macro calculate_roas(spend, revenue) %}
CASE
WHEN {{ spend }} > 0
THEN round({{ revenue }} / {{ spend }}, 2)
ELSE 0
END
{% endmacro %}
-- 使用
SELECT
spend_usd,
revenue_usd,
{{ calculate_roas('spend_usd', 'revenue_usd') }} AS roas
FROM dws_ad_perf_daily
四、指标治理(Metric Governance)¶
4.1 为什么需要指标治理?¶
问题:
- 不同报表的 ROAS 计算口径不同
- 口径变更后历史数据不一致
- 分析师不知道该用哪个指标
解决:
- 指标字典(ads_metric_dict)统一管理
- dbt 模型开源交付,口径透明
- 变更审计(ads_metric_change_log)
4.2 双层语义架构¶
Layer 1: dbt 物理层(物理 SQL)
└─ 定义:spend_sum = SUM(spend_usd)
Layer 2: Cube.js 逻辑层(语义层)
└─ 定义:ROAS = revenue_sum / spend_sum
支持:跨维度、加滤镜、时间对比
项目设计:
┌─────────────────────────────────────┐
│ 指标字典 metric-svc(单一真相源) │
│ ads_metric_dict + ads_metric_change_log │
└─────────────────────────────────────┘
↓ 查询
┌─────────────────────────────────────┐
│ Cube.js 语义层(ADS) │
│ 维度、度量、过滤器、时间对比 │
└─────────────────────────────────────┘
↓ 编译
┌─────────────────────────────────────┐
│ dbt 物理层(DWS/ADS) │
│ ClickHouse 执行 │
└─────────────────────────────────────┘
4.3 指标变更流程¶
-- 1. 提交变更申请
INSERT INTO ads_metric_change_log (
metric_id, old_definition, new_definition,
change_reason, requested_by, status
) VALUES (
'roas_v2',
'revenue / spend',
'revenue / spend WHERE revenue > 0', -- 新增分母非零保护
'避免 ROAS=∞ 的边界情况',
'analyst_zhang',
'pending'
);
-- 2. 评审通过后灰度生效
-- 3. 历史回灌(DWD 向上重算)
五、面试高频问题¶
Q1: dbt 和 Spark 的区别?¶
答:
- dbt:SQL 优先,运行在数仓上(ClickHouse/BigQuery/Snowflake)
- Spark:代码优先,支持复杂 ETL(Python/Java/Scala)
使用场景:
- dbt:轻量级建模、指标治理、快速迭代
- Spark:大规模数据处理、机器学习、复杂逻辑
项目中的分工:
- dbt:批处理建模(每日 DAG)
- Flink:实时流处理(V1.2 引入)
Q2: incremental 模型如何处理历史数据变更?¶
答:三种策略
1. 完全重算(full_refresh):dbt run --full-refresh
2. 增量追加(incremental):仅处理新增分区
3. 软删除+重新拉取:基于 updated_at 字段判断
项目中用:
- 每日增量 + 每周全量校验
- 对账机制检测数据异常
Q3: 如何保证数据血缘可追溯?¶
答:
1. dbt manifest 导出完整 lineage
2. 上下游模型通过 ref() 关联
3. 变更历史在 ads_metric_change_log 记录
4. 项目中配合 OpenLineage 上报(V1.2)
Q4: 数仓分层为什么这么设计?¶
答:分层原则
- ODS(Operational Data Store):原始层,只 INSERT 不 UPDATE
- DWD(Data Warehouse Detail):清洗层,统一事件口径
- DWS(Data Warehouse Summary):汇总层,按主题聚合
- ADS(Application Data Store):应用层,预聚合加速查询
好处:
1. 边界清晰:每层职责单一
2. 复用性高:DWD 可被多个 DWS 引用
3. 口径统一:从 DWD 向上重算,保证一致性
六、项目实践 Checklist¶
- 理解 dbt 在六层架构中的位置(计算层)
- 能解释 ODS → DWD → DWS → ADS 分层逻辑
- 掌握 incremental 模型配置(unique_key / partition_by)
- 理解双层语义架构(dbt 物理 + Cube 逻辑)
- 理解指标变更流程(评审 → 灰度 → 回灌)
- 能写简单的数据测试(not_null / dbt_expectations)
相关文档: - ClickHouse 深度解析 - olad_data_ap 技术架构总纲 §5