跳转至

Airflow 调度与数据管道

基于 olad_data_ap 项目实践,Airflow 作为任务编排与调度引擎 来源:olad_data_ap/ARCH-02/00-技术架构总纲.md §4.2


一、Airflow 在项目中的定位

1.1 六层架构中的位置

L1 接入层 → connector-svc(定时拉取媒体数据)
L2 数据层 → ClickHouse (ODS/DWD/DWS/ADS)
L3 计算层 → Airflow ← 调度 dbt-runner + connector
L4 服务层 → metric-svc / attribution-svc / admin-svc / BFF
L5 AI 层 → ai-svc + Cube.js + 本地 LLM
L6 前端层 → Next.js

1.2 为什么选择 Airflow?

优势 说明
Python 原生 数据团队直接用 Python 写 DAG
丰富的 Operator 支持 Python/Bash/Docker/Kubernetes/Cloud
可观测性 内置 Web UI 查看任务状态/日志
定时调度 Cron 表达式灵活配置
回填支持 catchup/backfill 补历史数据

1.3 项目中的技术栈

# 项目结构
olad_data_ap/
├── airflow/
│   ├── dags/
│      └── example_hello_dag.py
│   └── plugins/
│       └── .gitkeep
├── infra/
│   └── docker-compose.yml  # Airflow 容器
├── Makefile
│   ├── make up        # 启动包括 Airflow   ├── make dbt-run   # 触发 dbt DAG   └── make seed      # 灌 mock 数据

二、DAG 基础概念

2.1 DAG 结构

# airflow/dags/example_hello_dag.py

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

# 默认参数
default_args = {
    'owner': 'ads-platform',
    'depends_on_past': False,      # 不依赖上次运行
    'wait_for_downstream': False,
    'start_date': datetime(2026, 1, 1),
    'retries': 3,                  # 重试次数
    'retry_delay': timedelta(minutes=5),
    'execution_timeout': timedelta(hours=2),
}

# 定义 DAG
with DAG(
    dag_id='ads_daily_pipeline',      # 唯一 ID
    default_args=default_args,
    description='广告数据每日处理管道',
    schedule_interval='0 2 * * *',    # 每天凌晨 2 点
    catchup=True,                     # 补历史数据
    tags=['ads', 'daily'],
) as dag:

    # 任务 1: 拉取昨日数据
    pull_meta_ads = BashOperator(
        task_id='pull_meta_ads',
        bash_command='python scripts/pull_meta_ads.py {{ ds }}',
    )

    # 任务 2: 拉取 Google Ads
    pull_google_ads = BashOperator(
        task_id='pull_google_ads',
        bash_command='python scripts/pull_google_ads.py {{ ds }}',
    )

    # 任务 3: 运行 dbt 模型
    run_dbt_models = BashOperator(
        task_id='run_dbt_models',
        bash_command='cd /olad_data_ap && dbt run --target prod',
    )

    # 任务 4: 数据质量检查
    data_quality_check = PythonOperator(
        task_id='data_quality_check',
        python_callable=check_data_quality,
        op_kwargs={'target_date': '{{ ds }}'},
    )

    # 任务依赖关系
    [pull_meta_ads, pull_google_ads] >> run_dbt_models >> data_quality_check

2.2 时间相关模板

模板 说明 示例
{{ ds }} 业务日期(YYYY-MM-DD) 2026-05-18
{{ ds_nodash }} 无连字符 20260518
{{ prev_ds }} 上次执行日期 2026-05-17
{{ next_ds }} 下次执行日期 2026-05-19
{{ execution_date }} 执行时间 2026-05-18T02:00:00
{{ ti }} TaskInstance 对象 用于 XCom

2.3 任务依赖关系

# 线性依赖
task1 >> task2 >> task3

# 并行执行
[task1, task2, task3] >> task4

# 复杂依赖
[task1, task2] >> [task3, task4] >> task5

三、常用 Operator

3.1 Python Operator

from airflow.operators.python import PythonOperator, BranchPythonOperator

def extract_transform_load(target_date: str, **context):
    """从 ClickHouse 读取数据,处理后写回"""
    from ads_core import ch_client

    # 获取上任务数据
    ti = context['ti']
    source_data = ti.xcom_pull(task_ids='extract_data')

    # 处理
    processed = transform_data(source_data, target_date)

    # 写入
    ch_client.execute(processed)

    # 传递数据给下任务
    return processed

extract_task = PythonOperator(
    task_id='etl_process',
    python_callable=extract_transform_load,
    op_kwargs={'target_date': '{{ ds }}'},
)

3.2 Bash Operator

from airflow.operators.bash import BashOperator

run_dbt = BashOperator(
    task_id='run_dbt_daily',
    bash_command='''
        cd /olad_data_ap/dbt &&
        dbt run --select tag:daily --vars '{"date": "{{ ds }}"}' &&
        dbt test --select tag:daily
    ''',
    env={'CH_HOST': 'clickhouse'},  # 环境变量
    append_env=True,
)

3.3 Sensor(等待外部条件)

from airflow.sensors.external_task import ExternalTaskSensor
from airflow.sensors.time_sensor import TimeSensor

# 等待上游 DAG 完成
wait_upstream = ExternalTaskSensor(
    task_id='wait_upstream_pipeline',
    external_dag_id='upstream_dag',
    external_task_id=None,  # 等待整个 DAG
    timeout=timedelta(hours=4),
    poke_interval=300,  # 每 5 分钟检查一次
)

# 等待特定时间点
wait_until_2am = TimeSensor(
    task_id='wait_until_2am',
    target_time=time(2, 0),
    poke_interval=300,
)

四、项目中的实际 DAG

4.1 每日数据管道

# dags/ads_daily_pipeline.py

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'ads-platform',
    'depends_on_past': False,
    'start_date': datetime(2026, 1, 1),
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='ads_daily_pipeline',
    default_args=default_args,
    schedule_interval='0 2 * * *',  # T+1 凌晨 2 点
    catchup=True,
) as dag:

    # ========== L1: 数据接入 ==========
    pull_meta = BashOperator(
        task_id='pull_meta_ads',
        bash_command='python scripts/pull_meta.py --date {{ ds }} --source meta',
    )

    pull_google = BashOperator(
        task_id='pull_google_ads',
        bash_command='python scripts/pull_meta.py --date {{ ds }} --source google',
    )

    pull_tiktok = BashOperator(
        task_id='pull_tiktok_ads',
        bash_command='python scripts/pull_meta.py --date {{ ds }} --source tiktok',
    )

    # ========== L2: 数据质量检查 ==========
    check_data_quality = PythonOperator(
        task_id='check_data_quality',
        python_callable=lambda **ctx: check_quality(ctx['ds']),
    )

    # ========== L3: dbt 建模 ==========
    dbt_run_staging = BashOperator(
        task_id='dbt_run_staging',
        bash_command='dbt run --select tag:staging --vars \'{"date": "{{ ds }}"}\'',
    )

    dbt_run_marts = BashOperator(
        task_id='dbt_run_marts',
        bash_command='dbt run --select tag:marts --vars \'{"date": "{{ ds }}"}\'',
    )

    dbt_test = BashOperator(
        task_id='dbt_test',
        bash_command='dbt test --select tag:marts',
    )

    # ========== L4: 看板缓存刷新 ==========
    refresh_cache = PythonOperator(
        task_id='refresh_dashboard_cache',
        python_callable=refresh_redis_cache,
    )

    # ========== 依赖关系 ==========
    [pull_meta, pull_google, pull_tiktok] >> check_data_quality
    check_data_quality >> dbt_run_staging >> dbt_run_marts >> dbt_test
    dbt_test >> refresh_cache

4.2 回填 DAG

# 用于处理历史数据

from airflow.operators.python import PythonOperator

def backfill_task(target_date: str):
    """回填指定日期的数据"""
    from scripts.backfill import run_backfill
    run_backfill(target_date)

with DAG(
    dag_id='ads_backfill',
    default_args=default_args,
    schedule_interval=None,  # 手动触发
    start_date=datetime(2024, 1, 1),
) as dag:

    backfill = PythonOperator(
        task_id='backfill_data',
        python_callable=backfill_task,
        op_kwargs={'target_date': '{{ params.target_date }}'},
        params={'target_date': '2024-01-01'},
    )

五、常见问题处理

5.1 数据质量检查

def check_data_quality(target_date: str, **context):
    """检查ODS层数据质量"""
    from ads_core import ch_client

    query = f"""
    SELECT 
        source,
        count() as total_rows,
        countIf(event_type = 'impression') as impressions,
        countIf(event_type = 'click') as clicks,
        countIf(event_type = 'conversion') as conversions,
        -- 空值率
        countIf(campaign_id = '') / count() as empty_campaign_rate,
        -- 数据延迟
        max(ingested_at) as last_ingest_time
    FROM ods_ad_event_raw
    WHERE dt = '{target_date}'
    GROUP BY source
    """

    result = ch_client.query(query)

    for row in result:
        # 告警规则
        if row.empty_campaign_rate > 0.1:
            send_alert(f"数据质量问题: {row.source} 空值率 {row.empty_campaign_rate}")
        if row.total_rows < 1000:
            send_alert(f"数据量异常: {row.source}{row.total_rows} 条")

5.2 失败重试与通知

from airflow.notifications.bases import notify
from airflow.utils.email import send_email

default_args = {
    'owner': 'ads-platform',
    'email': ['data-team@company.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
}

def failure_callback(context):
    """任务失败时发送告警"""
    dag_id = context['dag_run'].dag_id
    task_id = context['task_instance'].task_id
    log_url = context['task_instance'].log_url

    message = f"""
    DAG: {dag_id}
    Task: {task_id}
    Execution Date: {context['execution_date']}
    Log: {log_url}
    """
    send_email(to=['data-team@company.com'], subject=f"[Airflow Alert] {dag_id} Failed", html_content=message)

六、面试高频问题

答:
- Airflow:工作流调度,适合批处理定时任务
- Kafka:消息队列,适合实时流数据
- Flink:流处理引擎,适合实时计算

项目中的分工:
- Airflow:定时调度 dbt 批处理任务(T+1 日报)
- Kafka:V1.2 引入,用于 CAPI 事件缓冲
- Flink:V1.2 引入,用于实时流计算

Airflow 不能做实时计算,它是调度器不是计算引擎

Q2: 如何保证 DAG 的稳定性?

答:
1. 合理设置重试次数和间隔
2. 添加超时控制(execution_timeout)
3. 关键节点加数据质量检查
4. 失败告警(email/Slack/PagerDuty)
5. 依赖外部任务时用 Sensor 等待

项目中:
- 每天凌晨 2 点执行 T+1 数据
- catchup=True 自动补历史
- 超过 30 分钟未完成发 P0 告警

Q3: 如何处理数据回填?

答:
1. Airflow catchup:start_date 之前的数据自动补
2. 手动触发 backfill:DAG 设为 schedule_interval=None
3. dbt run --full-refresh:完全重算
4. 增量模型:基于 ds 变量过滤处理特定日期

项目中用:
- 每日增量 + 每周全量校验
- 对账机制检测数据异常

Q4: DAG 任务间如何传递数据?

答:两种方式
1. XCom(Cross-communication):小数据量
   - task1 返回数据 → task2 读取

2. 外部存储:大数据量
   - task1 写 ClickHouse/HDFS
   - task2 从存储读取

项目中用:
- 轻量数据用 XCom(统计结果)
- 批量数据写 CH(ads_core.ch_client)

七、项目实践 Checklist

  • 理解 Airflow 在六层架构中的位置(计算层调度)
  • 能写简单的 DAG(拉取 → 建模 → 检查 → 告警)
  • 掌握任务依赖关系(>> / [] 并行)
  • 理解时间模板(ds / prev_ds / ds_nodash)
  • 能配置重试和告警
  • 理解 Airflow 不能做实时计算(它是调度器)

相关文档: - ClickHouse 深度解析 - dbt 数据建模 - Flink 实时计算 - olad_data_ap 技术架构总纲 §4.2