跳转至

Flink 实时计算深度解析

基于 olad_data_ap 项目规划,V1.2 引入 Flink 流处理
来源:olad_data_ap/ARCH-02/00-技术架构总纲.md §15
先修(EOS / 对账)DDIA Part 2 分布式 · § 批流分布式三角


1.1 架构演进

V1.0 MVP(当前):
┌─────────────┐
│ connector  │  ← 定时拉取(T+1 批处理)
│   -svc     │
└─────────────┘
       ↓
┌─────────────┐
│  Airflow   │  ← DAG 调度(T+1 批处理)
│  dbt-runner│
└─────────────┘
       ↓
┌─────────────────────────────────────┐
│           ClickHouse                │
│  ODS → DWD → DWS → ADS             │
└─────────────────────────────────────┘

V1.2(规划):
┌─────────────────────────────────────┐
│  connector-svc(批)  │  ingest-capi(流)│
└─────────────────────────────────────┘
                         ↓
┌─────────────────────────────────────┐
│    Flink 流处理引擎                  │
│  - 实时事件处理                     │
│  - CAPI 事件接收                    │
│  - 窗口聚合                         │
└─────────────────────────────────────┘
       ↓
┌─────────────────────────────────────┐
│           ClickHouse                │
│  ODS → DWD → DWS → ADS             │
└─────────────────────────────────────┘
特性 Flink Spark Streaming 项目选择
延迟 毫秒级真流 微批(最快 0.5s) Flink
事件时间 原生支持 需额外配置 Flink
状态管理 强大,支持故障恢复 受限 Flink
迟到数据 Watermark 处理 不支持 Flink
背压 自动平衡 批处理机制 Flink

二、核心概念

2.1 时间语义(Time Semantics)

// 两种时间语义

// Processing Time:机器本地时间(简单但不准确)
DataStream<Event> processed = stream
    .assignTimestamps(ProcessingTime())  // 不推荐

// Event Time:事件本身携带的时间戳(准确但复杂)
DataStream<Event> events = stream
    .assignTimestampsAndWatermarks(
        WatermarkStrategy
            .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
            .withTimestampAssigner((event, ts) -> event.getEventTime())
    );

面试追问:什么时候用 Processing Time?

答:只有在对准确性要求不高的场景用
- 日志聚合(允许分钟级误差)
- 监控指标(相对顺序即可)

Event Time 适用:
- 广告归因(用户行为顺序很重要)
- 金融交易(严格按交易时间计算)
- 实时风控(延迟容忍度低)

2.2 Watermark(水位线)

// Watermark = 事件时间的进度条
// 告诉 Flink:比这个时间更早的数据不会再来

// 乱序容忍策略(最多等 10 秒)
WatermarkStrategy<Event> strategy = WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
    .withTimestampAssigner((event, timestamp) -> event.getEventTime());

// 持续时间策略(最大等待 5 分钟)
WatermarkStrategy<Event> strategy2 = WatermarkStrategy
    .<Event>forMonotonousTimestamps()
    .withIdleness(Duration.ofMinutes(5));

Watermark 触发窗口计算的条件

watermark >= window_end_time

2.3 窗口(Window)

// 滚动窗口(Tumbling Window)
// 每 1 小时一个窗口,不重叠
stream
    .keyBy(event -> event.getAccountId())
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    .aggregate(new SpendAggregator());

// 滑动窗口(Sliding Window)
// 窗口 1 小时,滑动 5 分钟
stream
    .keyBy(event -> event.getAccountId())
    .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)))
    .aggregate(new SpendAggregator());

// 会话窗口(Session Window)
// 活动间隔超过 10 分钟开启新窗口
stream
    .keyBy(event -> event.getAccountId())
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .aggregate(new SpendAggregator());

三、项目中的应用场景

3.1 CAPI 事件接收(实时归因)

// ingest-capi 接收 Meta CAPI 事件后交给 Flink 处理

// 事件输入
DataStream<CapiEvent> capiStream = env
    .addSource(new CapiSource())  // 从 Kafka 读取
    .assignTimestampsAndWatermarks(
        WatermarkStrategy
            .<CapiEvent>forBoundedOutOfOrderness(Duration.ofSeconds(30))
            .withTimestampAssigner((e, ts) -> e.getEventTime())
    );

// 实时窗口聚合(每 5 分钟一个桶)
DataStream<AdMetrics> metricsStream = capiStream
    .keyBy(e -> compositeKey(e))  // tenant + account + campaign
    .window(SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(1)))
    .aggregate(new MetricsAggregator());

// 写入 ClickHouse
metricsStream.addSink(new ClickHouseSink());

3.2 实时 ROAS 计算

// 实时计算广告 ROAS(投入产出比)
// 用于看板实时监控

class RoasCalculator extends AggregateFunction<Event, RoasState, Double> {

    @Override
    public RoasState createAccumulator() {
        return new RoasState(0, 0);  // spend, revenue
    }

    @Override
    public RoasState add(Event event, RoasState acc) {
        if (event.isSpend()) {
            acc.spend += event.getAmount();
        } else {
            acc.revenue += event.getAmount();
        }
        return acc;
    }

    @Override
    public Double getResult(RoasState acc) {
        return acc.spend > 0 ? acc.revenue / acc.spend : 0.0;
    }
}

3.3 异常检测

// 检测异常花费(超过阈值告警)
DataStream<Alert> alerts = metricsStream
    .keyBy(m -> m.getTenantId())
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .process(new AnomalyDetectorFunction());

// 异常检测逻辑
class AnomalyDetectorFunction extends ProcessWindowFunction<AdMetrics, Alert, String, TimeWindow> {

    @Override
    public void process(Context ctx, Iterable<AdMetrics> metrics, Collector<Alert> out) {
        AdMetrics current = metrics.iterator().next();
        AdMetrics history = getHistoricalAvg(current.getTenantId());

        double threshold = 2.0;  // 超过历史均值 2 倍
        if (current.getSpend() > history.getSpend() * threshold) {
            out.collect(Alert.builder()
                .tenantId(current.getTenantId())
                .type(AlertType.SPEND_SPIKE)
                .message("Spend异常: " + current.getSpend() + " vs 平均 " + history.getSpend())
                .build());
        }
    }
}

四、状态管理与 Checkpoint

4.1 状态类型

// Keyed State(按键分区状态)
// 每个 Key 维护独立状态

DataStream<AdEvent> stream = ...
stream.keyBy(e -> e.getAccountId())
    .map(new StatefulMapper());

class StatefulMapper extends RichMapFunction<AdEvent, AdMetrics> {

    // Value State:单个值
    private ValueState<SpendState> spendState;

    // List State:列表
    private ListState<AdEvent> recentEvents;

    // Map State:键值对
    private MapState<String, Integer> counterMap;

    @Override
    public void open(Configuration config) {
        spendState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("spend", SpendState.class));

        recentEvents = getRuntimeContext().getListState(
            new ListStateDescriptor<>("recent", AdEvent.class));
    }

    @Override
    public AdMetrics map(AdEvent event) throws Exception {
        SpendState current = spendState.value();
        if (current == null) {
            current = new SpendState();
        }
        current.spend += event.getSpend();
        spendState.update(current);

        return buildMetrics(event);
    }
}

4.2 Checkpoint 机制

// 启用 Checkpoint(Exactly-Once)
env.enableCheckpointing(60_000);  // 每 60 秒检查一次
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);
env.getCheckpointConfig().setCheckpointTimeout(10 * 60_000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// 状态后端
env.setStateBackend(new EmbeddedHashMapStateBackend());  // 本地测试
// 生产环境用 RocksDBStateBackend

4.3 Exactly-Once 端到端

理论背景:幂等、协调、分区对齐 → Part 2 § 批流分布式三角 · § Ch.9

// 两阶段提交(Two-Phase Commit)
// Source → Flink → Sink 必须配套支持 TPC

// 1. Source(读取外部系统)
KafkaSource<String> source = KafkaSource.<String>builder()
    .setStartingOffsets(OffsetsInitializer.committedOffsets())
    .setProperty("isolation.level", "read_committed")
    .build();

// 2. Flink 计算
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka");

// 3. Sink(写入外部系统)
KafkaSink<String> sink = KafkaSink.<String>builder()
    .setBootstrapServers("localhost:9092")
    .setRecordSerializer(new SimpleStringSerializer())
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .setTransactionalIdPrefix("ads-flink")
    .build();

五、面试高频问题

答:
| 维度 | Flink | Spark Streaming |
|------|-------|----------------|
| 处理模型 | 真流(每条处理)| 微批(批量处理)|
| 延迟 | 毫秒级 | 500ms+ |
| 事件时间 | 原生支持 | 需额外配置 watermark |
| 迟到数据 | Watermark 处理 | 不支持 |
| 状态管理 | RocksDB 强状态 | 受限 |
| 背压 | 自动流控 | 批次级背压 |
| API | DataStream API | DStream |

项目选择 Flink:广告场景需要低延迟 + 事件时间 + 状态管理

Q2: Checkpoint 和 Savepoint 区别?

答:
- Checkpoint:自动创建,用于故障恢复,Job 重启后自动删除
- Savepoint:手动创建,用于计划性停止/恢复,不会自动删除

使用场景:
- Checkpoint:日常故障恢复
- Savepoint:版本升级、集群迁移、紧急暂停

命令:
- 创建 Savepoint:flink savepoint -jobId <id> <target_path>
- 从 Savepoint 恢复:flink run -s <savepoint_path>

Q3: Watermark 是什么?解决什么问题?

答:
Watermark = 时间戳进度条,告诉 Flink 事件时间的进度

解决问题:
1. 乱序数据:晚到的事件不会无限等待
2. 触发窗口:水印达到窗口结束时间才触发计算
3. 进度感知:知道上游数据延迟多久

设置策略:
- 有界乱序(forBoundedOutOfOrderness):最多等 N 秒
- 持续时间(forMonotonousTimestamps):严格递增

trade-off:
- Watermark 太大:延迟增加,但数据完整性好
- Watermark 太小:延迟小,但可能丢失数据

Q4: 背压(Back Pressure)怎么产生和解决?

答:
产生原因:
- 下游处理慢(计算量大/写入阻塞)
- Flink 缓冲区满,反压上游

解决思路:
1. 增加并行度(parallelism)
2. 优化算子性能(减少计算/增加缓存)
3. 调整缓冲区大小
4. 启用背压监控(UI)

Flink 自动背压:不同于 Spark 批处理,Flink 通过流控自动平衡

Q5: 状态过期和清理

// 状态 TTL(Time To Live)
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(7))
    .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)  // 读写时更新
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)  // 不返回过期状态
    .cleanupFullSnapshot()  // 全量快照时清理
    .build();

ValueStateDescriptor<SpendState> descriptor = new ValueStateDescriptor<>(
    "spend", SpendState.class);
descriptor.enableTimeToLive(ttlConfig);

六、项目实践 Checklist

  • 理解 Flink 在 V1.2 架构中的位置(实时流处理)
  • 掌握时间语义(Event Time vs Processing Time)
  • 理解 Watermark 机制和乱序处理
  • 掌握窗口类型(Tumbling/Sliding/Session)
  • 理解 Checkpoint 和 Exactly-Once
  • 能解释两阶段提交(Source → Flink → Sink)

相关文档: - ClickHouse 深度解析 - dbt 数据建模 - Airflow 调度 - olad_data_ap 技术架构总纲 §15