跳转至

DDIA Ch.11 流处理 — 复习专题

梳理自《Designing Data-Intensive Applications》第 2 版 Ch.11 流处理
定位:历史脉络 → 挑战 → 解法,先框架后下钻;配合 6h 冲刺第 4 段 使用
配对:Part 2 分布式 · Ch.10 批处理

← 返回 DDIA 框架 | 专题索引 | ↑ 总脑图 流路径 · 实时场景

组件深潜Flink 实时计算 · ClickHouse OLAP


6h 第 4 段复习清单(90min)

对应 ddia-framework § 第 4 段(3:15–4:45)。
全书 6h 里最重要的一章;节奏:80 分钟阅读 + 10 分钟休息。

时段 做什么
0–20 min § 全景串讲 + 30 秒版
20–35 min § 架构图双视角 + § MVP 伪代码
35–60 min § 核心概念下钻 + DDIA Ch.11 挑读(见 DDIA 对照
60–75 min § 与本专题映射 + Flink 文档 目录扫 10 min,标 1 个「下周深潜」小节
75–90 min 写下产出 + 勾选 checkbox

读法

  • 抓住 无界、事件时间、窗口、状态、容错;微批历史(Storm 时代)可跳
  • 读完后 必须 打开 Flink 文档对照,否则容易停在抽象层。

必须搞懂的点

概念 问自己 下文
无界数据 和日批分区本质区别? § 无界输入
事件时间 vs 处理时间 埋点乱序谁说了算? § 时间语义
Watermark 如何决定「这一窗口可以算了」? § Watermark
窗口 滚动 / 滑动 / 会话各适合啥? § 窗口类型
状态、Checkpoint 挂了怎么不丢数、不重算全历史? § 状态与容错
流表对偶 Flink SQL 和「表」啥关系? § 流表对偶

写下来(必做)

流 vs 批(3 行):
  延迟:____ vs ____
  输入:____ vs ____
  错了怎么办:____ vs ____

下周深潜 Flink 小节:________

流处理全景:历史脉络 → 挑战 → 解法(串讲)

用法:先读这一节,再 下钻。疑问记入 QA

一句话主线:日批 太慢 → 把 日志/Kafka 当持续输入 → 需要 7×24 计算引擎 → 乱序、状态、故障引出 事件时间 + Watermark + Checkpoint


0. 起点:批处理不够快

挑战 监控、风控、实时大屏要 秒~分钟级;T+1 批处理 延迟 unacceptable
直觉 数据一直在产生 → 无界(unbounded) 输入
和 Ch.10 关系 批 = 定期算 一整份有界快照;流 = 持续算不断到来的事件(见 Ch.10

1. 消息日志:流 = 可重放的 log(Kafka 时代)

年代/场景 2010s 起;埋点、CDC、服务解耦
挑战 多订阅者、数据不能丢、要 回溯
解法 持久化消息日志(Kafka):生产者 append;消费者按 offset 读
为何选它 存储引擎「只追加」 同构;解耦 + 可重放
得到 「流」有了 稳定输入边界(topic 分区)
新代价 仍需要 上游计算引擎 消费 log

光有 log 不会自己聚合 → 流处理器


Storm(早期) Flink(本专题)
模型 微批(小批量触发) 逐事件 或原生流
事件时间 原生 Watermark
状态 有限 Keyed State + Checkpoint
为何 Flink 适合 CAPI/计费演进 乱序、状态大、要 EOS

历史选择:业务要 按事件发生时间 统计(不是机器收到时间)→ 事件时间 + Watermark 成为主流方案。


3. 四大挑战 → 四大解法

挑战 现象 解法
无界 数据永远来 窗口 把无界切成有界计算单元
乱序 网络延迟、重试 → 先到的不一定早发生 事件时间 + Watermark(允许迟到策略)
有状态 计数、会话、join 要记住历史 Keyed State(按 key 分区状态)
容错 进程挂、网络断 Checkpoint 异步快照 + 重放 log

4. 时间线(复习用)

timeline
    title 流处理演化(简化)
    section 延迟
        日批不够 : T加1瓶颈
    section 输入
        Kafka日志 : 持久可重放
    section 计算
        Storm微批 : 低延迟原型
        Flink真流 : 事件时间加状态
    section 语义
        Checkpoint : Exactly_once

5. 微批历史(一笔带过)

  • Spark Streaming 等:把流 切成小 batch 跑 —— 简单,但 事件时间、迟到 支持弱。
  • DDIA 6h 可跳 细节;记住:真流 vs 微批 是选型维度即可。

6. 选型:用挑战反推

你的挑战 更倾向
T+1 报表、分区重跑够用 批处理
分钟级大盘、实时风控 流处理
既要历史回填又要实时 Lambda / Kappa(批+流双路径,对账难)
同 SQL 批流一体 Flink batch mode / 其他流批引擎(规划项)

30 秒版

无界事件流(Kafka / 埋点)
  → 赋事件时间 + 生成 Watermark
  → 按窗口 / 连续聚合(Keyed State)
  → Checkpoint 周期性快照
  → 写入 Sink(ClickHouse / Kafka / DB)

架构图(双视角)

应用视角:谁在用、数据怎么流

站在业务 / 平台用户:看见持续写入与实时查询,不是 TaskManager 线程。

flowchart LR
  subgraph producers [生产者]
    App[App 埋点]
    CAPI[CAPI 回传]
  end

  subgraph log [消息日志]
    Kafka[(Kafka Topic)]
  end

  subgraph stream [流计算]
    Flink[Flink 作业]
  end

  subgraph serving [服务存储]
    RTTable[(实时汇总表)]
    CH[(ClickHouse)]
  end

  subgraph users [消费方]
    Dash[实时大屏]
    Alert[风控告警]
    BatchFix[日批对账修正]
  end

  App --> Kafka
  CAPI --> Kafka
  Kafka --> Flink
  Flink --> RTTable
  Flink --> CH
  RTTable --> Dash
  CH --> Dash
  CH --> Alert
  CH --> BatchFix
节点 角色
Kafka 持久 log,可回溯、多订阅
Flink 7×24 计算,窗口 + 状态
实时表 低延迟 派生数据
日批对账 流结果与 Ch.10 批结果对齐

系统内部视角:一条事件在引擎里怎么走

flowchart TB
  source[Source 拉取 partition offset]
  parse[解析 赋 eventTime]
  wm[Watermark 推进]
  keyby[KeyBy 路由]
  window[窗口缓冲]
  state[Keyed State 累加]
  ckpt[Checkpoint 快照 offset+state]
  sink[Sink 写出]
  backpressure[背压 下游慢则上游降速]

  source --> parse
  parse --> wm
  wm --> keyby
  keyby --> window
  window --> state
  state --> sink
  sink -.-> backpressure
  backpressure -.-> source
  state --> ckpt
  source --> ckpt
阶段 解决的历史问题
offset 消费进度可恢复
eventTime + Watermark 乱序下仍按业务时间出结果
KeyBy + State 无界流上做有状态聚合
Window 把无界切成 有界计算单元
Checkpoint 进程挂了就 接着算,不必从头
背压 下游写不动时 别 OOM

MVP 示例代码(Java):SimpleStreamPipeline

教学用伪代码:内存 消息队列 模拟 Kafka;滚动窗口 + 事件时间 Watermarkcheckpoint offset
对应 系统内部视角

// SimpleStreamPipeline.java (pseudocode)
//
// 场景:按 campaign_id 统计 1 分钟事件时间窗口内的 PV
// Watermark:当前最大 eventTime - 允许乱序 5s → 关闭窗口并输出

import java.util.*;

class SimpleStreamPipeline {

  private final MessageLog log = new MessageLog();
  private final Map<String, Long> keyedState = new HashMap<>(); // 简化:全局 state
  private final Map<WindowKey, Long> windowCounts = new HashMap<>();
  private long committedOffset = 0;
  private long maxEventTime = Long.MIN_VALUE;
  private static final long ALLOWED_LATENESS_MS = 5_000;
  private static final long WINDOW_SIZE_MS = 60_000;

  // ---------- 生产端(模拟埋点写入 Kafka)----------
  void publish(Event e) {
    log.append(e);
  }

  // ---------- 主循环(模拟 Flink runtime 持续消费)----------
  void runOnce() {
    List<LoggedRecord> batch = log.pollAfter(committedOffset);
    for (LoggedRecord rec : batch) {
      processRecord(rec);
      committedOffset = rec.offset;
    }
    advanceWatermarksAndFireWindows();
  }

  private void processRecord(LoggedRecord rec) {
    Event e = rec.event;
    maxEventTime = Math.max(maxEventTime, e.eventTimeMs);

    // KeyBy(campaignId) + 落入某个 event-time 窗口
    long windowStart = (e.eventTimeMs / WINDOW_SIZE_MS) * WINDOW_SIZE_MS;
    WindowKey wk = new WindowKey(e.campaignId, windowStart);
    windowCounts.merge(wk, 1L, Long::sum);
  }

  private void advanceWatermarksAndFireWindows() {
  long watermark = maxEventTime - ALLOWED_LATENESS_MS;

    // 窗口 end <= watermark → 认为到齐,输出并删除
    List<WindowKey> toFire = new ArrayList<>();
    for (WindowKey wk : windowCounts.keySet()) {
      long windowEnd = wk.windowStartMs + WINDOW_SIZE_MS;
      if (windowEnd <= watermark) {
        toFire.add(wk);
      }
    }
    for (WindowKey wk : toFire) {
      long count = windowCounts.remove(wk);
      sinkWrite(wk, count); // 写出派生结果
    }
  }

  // Checkpoint:快照 offset + 未触发窗口(生产还有 keyed state backend)
  Checkpoint snapshot() {
    return new Checkpoint(committedOffset, new HashMap<>(windowCounts), maxEventTime);
  }

  void restore(Checkpoint cp) {
    committedOffset = cp.offset;
    windowCounts.clear();
    windowCounts.putAll(cp.pendingWindows);
    maxEventTime = cp.maxEventTime;
  }

  private void sinkWrite(WindowKey wk, long count) {
    System.out.printf("SINK campaign=%s windowStart=%d pv=%d%n",
        wk.campaignId, wk.windowStartMs, count);
  }

  // ---------- 模拟 Kafka log ----------
  static class MessageLog {
    private final List<LoggedRecord> records = new ArrayList<>();
    private long nextOffset = 1;

    void append(Event e) {
      records.add(new LoggedRecord(nextOffset++, e));
    }

    List<LoggedRecord> pollAfter(long offset) {
      List<LoggedRecord> out = new ArrayList<>();
      for (LoggedRecord r : records) {
        if (r.offset > offset) out.add(r);
      }
      return out;
    }
  }

  static class LoggedRecord {
    long offset;
    Event event;
    LoggedRecord(long o, Event e) { offset = o; event = e; }
  }

  static class Event {
    String campaignId;
    long eventTimeMs; // 事件时间
    Event(String c, long t) { campaignId = c; eventTimeMs = t; }
  }

  static class WindowKey {
    String campaignId;
    long windowStartMs;
    WindowKey(String c, long w) { campaignId = c; windowStartMs = w; }
    @Override public boolean equals(Object o) {
      if (!(o instanceof WindowKey)) return false;
      WindowKey w = (WindowKey) o;
      return campaignId.equals(w.campaignId) && windowStartMs == w.windowStartMs;
    }
    @Override public int hashCode() { return Objects.hash(campaignId, windowStartMs); }
  }

  static class Checkpoint {
    long offset;
    Map<WindowKey, Long> pendingWindows;
    long maxEventTime;
    Checkpoint(long o, Map<WindowKey, Long> w, long m) {
      offset = o; pendingWindows = w; maxEventTime = m;
    }
  }

  // ---------- 演示:乱序事件 + checkpoint 恢复 ----------
  public static void main(String[] args) {
    SimpleStreamPipeline job = new SimpleStreamPipeline();

    job.publish(new Event("c1", 10_000));   // 0:10
    job.publish(new Event("c1", 65_000));   // 1:05 下一窗口
    job.publish(new Event("c1", 8_000));    // 乱序:仍属 0:00 窗口
    job.runOnce();

    Checkpoint cp = job.snapshot();

    job.publish(new Event("c2", 70_000));
    job.runOnce();

    // 模拟宕机恢复
    SimpleStreamPipeline job2 = new SimpleStreamPipeline();
    job2.restore(cp);
    job2.runOnce();
  }
}

MVP 与两张架构图的对应

架构图节点 代码里
Source / offset pollAfter(committedOffset)
eventTime Event.eventTimeMs
Watermark maxEventTime - ALLOWED_LATENESS
KeyBy + Window WindowKey(campaignId, windowStart)
State windowCounts
触发输出 windowEnd <= watermarksinkWrite
Checkpoint snapshot / restore

故意没做:分布式 checkpoint barrier、KeyedStateBackend、精确一次 sink 事务、侧输出迟到流。

Ch.10 批处理 MVP 对比

批 MVP 流 MVP
输入边界 dt 分区 有界 无界 MessageLog
算完触发 全分区算完 一次提交 Watermark 关窗
容错 rerun 删分区重算 Checkpoint 恢复 offset+窗口

核心概念下钻

无界输入与消息日志

输入边界 分区 完结 永不完结(用窗口人为截断)
存储隐喻 分区文件 log + offset
重复消费 重跑分区 至少一次 + 幂等 sink

事件时间 vs 处理时间

时间类型 定义 风险
事件时间(event time) 业务发生时刻(埋点字段) 实现复杂,要处理乱序
处理时间(processing time) 算子机器时钟 简单; 重跑结果变、不准确

历史为何选事件时间:广告计费、转化归因 必须以用户行为时间为准,不能用消费延迟。

详见 Flink § 时间语义


Watermark 与窗口

Watermark = 「事件时间 ≤ T 的数据大致到齐」的信号 → 触发窗口 计算并输出

事件:  --e1--e3--e2(late)--e4-->
                ↑
         Watermark 推进关闭窗口

允许 迟到数据:侧输出(side output)或 允许延迟 N(trade-off 准确度 vs 延迟)。


窗口类型

类型 含义 典型场景
滚动(tumbling) 固定长度、不重叠 每分钟 PV
滑动(sliding) 固定长度、有步长 最近 5 分钟每 1 分钟更新
会话(session) 间隙超时切分 用户一次访问会话

窗口 = 在 无界流 上制造 有界计算单元(联系 Ch.10 的「有界」)。


状态与容错

概念 作用
Keyed State 每个 key(如 user_id)独立状态;scale-out 时随 key 走
Checkpoint 异步打快照;失败从上次 checkpoint 恢复 + 重放 Kafka
Exactly-once(EOS) 端到端语义:源可重放 + 算子快照 + 幂等 sink

和批处理对比:批靠 重跑分区;流靠 checkpoint + 偏移量(见 批 vs 流)。

状态落盘常用 RocksDB(LSM)存储引擎 LSM


流表对偶(一句话)

  • = 不断变化的表(changelog); = 流在某时刻的 snapshot。
  • Flink SQL / 流批一体语法都建立在这层抽象上; 不必先写 DataStream API

背压(一句)

下游慢 → 上游 自动降速,避免 OOM;Flink 内置。批处理更多靠 队列积压告警 + 限流


与本专题映射

架构演进见 Flink § 定位(V1.0 批 → V1.2 流):

V1.0:connector → Airflow → dbt → ClickHouse(T+1)
V1.2:ingest-capi / Kafka → Flink → ClickHouse(分钟级)
Flink 概念 DDIA / 历史问题 项目对应
Kafka Source 持久 log 埋点、CAPI 事件
Event Time + Watermark 乱序 转化窗口、归因
Window 无界→有界 滚动指标
Keyed State 有状态聚合 按 campaign/user 累计
Checkpoint 容错 作业升级不丢状态
Sink 派生数据写出 ClickHouse 实时表

计费边界(待沉淀):哪些指标 必须日批对账,哪些可 流式近似 + 批修正


批 vs 流(完整对照)

简表见 Ch.10

维度 批处理 流处理
延迟 分钟~天 秒~分钟
输入 有界分区 无界事件流
正确性手段 重跑分区、幂等写 Checkpoint、EOS、幂等 sink
状态 每 job 冷启动或中间表 长期 Keyed State
乱序 批前排序 / 分区已对齐 Watermark + 迟到策略
成本 集中算、可错峰 7×24 算力、状态存储
适用 报表、对账、历史回填 大屏、风控、实时特征

同一份指标两套实现时:必须设计 对账(Ch.10 批为权威 or 流为近似)。


Flink DDIA Ch.11 思想
DataStream 无界数据
assignTimestampsAndWatermarks 事件时间
Watermark 进度与乱序
window 有界计算
KeyedStream / State 有状态算子
checkpoint 容错与恢复
Table API / SQL 流表对偶

深潜 → flink-streaming.md


DDIA Ch.11 书内读法对照

建议读 可跳或扫读
传递事件流、消息代理 某单一 MQ 安装细节
分区与消费组
时间与时钟(事件/处理)
窗口
流 join、连接 先懂概念,细节后补
容错、精确一次
批与流对比

读完本节,你应该能口述

  1. 流处理解决的历史问题:批太慢 + 数据持续到达
  2. Kafka log → 流引擎 → Sink 三段各干什么。
  3. 事件时间、Watermark、窗口、Checkpoint 分别对付 乱序、何时出结果、无界、故障
  4. 你们 V1.0 批 vs V1.2 流 边界;Flink 文档下一步读哪节。

下潜顺序:本节 → Part 2 分布式(EOS/对账)→ flink-streamingCh.10 批处理(对账视角)。


QA 疑问解答

格式:❌ 我的理解 + ✅ 纠正。关联正文锚点。

条目模板

### Qn:标题(日期 可选)

**关联**:[正文锚点](#...)

❌ **我的理解(错 / 不完整)**
- …

✅ **纠正**
- 

(暂无条目 — 有疑问随学随记。)


待沉淀

  • 流批指标对账方案
  • 实时计费 vs 日批计费边界文档
  • Lakehouse / 流批一体是否引入(链 ddia-framework §3.3