Flink 实时计算深度解析¶
基于 olad_data_ap 项目规划,V1.2 引入 Flink 流处理
来源:olad_data_ap/ARCH-02/00-技术架构总纲.md §15
先修(EOS / 对账):DDIA Part 2 分布式 · § 批流分布式三角
一、Flink 在项目中的定位¶
1.1 架构演进¶
V1.0 MVP(当前):
┌─────────────┐
│ connector │ ← 定时拉取(T+1 批处理)
│ -svc │
└─────────────┘
↓
┌─────────────┐
│ Airflow │ ← DAG 调度(T+1 批处理)
│ dbt-runner│
└─────────────┘
↓
┌─────────────────────────────────────┐
│ ClickHouse │
│ ODS → DWD → DWS → ADS │
└─────────────────────────────────────┘
V1.2(规划):
┌─────────────────────────────────────┐
│ connector-svc(批) │ ingest-capi(流)│
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ Flink 流处理引擎 │
│ - 实时事件处理 │
│ - CAPI 事件接收 │
│ - 窗口聚合 │
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ ClickHouse │
│ ODS → DWD → DWS → ADS │
└─────────────────────────────────────┘
1.2 为什么选择 Flink?¶
| 特性 | Flink | Spark Streaming | 项目选择 |
|---|---|---|---|
| 延迟 | 毫秒级真流 | 微批(最快 0.5s) | Flink |
| 事件时间 | 原生支持 | 需额外配置 | Flink |
| 状态管理 | 强大,支持故障恢复 | 受限 | Flink |
| 迟到数据 | Watermark 处理 | 不支持 | Flink |
| 背压 | 自动平衡 | 批处理机制 | Flink |
二、核心概念¶
2.1 时间语义(Time Semantics)¶
// 两种时间语义
// Processing Time:机器本地时间(简单但不准确)
DataStream<Event> processed = stream
.assignTimestamps(ProcessingTime()) // 不推荐
// Event Time:事件本身携带的时间戳(准确但复杂)
DataStream<Event> events = stream
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner((event, ts) -> event.getEventTime())
);
面试追问:什么时候用 Processing Time?
答:只有在对准确性要求不高的场景用
- 日志聚合(允许分钟级误差)
- 监控指标(相对顺序即可)
Event Time 适用:
- 广告归因(用户行为顺序很重要)
- 金融交易(严格按交易时间计算)
- 实时风控(延迟容忍度低)
2.2 Watermark(水位线)¶
// Watermark = 事件时间的进度条
// 告诉 Flink:比这个时间更早的数据不会再来
// 乱序容忍策略(最多等 10 秒)
WatermarkStrategy<Event> strategy = WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner((event, timestamp) -> event.getEventTime());
// 持续时间策略(最大等待 5 分钟)
WatermarkStrategy<Event> strategy2 = WatermarkStrategy
.<Event>forMonotonousTimestamps()
.withIdleness(Duration.ofMinutes(5));
Watermark 触发窗口计算的条件:
watermark >= window_end_time
2.3 窗口(Window)¶
// 滚动窗口(Tumbling Window)
// 每 1 小时一个窗口,不重叠
stream
.keyBy(event -> event.getAccountId())
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.aggregate(new SpendAggregator());
// 滑动窗口(Sliding Window)
// 窗口 1 小时,滑动 5 分钟
stream
.keyBy(event -> event.getAccountId())
.window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)))
.aggregate(new SpendAggregator());
// 会话窗口(Session Window)
// 活动间隔超过 10 分钟开启新窗口
stream
.keyBy(event -> event.getAccountId())
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.aggregate(new SpendAggregator());
三、项目中的应用场景¶
3.1 CAPI 事件接收(实时归因)¶
// ingest-capi 接收 Meta CAPI 事件后交给 Flink 处理
// 事件输入
DataStream<CapiEvent> capiStream = env
.addSource(new CapiSource()) // 从 Kafka 读取
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<CapiEvent>forBoundedOutOfOrderness(Duration.ofSeconds(30))
.withTimestampAssigner((e, ts) -> e.getEventTime())
);
// 实时窗口聚合(每 5 分钟一个桶)
DataStream<AdMetrics> metricsStream = capiStream
.keyBy(e -> compositeKey(e)) // tenant + account + campaign
.window(SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(1)))
.aggregate(new MetricsAggregator());
// 写入 ClickHouse
metricsStream.addSink(new ClickHouseSink());
3.2 实时 ROAS 计算¶
// 实时计算广告 ROAS(投入产出比)
// 用于看板实时监控
class RoasCalculator extends AggregateFunction<Event, RoasState, Double> {
@Override
public RoasState createAccumulator() {
return new RoasState(0, 0); // spend, revenue
}
@Override
public RoasState add(Event event, RoasState acc) {
if (event.isSpend()) {
acc.spend += event.getAmount();
} else {
acc.revenue += event.getAmount();
}
return acc;
}
@Override
public Double getResult(RoasState acc) {
return acc.spend > 0 ? acc.revenue / acc.spend : 0.0;
}
}
3.3 异常检测¶
// 检测异常花费(超过阈值告警)
DataStream<Alert> alerts = metricsStream
.keyBy(m -> m.getTenantId())
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.process(new AnomalyDetectorFunction());
// 异常检测逻辑
class AnomalyDetectorFunction extends ProcessWindowFunction<AdMetrics, Alert, String, TimeWindow> {
@Override
public void process(Context ctx, Iterable<AdMetrics> metrics, Collector<Alert> out) {
AdMetrics current = metrics.iterator().next();
AdMetrics history = getHistoricalAvg(current.getTenantId());
double threshold = 2.0; // 超过历史均值 2 倍
if (current.getSpend() > history.getSpend() * threshold) {
out.collect(Alert.builder()
.tenantId(current.getTenantId())
.type(AlertType.SPEND_SPIKE)
.message("Spend异常: " + current.getSpend() + " vs 平均 " + history.getSpend())
.build());
}
}
}
四、状态管理与 Checkpoint¶
4.1 状态类型¶
// Keyed State(按键分区状态)
// 每个 Key 维护独立状态
DataStream<AdEvent> stream = ...
stream.keyBy(e -> e.getAccountId())
.map(new StatefulMapper());
class StatefulMapper extends RichMapFunction<AdEvent, AdMetrics> {
// Value State:单个值
private ValueState<SpendState> spendState;
// List State:列表
private ListState<AdEvent> recentEvents;
// Map State:键值对
private MapState<String, Integer> counterMap;
@Override
public void open(Configuration config) {
spendState = getRuntimeContext().getState(
new ValueStateDescriptor<>("spend", SpendState.class));
recentEvents = getRuntimeContext().getListState(
new ListStateDescriptor<>("recent", AdEvent.class));
}
@Override
public AdMetrics map(AdEvent event) throws Exception {
SpendState current = spendState.value();
if (current == null) {
current = new SpendState();
}
current.spend += event.getSpend();
spendState.update(current);
return buildMetrics(event);
}
}
4.2 Checkpoint 机制¶
// 启用 Checkpoint(Exactly-Once)
env.enableCheckpointing(60_000); // 每 60 秒检查一次
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);
env.getCheckpointConfig().setCheckpointTimeout(10 * 60_000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 状态后端
env.setStateBackend(new EmbeddedHashMapStateBackend()); // 本地测试
// 生产环境用 RocksDBStateBackend
4.3 Exactly-Once 端到端¶
理论背景:幂等、协调、分区对齐 → Part 2 § 批流分布式三角 · § Ch.9
// 两阶段提交(Two-Phase Commit)
// Source → Flink → Sink 必须配套支持 TPC
// 1. Source(读取外部系统)
KafkaSource<String> source = KafkaSource.<String>builder()
.setStartingOffsets(OffsetsInitializer.committedOffsets())
.setProperty("isolation.level", "read_committed")
.build();
// 2. Flink 计算
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka");
// 3. Sink(写入外部系统)
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers("localhost:9092")
.setRecordSerializer(new SimpleStringSerializer())
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("ads-flink")
.build();
五、面试高频问题¶
Q1: Flink 和 Spark Streaming 区别?¶
答:
| 维度 | Flink | Spark Streaming |
|------|-------|----------------|
| 处理模型 | 真流(每条处理)| 微批(批量处理)|
| 延迟 | 毫秒级 | 500ms+ |
| 事件时间 | 原生支持 | 需额外配置 watermark |
| 迟到数据 | Watermark 处理 | 不支持 |
| 状态管理 | RocksDB 强状态 | 受限 |
| 背压 | 自动流控 | 批次级背压 |
| API | DataStream API | DStream |
项目选择 Flink:广告场景需要低延迟 + 事件时间 + 状态管理
Q2: Checkpoint 和 Savepoint 区别?¶
答:
- Checkpoint:自动创建,用于故障恢复,Job 重启后自动删除
- Savepoint:手动创建,用于计划性停止/恢复,不会自动删除
使用场景:
- Checkpoint:日常故障恢复
- Savepoint:版本升级、集群迁移、紧急暂停
命令:
- 创建 Savepoint:flink savepoint -jobId <id> <target_path>
- 从 Savepoint 恢复:flink run -s <savepoint_path>
Q3: Watermark 是什么?解决什么问题?¶
答:
Watermark = 时间戳进度条,告诉 Flink 事件时间的进度
解决问题:
1. 乱序数据:晚到的事件不会无限等待
2. 触发窗口:水印达到窗口结束时间才触发计算
3. 进度感知:知道上游数据延迟多久
设置策略:
- 有界乱序(forBoundedOutOfOrderness):最多等 N 秒
- 持续时间(forMonotonousTimestamps):严格递增
trade-off:
- Watermark 太大:延迟增加,但数据完整性好
- Watermark 太小:延迟小,但可能丢失数据
Q4: 背压(Back Pressure)怎么产生和解决?¶
答:
产生原因:
- 下游处理慢(计算量大/写入阻塞)
- Flink 缓冲区满,反压上游
解决思路:
1. 增加并行度(parallelism)
2. 优化算子性能(减少计算/增加缓存)
3. 调整缓冲区大小
4. 启用背压监控(UI)
Flink 自动背压:不同于 Spark 批处理,Flink 通过流控自动平衡
Q5: 状态过期和清理¶
// 状态 TTL(Time To Live)
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.days(7))
.setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite) // 读写时更新
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // 不返回过期状态
.cleanupFullSnapshot() // 全量快照时清理
.build();
ValueStateDescriptor<SpendState> descriptor = new ValueStateDescriptor<>(
"spend", SpendState.class);
descriptor.enableTimeToLive(ttlConfig);
六、项目实践 Checklist¶
- 理解 Flink 在 V1.2 架构中的位置(实时流处理)
- 掌握时间语义(Event Time vs Processing Time)
- 理解 Watermark 机制和乱序处理
- 掌握窗口类型(Tumbling/Sliding/Session)
- 理解 Checkpoint 和 Exactly-Once
- 能解释两阶段提交(Source → Flink → Sink)
相关文档: - ClickHouse 深度解析 - dbt 数据建模 - Airflow 调度 - olad_data_ap 技术架构总纲 §15