跳转至

dbt 数据建模与指标治理

基于 olad_data_ap 项目实践,dbt 作为物理建模层与指标定义层 来源:olad_data_ap/ARCH-02/00-技术架构总纲.md §5

← 返回 数据工程索引 | DDIA 框架


零基础:dbt 是什么?(从没用过也能懂)

一句话

dbt(data build tool)= 用 SQL 在数仓里「建表、洗数、分层」,并帮你管依赖顺序、测试和文档的一套工具。

不是 数据库,不是 调度系统(那是 Airflow),不是 Flink 那种流计算引擎。


你可能已经在做的事 vs dbt

你熟悉的方式 dbt 里叫什么
写一堆 INSERT INTO dwd_xxx SELECT ... FROM ods_xxx 一个 .sql 文件 = 一个 model(模型)
先跑 ods 脚本,再跑 dwd,顺序靠 Airflow 配 ref('ods_xxx') 声明依赖,dbt 自动先跑上游
CREATE TABLE dws AS SELECT ... model 里只写 SELECT,dbt 决定建成 表还是视图
手工检查「今天分区有没有空」 schema.yml 里写 test(not_null、unique 等)
SQL 文件扔 Git,但不知道谁依赖谁 血缘 lineage 自动从 ref() 生成

类比:若 Airflow 是 「几点钟开工」,dbt 是 「车间里每道工序的图纸 + 质检」——活还是 SQL,只是不再散落在十个脚本里硬排顺序。


跑起来时发生了什么?

你写的:models/dwd_bill.sql
        SELECT ... FROM {{ ref('ods_bill') }}

dbt run 时:
  1. 解析所有 model,画出 DAG(谁引用谁)
  2. 按拓扑顺序,在 ClickHouse(或 Snowflake 等)里执行
  3. 默认把 SELECT 结果物化成 表/视图(可配置 incremental 只追加新分区)
  4. 跑 schema.yml 里的测试

数据始终在数仓里(你们文档里是 ClickHouse);dbt 只是 连上去执行 SQL 的客户端 + 工程化外壳


和计费 / 数仓分层的关系

常见分层(你们文档里的 ODS → DWD → DWS → ADS):

ODS  原始入账/拉数落地          ← 往往由采集、Flink、批量导入完成
DWD  清洗、统一口径、去重       ← dbt 最常干这层
DWS  按天/账户/产品汇总         ← dbt
ADS  报表、对账、接口用的宽表   ← dbt

没用过 dbt 也可以:用纯 SQL + Airflow 同样能建这些表;dbt 的价值是 依赖、测试、文档、Git 协作 更省事,尤其模型多了以后。


需要会什么才能上手?

需要 不需要
SQL(SELECT、JOIN、聚合) Java / Scala
知道表在哪(ClickHouse 等) 先会 Airflow(可后学)
基本概念:分区、增量 把它当成新数据库

本地试玩典型命令(了解即可,今天不必装):

dbt run          # 按依赖执行所有 model
dbt test         # 跑数据质量测试
dbt docs generate && dbt docs serve   # 看血缘和文档站

和 DDIA / 你栈里其他组件

组件 分工
Airflow 定时:每天 2 点触发 dbt run
dbt 里面 hundreds 条 SQL 怎么组织、测试
ClickHouse 表真正存哪
Flink 实时进 ODS 或实时汇总(dbt 不管实时)

DDIA 框架 时看到「dbt」:记住它是 批处理派生数据 这条路上的 SQL 工程化工具 即可;不会 dbt 不影响你先读 Ch.10 批处理


读完本节你应该能回答

  1. dbt 是数据库吗?→ 不是,连数仓跑 SQL 的工具
  2. 和 Airflow 区别?→ Airflow 调度,dbt 建模 + 测试
  3. 我没用过要紧吗?→ 你日常若写 数仓 SQL + 定时任务,概念上已经在做同类事

下文从 项目里怎么用 dbt 展开;若你司栈里没有 dbt,当 「数仓 SQL 最佳实践参考」 读即可。


一、dbt 在项目中的定位

1.1 六层架构中的位置

L1 接入层 → connector-svc
L2 数据层 → ClickHouse (ODS/DWD/DWS/ADS)
L3 计算层 → dbt-runner + Airflow ← dbt 在这里
L4 服务层 → metric-svc / attribution-svc / admin-svc / BFF
L5 AI 层 → ai-svc + Cube.js + 本地 LLM
L6 前端层 → Next.js

1.2 为什么选择 dbt?

优势 说明
SQL 优先 数据团队直接用 SQL 建模,不需要学 Python/Java
版本控制 模型代码可 Git 管理,变更可审计
数据血缘 dbt 帮你追踪 lineage(dbt manifest)
测试 内置 not_null / unique / relationships 测试
文档 自动生成 data catalog

1.3 项目中的技术栈

# 项目结构
olad_data_ap/
├── dbt/
│   ├── models/
│      ├── staging/       # ODS → DWD(清洗)         └── stg_meta_ads.sql
│      └── marts/         # DWD → DWS(聚合)          └── dws_ad_perf_daily.sql
│   ├── schema.yml         # 模型描述 + 测试   ├── dbt_project.yml    # 项目配置   └── profiles.yml       # 连接配置(ClickHouse)

二、数仓分层模型

2.1 ODS 层(原始层)

-- models/staging/stg_meta_ads.sql
-- 原始数据落 ODS,仅做字段映射,不做清洗

{{ config(
    materialized = 'incremental',
    unique_key = 'campaign_id',
    on_schema_change = 'sync_all'
) }}

SELECT
    account_id,
    campaign_id,
    campaign_name,
    status,
    dt,
    ingested_at
FROM ods_meta_ads_campaign_raw
{% if is_incremental() %}
WHERE ingested_at > (SELECT max(ingested_at) FROM {{ this }})
{% endif %}

2.2 DWD 层(清洗层)

-- models/staging/stg_meta_ads.sql(续)
-- 标准化事件格式,统一字段名称

WITH source_enum AS (
    SELECT 'meta'    AS source, 1 AS source_id UNION ALL
    SELECT 'google'  AS source, 2 AS source_id UNION ALL
    SELECT 'tiktok'  AS source, 3 AS source_id
)

SELECT
    {{ dbt_utils.generate_surrogate_key(['tenant_id', 'campaign_id']) }} AS event_key,
    tenant_id,
    event_date,
    source_id AS source,
    account_id,
    campaign_id,
    ad_id,
    -- 标准化事件类型
    CASE 
        WHEN event_type IN ('post_like', 'page_like') THEN 'conversion'
        WHEN event_type IN ('video_view', 'page_view')  THEN 'impression'
        ELSE 'click'
    END AS event_type,
    event_value,
    -- 维度标准化
    country,
    device
FROM ods_meta_ads_raw
JOIN source_enum USING (source)

2.3 DWS 层(汇总层)

-- models/marts/dws_ad_perf_daily.sql
-- 主题宽表,按天聚合

{{ config(
    materialized = 'incremental',
    unique_key = ['tenant_id', 'dt', 'source', 'account_id', 'country', 'device'],
    partition_by = 'toYYYYMM(dt)',
    order_by = 'tenant_id, dt, source, account_id'
) }}

SELECT
    tenant_id,
    dt,
    source,
    account_id,
    country,
    device,
    -- 核心指标
    sum(impressions)   AS impressions,
    sum(clicks)        AS clicks,
    sum(conversions)   AS conversions,
    sum(spend_usd)    AS spend_usd,
    sum(revenue_usd)  AS revenue_usd,
    -- 派生指标(这里只存原始值,计算在 ADS 层)
    count(DISTINCT user_id) AS unique_users
FROM dwd_ad_event
WHERE dt >= '2024-01-01'
GROUP BY
    tenant_id, dt, source, account_id, country, device

三、dbt 核心概念

3.1 Materialization 策略

策略 场景 项目中的应用
table 大表低频更新 ADS 预聚合表
incremental 高频新增数据 ODS/DWD/DWS 主流建模
view 小表高频访问 中间层视图
ephemeral CTEs 中间复用 不直接落表的中间逻辑

3.2 增量模型(Incremental)

-- 核心配置
{{ config(
    materialized = 'incremental',
    unique_key = 'id',           -- 唯一键用于去重
    partition_by = 'dt',          -- 按日期分区
    cluster_by = ['tenant_id']   -- 按租户聚簇
) }}

-- 增量逻辑
{% if is_incremental() %}
WHERE dt > (SELECT max(dt) FROM {{ this }})
{% endif %}

3.3 数据测试

# models/schema.yml
version: 2

models:
  - name: dws_ad_perf_daily
    description: 日度广告效果汇总宽表
    columns:
      - name: tenant_id
        tests:
          - not_null
          - unique: false  # 允许多条,按组合键唯一

      - name: spend_usd
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: 0
              max_value: 1000000

      - name: roas
        tests:
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: 0
              max_value: 100

3.4 变量与宏

-- dbt_project.yml
vars:
  effective_date: "2026-05-01"
  lookback_days: 30

-- macros/ad_metrics.sql
{% macro calculate_roas(spend, revenue) %}
    CASE 
        WHEN {{ spend }} > 0 
        THEN round({{ revenue }} / {{ spend }}, 2)
        ELSE 0
    END
{% endmacro %}

-- 使用
SELECT
    spend_usd,
    revenue_usd,
    {{ calculate_roas('spend_usd', 'revenue_usd') }} AS roas
FROM dws_ad_perf_daily

四、指标治理(Metric Governance)

4.1 为什么需要指标治理?

问题:
- 不同报表的 ROAS 计算口径不同
- 口径变更后历史数据不一致
- 分析师不知道该用哪个指标

解决:
- 指标字典(ads_metric_dict)统一管理
- dbt 模型开源交付,口径透明
- 变更审计(ads_metric_change_log)

4.2 双层语义架构

Layer 1: dbt 物理层(物理 SQL)
  └─ 定义:spend_sum = SUM(spend_usd)

Layer 2: Cube.js 逻辑层(语义层)
  └─ 定义:ROAS = revenue_sum / spend_sum
           支持:跨维度、加滤镜、时间对比

项目设计:
┌─────────────────────────────────────┐
│ 指标字典 metric-svc(单一真相源)      │
│   ads_metric_dict + ads_metric_change_log │
└─────────────────────────────────────┘
         ↓ 查询
┌─────────────────────────────────────┐
│  Cube.js 语义层(ADS)                 │
│   维度、度量、过滤器、时间对比         │
└─────────────────────────────────────┘
         ↓ 编译
┌─────────────────────────────────────┐
│  dbt 物理层(DWS/ADS)                │
│   ClickHouse 执行                    │
└─────────────────────────────────────┘

4.3 指标变更流程

-- 1. 提交变更申请
INSERT INTO ads_metric_change_log (
    metric_id, old_definition, new_definition, 
    change_reason, requested_by, status
) VALUES (
    'roas_v2', 
    'revenue / spend',
    'revenue / spend WHERE revenue > 0',  -- 新增分母非零保护
    '避免 ROAS=∞ 的边界情况',
    'analyst_zhang',
    'pending'
);

-- 2. 评审通过后灰度生效
-- 3. 历史回灌(DWD 向上重算)

五、面试高频问题

Q1: dbt 和 Spark 的区别?

答:
- dbt:SQL 优先,运行在数仓上(ClickHouse/BigQuery/Snowflake)
- Spark:代码优先,支持复杂 ETL(Python/Java/Scala)

使用场景:
- dbt:轻量级建模、指标治理、快速迭代
- Spark:大规模数据处理、机器学习、复杂逻辑

项目中的分工:
- dbt:批处理建模(每日 DAG)
- Flink:实时流处理(V1.2 引入)

Q2: incremental 模型如何处理历史数据变更?

答:三种策略
1. 完全重算(full_refresh):dbt run --full-refresh
2. 增量追加(incremental):仅处理新增分区
3. 软删除+重新拉取:基于 updated_at 字段判断

项目中用:
- 每日增量 + 每周全量校验
- 对账机制检测数据异常

Q3: 如何保证数据血缘可追溯?

答:
1. dbt manifest 导出完整 lineage
2. 上下游模型通过 ref() 关联
3. 变更历史在 ads_metric_change_log 记录
4. 项目中配合 OpenLineage 上报(V1.2)

Q4: 数仓分层为什么这么设计?

答:分层原则
- ODS(Operational Data Store):原始层,只 INSERT 不 UPDATE
- DWD(Data Warehouse Detail):清洗层,统一事件口径
- DWS(Data Warehouse Summary):汇总层,按主题聚合
- ADS(Application Data Store):应用层,预聚合加速查询

好处:
1. 边界清晰:每层职责单一
2. 复用性高:DWD 可被多个 DWS 引用
3. 口径统一:从 DWD 向上重算,保证一致性

六、项目实践 Checklist

  • 理解 dbt 在六层架构中的位置(计算层)
  • 能解释 ODS → DWD → DWS → ADS 分层逻辑
  • 掌握 incremental 模型配置(unique_key / partition_by)
  • 理解双层语义架构(dbt 物理 + Cube 逻辑)
  • 理解指标变更流程(评审 → 灰度 → 回灌)
  • 能写简单的数据测试(not_null / dbt_expectations)

相关文档: - ClickHouse 深度解析 - olad_data_ap 技术架构总纲 §5