Airflow 调度与数据管道¶
基于 olad_data_ap 项目实践,Airflow 作为任务编排与调度引擎 来源:olad_data_ap/ARCH-02/00-技术架构总纲.md §4.2
一、Airflow 在项目中的定位¶
1.1 六层架构中的位置¶
L1 接入层 → connector-svc(定时拉取媒体数据)
L2 数据层 → ClickHouse (ODS/DWD/DWS/ADS)
L3 计算层 → Airflow ← 调度 dbt-runner + connector
L4 服务层 → metric-svc / attribution-svc / admin-svc / BFF
L5 AI 层 → ai-svc + Cube.js + 本地 LLM
L6 前端层 → Next.js
1.2 为什么选择 Airflow?¶
| 优势 | 说明 |
|---|---|
| Python 原生 | 数据团队直接用 Python 写 DAG |
| 丰富的 Operator | 支持 Python/Bash/Docker/Kubernetes/Cloud |
| 可观测性 | 内置 Web UI 查看任务状态/日志 |
| 定时调度 | Cron 表达式灵活配置 |
| 回填支持 | catchup/backfill 补历史数据 |
1.3 项目中的技术栈¶
# 项目结构
olad_data_ap/
├── airflow/
│ ├── dags/
│ │ └── example_hello_dag.py
│ └── plugins/
│ └── .gitkeep
├── infra/
│ └── docker-compose.yml # Airflow 容器
├── Makefile
│ ├── make up # 启动包括 Airflow
│ ├── make dbt-run # 触发 dbt DAG
│ └── make seed # 灌 mock 数据
二、DAG 基础概念¶
2.1 DAG 结构¶
# airflow/dags/example_hello_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
# 默认参数
default_args = {
'owner': 'ads-platform',
'depends_on_past': False, # 不依赖上次运行
'wait_for_downstream': False,
'start_date': datetime(2026, 1, 1),
'retries': 3, # 重试次数
'retry_delay': timedelta(minutes=5),
'execution_timeout': timedelta(hours=2),
}
# 定义 DAG
with DAG(
dag_id='ads_daily_pipeline', # 唯一 ID
default_args=default_args,
description='广告数据每日处理管道',
schedule_interval='0 2 * * *', # 每天凌晨 2 点
catchup=True, # 补历史数据
tags=['ads', 'daily'],
) as dag:
# 任务 1: 拉取昨日数据
pull_meta_ads = BashOperator(
task_id='pull_meta_ads',
bash_command='python scripts/pull_meta_ads.py {{ ds }}',
)
# 任务 2: 拉取 Google Ads
pull_google_ads = BashOperator(
task_id='pull_google_ads',
bash_command='python scripts/pull_google_ads.py {{ ds }}',
)
# 任务 3: 运行 dbt 模型
run_dbt_models = BashOperator(
task_id='run_dbt_models',
bash_command='cd /olad_data_ap && dbt run --target prod',
)
# 任务 4: 数据质量检查
data_quality_check = PythonOperator(
task_id='data_quality_check',
python_callable=check_data_quality,
op_kwargs={'target_date': '{{ ds }}'},
)
# 任务依赖关系
[pull_meta_ads, pull_google_ads] >> run_dbt_models >> data_quality_check
2.2 时间相关模板¶
| 模板 | 说明 | 示例 |
|---|---|---|
{{ ds }} |
业务日期(YYYY-MM-DD) | 2026-05-18 |
{{ ds_nodash }} |
无连字符 | 20260518 |
{{ prev_ds }} |
上次执行日期 | 2026-05-17 |
{{ next_ds }} |
下次执行日期 | 2026-05-19 |
{{ execution_date }} |
执行时间 | 2026-05-18T02:00:00 |
{{ ti }} |
TaskInstance 对象 | 用于 XCom |
2.3 任务依赖关系¶
# 线性依赖
task1 >> task2 >> task3
# 并行执行
[task1, task2, task3] >> task4
# 复杂依赖
[task1, task2] >> [task3, task4] >> task5
三、常用 Operator¶
3.1 Python Operator¶
from airflow.operators.python import PythonOperator, BranchPythonOperator
def extract_transform_load(target_date: str, **context):
"""从 ClickHouse 读取数据,处理后写回"""
from ads_core import ch_client
# 获取上任务数据
ti = context['ti']
source_data = ti.xcom_pull(task_ids='extract_data')
# 处理
processed = transform_data(source_data, target_date)
# 写入
ch_client.execute(processed)
# 传递数据给下任务
return processed
extract_task = PythonOperator(
task_id='etl_process',
python_callable=extract_transform_load,
op_kwargs={'target_date': '{{ ds }}'},
)
3.2 Bash Operator¶
from airflow.operators.bash import BashOperator
run_dbt = BashOperator(
task_id='run_dbt_daily',
bash_command='''
cd /olad_data_ap/dbt &&
dbt run --select tag:daily --vars '{"date": "{{ ds }}"}' &&
dbt test --select tag:daily
''',
env={'CH_HOST': 'clickhouse'}, # 环境变量
append_env=True,
)
3.3 Sensor(等待外部条件)¶
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.sensors.time_sensor import TimeSensor
# 等待上游 DAG 完成
wait_upstream = ExternalTaskSensor(
task_id='wait_upstream_pipeline',
external_dag_id='upstream_dag',
external_task_id=None, # 等待整个 DAG
timeout=timedelta(hours=4),
poke_interval=300, # 每 5 分钟检查一次
)
# 等待特定时间点
wait_until_2am = TimeSensor(
task_id='wait_until_2am',
target_time=time(2, 0),
poke_interval=300,
)
四、项目中的实际 DAG¶
4.1 每日数据管道¶
# dags/ads_daily_pipeline.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'ads-platform',
'depends_on_past': False,
'start_date': datetime(2026, 1, 1),
'retries': 3,
'retry_delay': timedelta(minutes=5),
}
with DAG(
dag_id='ads_daily_pipeline',
default_args=default_args,
schedule_interval='0 2 * * *', # T+1 凌晨 2 点
catchup=True,
) as dag:
# ========== L1: 数据接入 ==========
pull_meta = BashOperator(
task_id='pull_meta_ads',
bash_command='python scripts/pull_meta.py --date {{ ds }} --source meta',
)
pull_google = BashOperator(
task_id='pull_google_ads',
bash_command='python scripts/pull_meta.py --date {{ ds }} --source google',
)
pull_tiktok = BashOperator(
task_id='pull_tiktok_ads',
bash_command='python scripts/pull_meta.py --date {{ ds }} --source tiktok',
)
# ========== L2: 数据质量检查 ==========
check_data_quality = PythonOperator(
task_id='check_data_quality',
python_callable=lambda **ctx: check_quality(ctx['ds']),
)
# ========== L3: dbt 建模 ==========
dbt_run_staging = BashOperator(
task_id='dbt_run_staging',
bash_command='dbt run --select tag:staging --vars \'{"date": "{{ ds }}"}\'',
)
dbt_run_marts = BashOperator(
task_id='dbt_run_marts',
bash_command='dbt run --select tag:marts --vars \'{"date": "{{ ds }}"}\'',
)
dbt_test = BashOperator(
task_id='dbt_test',
bash_command='dbt test --select tag:marts',
)
# ========== L4: 看板缓存刷新 ==========
refresh_cache = PythonOperator(
task_id='refresh_dashboard_cache',
python_callable=refresh_redis_cache,
)
# ========== 依赖关系 ==========
[pull_meta, pull_google, pull_tiktok] >> check_data_quality
check_data_quality >> dbt_run_staging >> dbt_run_marts >> dbt_test
dbt_test >> refresh_cache
4.2 回填 DAG¶
# 用于处理历史数据
from airflow.operators.python import PythonOperator
def backfill_task(target_date: str):
"""回填指定日期的数据"""
from scripts.backfill import run_backfill
run_backfill(target_date)
with DAG(
dag_id='ads_backfill',
default_args=default_args,
schedule_interval=None, # 手动触发
start_date=datetime(2024, 1, 1),
) as dag:
backfill = PythonOperator(
task_id='backfill_data',
python_callable=backfill_task,
op_kwargs={'target_date': '{{ params.target_date }}'},
params={'target_date': '2024-01-01'},
)
五、常见问题处理¶
5.1 数据质量检查¶
def check_data_quality(target_date: str, **context):
"""检查ODS层数据质量"""
from ads_core import ch_client
query = f"""
SELECT
source,
count() as total_rows,
countIf(event_type = 'impression') as impressions,
countIf(event_type = 'click') as clicks,
countIf(event_type = 'conversion') as conversions,
-- 空值率
countIf(campaign_id = '') / count() as empty_campaign_rate,
-- 数据延迟
max(ingested_at) as last_ingest_time
FROM ods_ad_event_raw
WHERE dt = '{target_date}'
GROUP BY source
"""
result = ch_client.query(query)
for row in result:
# 告警规则
if row.empty_campaign_rate > 0.1:
send_alert(f"数据质量问题: {row.source} 空值率 {row.empty_campaign_rate}")
if row.total_rows < 1000:
send_alert(f"数据量异常: {row.source} 仅 {row.total_rows} 条")
5.2 失败重试与通知¶
from airflow.notifications.bases import notify
from airflow.utils.email import send_email
default_args = {
'owner': 'ads-platform',
'email': ['data-team@company.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 3,
}
def failure_callback(context):
"""任务失败时发送告警"""
dag_id = context['dag_run'].dag_id
task_id = context['task_instance'].task_id
log_url = context['task_instance'].log_url
message = f"""
DAG: {dag_id}
Task: {task_id}
Execution Date: {context['execution_date']}
Log: {log_url}
"""
send_email(to=['data-team@company.com'], subject=f"[Airflow Alert] {dag_id} Failed", html_content=message)
六、面试高频问题¶
Q1: Airflow 和 Kafka/Flink 的区别?¶
答:
- Airflow:工作流调度,适合批处理定时任务
- Kafka:消息队列,适合实时流数据
- Flink:流处理引擎,适合实时计算
项目中的分工:
- Airflow:定时调度 dbt 批处理任务(T+1 日报)
- Kafka:V1.2 引入,用于 CAPI 事件缓冲
- Flink:V1.2 引入,用于实时流计算
Airflow 不能做实时计算,它是调度器不是计算引擎
Q2: 如何保证 DAG 的稳定性?¶
答:
1. 合理设置重试次数和间隔
2. 添加超时控制(execution_timeout)
3. 关键节点加数据质量检查
4. 失败告警(email/Slack/PagerDuty)
5. 依赖外部任务时用 Sensor 等待
项目中:
- 每天凌晨 2 点执行 T+1 数据
- catchup=True 自动补历史
- 超过 30 分钟未完成发 P0 告警
Q3: 如何处理数据回填?¶
答:
1. Airflow catchup:start_date 之前的数据自动补
2. 手动触发 backfill:DAG 设为 schedule_interval=None
3. dbt run --full-refresh:完全重算
4. 增量模型:基于 ds 变量过滤处理特定日期
项目中用:
- 每日增量 + 每周全量校验
- 对账机制检测数据异常
Q4: DAG 任务间如何传递数据?¶
答:两种方式
1. XCom(Cross-communication):小数据量
- task1 返回数据 → task2 读取
2. 外部存储:大数据量
- task1 写 ClickHouse/HDFS
- task2 从存储读取
项目中用:
- 轻量数据用 XCom(统计结果)
- 批量数据写 CH(ads_core.ch_client)
七、项目实践 Checklist¶
- 理解 Airflow 在六层架构中的位置(计算层调度)
- 能写简单的 DAG(拉取 → 建模 → 检查 → 告警)
- 掌握任务依赖关系(>> / [] 并行)
- 理解时间模板(ds / prev_ds / ds_nodash)
- 能配置重试和告警
- 理解 Airflow 不能做实时计算(它是调度器)
相关文档: - ClickHouse 深度解析 - dbt 数据建模 - Flink 实时计算 - olad_data_ap 技术架构总纲 §4.2