DDIA Ch.11 流处理 — 复习专题¶
梳理自《Designing Data-Intensive Applications》第 2 版 Ch.11 流处理
定位:历史脉络 → 挑战 → 解法,先框架后下钻;配合 6h 冲刺第 4 段 使用
配对:Part 2 分布式 · Ch.10 批处理
← 返回 DDIA 框架 | 专题索引 | ↑ 总脑图 流路径 · 实时场景
组件深潜:Flink 实时计算 · ClickHouse OLAP
6h 第 4 段复习清单(90min)¶
对应 ddia-framework § 第 4 段(3:15–4:45)。
全书 6h 里最重要的一章;节奏:80 分钟阅读 + 10 分钟休息。
| 时段 | 做什么 |
|---|---|
| 0–20 min | § 全景串讲 + 30 秒版 |
| 20–35 min | § 架构图双视角 + § MVP 伪代码 |
| 35–60 min | § 核心概念下钻 + DDIA Ch.11 挑读(见 DDIA 对照) |
| 60–75 min | § 与本专题映射 + Flink 文档 目录扫 10 min,标 1 个「下周深潜」小节 |
| 75–90 min | 写下产出 + 勾选 checkbox |
读法
- 抓住 无界、事件时间、窗口、状态、容错;微批历史(Storm 时代)可跳。
- 读完后 必须 打开 Flink 文档对照,否则容易停在抽象层。
必须搞懂的点
| 概念 | 问自己 | 下文 |
|---|---|---|
| 无界数据 | 和日批分区本质区别? | § 无界输入 |
| 事件时间 vs 处理时间 | 埋点乱序谁说了算? | § 时间语义 |
| Watermark | 如何决定「这一窗口可以算了」? | § Watermark |
| 窗口 | 滚动 / 滑动 / 会话各适合啥? | § 窗口类型 |
| 状态、Checkpoint | 挂了怎么不丢数、不重算全历史? | § 状态与容错 |
| 流表对偶 | Flink SQL 和「表」啥关系? | § 流表对偶 |
写下来(必做)
流 vs 批(3 行):
延迟:____ vs ____
输入:____ vs ____
错了怎么办:____ vs ____
下周深潜 Flink 小节:________
流处理全景:历史脉络 → 挑战 → 解法(串讲)¶
一句话主线:日批 太慢 → 把 日志/Kafka 当持续输入 → 需要 7×24 计算引擎 → 乱序、状态、故障引出 事件时间 + Watermark + Checkpoint。
0. 起点:批处理不够快¶
| 挑战 | 监控、风控、实时大屏要 秒~分钟级;T+1 批处理 延迟 unacceptable |
| 直觉 | 数据一直在产生 → 无界(unbounded) 输入 |
| 和 Ch.10 关系 | 批 = 定期算 一整份有界快照;流 = 持续算不断到来的事件(见 Ch.10) |
1. 消息日志:流 = 可重放的 log(Kafka 时代)¶
| 年代/场景 | 2010s 起;埋点、CDC、服务解耦 |
| 挑战 | 多订阅者、数据不能丢、要 回溯 |
| 解法 | 持久化消息日志(Kafka):生产者 append;消费者按 offset 读 |
| 为何选它 | 和 存储引擎「只追加」 同构;解耦 + 可重放 |
| 得到 | 「流」有了 稳定输入边界(topic 分区) |
| 新代价 | 仍需要 上游计算引擎 消费 log |
光有 log 不会自己聚合 → 流处理器。
2. 流处理器:Storm 微批 vs Flink 真流¶
| Storm(早期) | Flink(本专题) | |
|---|---|---|
| 模型 | 常 微批(小批量触发) | 逐事件 或原生流 |
| 事件时间 | 弱 | 原生 Watermark |
| 状态 | 有限 | Keyed State + Checkpoint |
| 为何 Flink 适合 CAPI/计费演进 | — | 乱序、状态大、要 EOS |
历史选择:业务要 按事件发生时间 统计(不是机器收到时间)→ 事件时间 + Watermark 成为主流方案。
3. 四大挑战 → 四大解法¶
| 挑战 | 现象 | 解法 |
|---|---|---|
| 无界 | 数据永远来 | 用 窗口 把无界切成有界计算单元 |
| 乱序 | 网络延迟、重试 → 先到的不一定早发生 | 事件时间 + Watermark(允许迟到策略) |
| 有状态 | 计数、会话、join 要记住历史 | Keyed State(按 key 分区状态) |
| 容错 | 进程挂、网络断 | Checkpoint 异步快照 + 重放 log |
4. 时间线(复习用)¶
timeline
title 流处理演化(简化)
section 延迟
日批不够 : T加1瓶颈
section 输入
Kafka日志 : 持久可重放
section 计算
Storm微批 : 低延迟原型
Flink真流 : 事件时间加状态
section 语义
Checkpoint : Exactly_once
5. 微批历史(一笔带过)¶
- Spark Streaming 等:把流 切成小 batch 跑 —— 简单,但 事件时间、迟到 支持弱。
- DDIA 6h 可跳 细节;记住:真流 vs 微批 是选型维度即可。
6. 选型:用挑战反推¶
| 你的挑战 | 更倾向 |
|---|---|
| T+1 报表、分区重跑够用 | 批处理 |
| 分钟级大盘、实时风控 | 流处理 |
| 既要历史回填又要实时 | Lambda / Kappa(批+流双路径,对账难) |
| 同 SQL 批流一体 | Flink batch mode / 其他流批引擎(规划项) |
30 秒版¶
无界事件流(Kafka / 埋点)
→ 赋事件时间 + 生成 Watermark
→ 按窗口 / 连续聚合(Keyed State)
→ Checkpoint 周期性快照
→ 写入 Sink(ClickHouse / Kafka / DB)
架构图(双视角)¶
应用视角:谁在用、数据怎么流¶
站在业务 / 平台用户:看见持续写入与实时查询,不是 TaskManager 线程。
flowchart LR
subgraph producers [生产者]
App[App 埋点]
CAPI[CAPI 回传]
end
subgraph log [消息日志]
Kafka[(Kafka Topic)]
end
subgraph stream [流计算]
Flink[Flink 作业]
end
subgraph serving [服务存储]
RTTable[(实时汇总表)]
CH[(ClickHouse)]
end
subgraph users [消费方]
Dash[实时大屏]
Alert[风控告警]
BatchFix[日批对账修正]
end
App --> Kafka
CAPI --> Kafka
Kafka --> Flink
Flink --> RTTable
Flink --> CH
RTTable --> Dash
CH --> Dash
CH --> Alert
CH --> BatchFix
| 节点 | 角色 |
|---|---|
| Kafka | 持久 log,可回溯、多订阅 |
| Flink | 7×24 计算,窗口 + 状态 |
| 实时表 | 低延迟 派生数据 |
| 日批对账 | 流结果与 Ch.10 批结果对齐 |
系统内部视角:一条事件在引擎里怎么走¶
flowchart TB
source[Source 拉取 partition offset]
parse[解析 赋 eventTime]
wm[Watermark 推进]
keyby[KeyBy 路由]
window[窗口缓冲]
state[Keyed State 累加]
ckpt[Checkpoint 快照 offset+state]
sink[Sink 写出]
backpressure[背压 下游慢则上游降速]
source --> parse
parse --> wm
wm --> keyby
keyby --> window
window --> state
state --> sink
sink -.-> backpressure
backpressure -.-> source
state --> ckpt
source --> ckpt
| 阶段 | 解决的历史问题 |
|---|---|
| offset | 消费进度可恢复 |
| eventTime + Watermark | 乱序下仍按业务时间出结果 |
| KeyBy + State | 无界流上做有状态聚合 |
| Window | 把无界切成 有界计算单元 |
| Checkpoint | 进程挂了就 接着算,不必从头 |
| 背压 | 下游写不动时 别 OOM |
MVP 示例代码(Java):SimpleStreamPipeline¶
教学用伪代码:内存 消息队列 模拟 Kafka;滚动窗口 + 事件时间 Watermark;checkpoint offset。
对应 系统内部视角。
// SimpleStreamPipeline.java (pseudocode)
//
// 场景:按 campaign_id 统计 1 分钟事件时间窗口内的 PV
// Watermark:当前最大 eventTime - 允许乱序 5s → 关闭窗口并输出
import java.util.*;
class SimpleStreamPipeline {
private final MessageLog log = new MessageLog();
private final Map<String, Long> keyedState = new HashMap<>(); // 简化:全局 state
private final Map<WindowKey, Long> windowCounts = new HashMap<>();
private long committedOffset = 0;
private long maxEventTime = Long.MIN_VALUE;
private static final long ALLOWED_LATENESS_MS = 5_000;
private static final long WINDOW_SIZE_MS = 60_000;
// ---------- 生产端(模拟埋点写入 Kafka)----------
void publish(Event e) {
log.append(e);
}
// ---------- 主循环(模拟 Flink runtime 持续消费)----------
void runOnce() {
List<LoggedRecord> batch = log.pollAfter(committedOffset);
for (LoggedRecord rec : batch) {
processRecord(rec);
committedOffset = rec.offset;
}
advanceWatermarksAndFireWindows();
}
private void processRecord(LoggedRecord rec) {
Event e = rec.event;
maxEventTime = Math.max(maxEventTime, e.eventTimeMs);
// KeyBy(campaignId) + 落入某个 event-time 窗口
long windowStart = (e.eventTimeMs / WINDOW_SIZE_MS) * WINDOW_SIZE_MS;
WindowKey wk = new WindowKey(e.campaignId, windowStart);
windowCounts.merge(wk, 1L, Long::sum);
}
private void advanceWatermarksAndFireWindows() {
long watermark = maxEventTime - ALLOWED_LATENESS_MS;
// 窗口 end <= watermark → 认为到齐,输出并删除
List<WindowKey> toFire = new ArrayList<>();
for (WindowKey wk : windowCounts.keySet()) {
long windowEnd = wk.windowStartMs + WINDOW_SIZE_MS;
if (windowEnd <= watermark) {
toFire.add(wk);
}
}
for (WindowKey wk : toFire) {
long count = windowCounts.remove(wk);
sinkWrite(wk, count); // 写出派生结果
}
}
// Checkpoint:快照 offset + 未触发窗口(生产还有 keyed state backend)
Checkpoint snapshot() {
return new Checkpoint(committedOffset, new HashMap<>(windowCounts), maxEventTime);
}
void restore(Checkpoint cp) {
committedOffset = cp.offset;
windowCounts.clear();
windowCounts.putAll(cp.pendingWindows);
maxEventTime = cp.maxEventTime;
}
private void sinkWrite(WindowKey wk, long count) {
System.out.printf("SINK campaign=%s windowStart=%d pv=%d%n",
wk.campaignId, wk.windowStartMs, count);
}
// ---------- 模拟 Kafka log ----------
static class MessageLog {
private final List<LoggedRecord> records = new ArrayList<>();
private long nextOffset = 1;
void append(Event e) {
records.add(new LoggedRecord(nextOffset++, e));
}
List<LoggedRecord> pollAfter(long offset) {
List<LoggedRecord> out = new ArrayList<>();
for (LoggedRecord r : records) {
if (r.offset > offset) out.add(r);
}
return out;
}
}
static class LoggedRecord {
long offset;
Event event;
LoggedRecord(long o, Event e) { offset = o; event = e; }
}
static class Event {
String campaignId;
long eventTimeMs; // 事件时间
Event(String c, long t) { campaignId = c; eventTimeMs = t; }
}
static class WindowKey {
String campaignId;
long windowStartMs;
WindowKey(String c, long w) { campaignId = c; windowStartMs = w; }
@Override public boolean equals(Object o) {
if (!(o instanceof WindowKey)) return false;
WindowKey w = (WindowKey) o;
return campaignId.equals(w.campaignId) && windowStartMs == w.windowStartMs;
}
@Override public int hashCode() { return Objects.hash(campaignId, windowStartMs); }
}
static class Checkpoint {
long offset;
Map<WindowKey, Long> pendingWindows;
long maxEventTime;
Checkpoint(long o, Map<WindowKey, Long> w, long m) {
offset = o; pendingWindows = w; maxEventTime = m;
}
}
// ---------- 演示:乱序事件 + checkpoint 恢复 ----------
public static void main(String[] args) {
SimpleStreamPipeline job = new SimpleStreamPipeline();
job.publish(new Event("c1", 10_000)); // 0:10
job.publish(new Event("c1", 65_000)); // 1:05 下一窗口
job.publish(new Event("c1", 8_000)); // 乱序:仍属 0:00 窗口
job.runOnce();
Checkpoint cp = job.snapshot();
job.publish(new Event("c2", 70_000));
job.runOnce();
// 模拟宕机恢复
SimpleStreamPipeline job2 = new SimpleStreamPipeline();
job2.restore(cp);
job2.runOnce();
}
}
MVP 与两张架构图的对应
| 架构图节点 | 代码里 |
|---|---|
| Source / offset | pollAfter(committedOffset) |
| eventTime | Event.eventTimeMs |
| Watermark | maxEventTime - ALLOWED_LATENESS |
| KeyBy + Window | WindowKey(campaignId, windowStart) |
| State | windowCounts |
| 触发输出 | windowEnd <= watermark → sinkWrite |
| Checkpoint | snapshot / restore |
故意没做:分布式 checkpoint barrier、KeyedStateBackend、精确一次 sink 事务、侧输出迟到流。
和 Ch.10 批处理 MVP 对比
| 批 MVP | 流 MVP | |
|---|---|---|
| 输入边界 | dt 分区 有界 |
无界 MessageLog |
| 算完触发 | 全分区算完 一次提交 | Watermark 关窗 |
| 容错 | rerun 删分区重算 |
Checkpoint 恢复 offset+窗口 |
核心概念下钻¶
无界输入与消息日志¶
| 批 | 流 | |
|---|---|---|
| 输入边界 | 分区 完结 | 永不完结(用窗口人为截断) |
| 存储隐喻 | 分区文件 | log + offset |
| 重复消费 | 重跑分区 | 至少一次 + 幂等 sink |
事件时间 vs 处理时间¶
| 时间类型 | 定义 | 风险 |
|---|---|---|
| 事件时间(event time) | 业务发生时刻(埋点字段) | 实现复杂,要处理乱序 |
| 处理时间(processing time) | 算子机器时钟 | 简单; 重跑结果变、不准确 |
历史为何选事件时间:广告计费、转化归因 必须以用户行为时间为准,不能用消费延迟。
详见 Flink § 时间语义。
Watermark 与窗口¶
Watermark = 「事件时间 ≤ T 的数据大致到齐」的信号 → 触发窗口 计算并输出。
事件: --e1--e3--e2(late)--e4-->
↑
Watermark 推进关闭窗口
允许 迟到数据:侧输出(side output)或 允许延迟 N(trade-off 准确度 vs 延迟)。
窗口类型¶
| 类型 | 含义 | 典型场景 |
|---|---|---|
| 滚动(tumbling) | 固定长度、不重叠 | 每分钟 PV |
| 滑动(sliding) | 固定长度、有步长 | 最近 5 分钟每 1 分钟更新 |
| 会话(session) | 间隙超时切分 | 用户一次访问会话 |
窗口 = 在 无界流 上制造 有界计算单元(联系 Ch.10 的「有界」)。
状态与容错¶
| 概念 | 作用 |
|---|---|
| Keyed State | 每个 key(如 user_id)独立状态;scale-out 时随 key 走 |
| Checkpoint | 异步打快照;失败从上次 checkpoint 恢复 + 重放 Kafka |
| Exactly-once(EOS) | 端到端语义:源可重放 + 算子快照 + 幂等 sink |
和批处理对比:批靠 重跑分区;流靠 checkpoint + 偏移量(见 批 vs 流)。
状态落盘常用 RocksDB(LSM) → 存储引擎 LSM。
流表对偶(一句话)¶
- 流 = 不断变化的表(changelog);表 = 流在某时刻的 snapshot。
- Flink SQL / 流批一体语法都建立在这层抽象上; 不必先写 DataStream API。
背压(一句)¶
下游慢 → 上游 自动降速,避免 OOM;Flink 内置。批处理更多靠 队列积压告警 + 限流。
与本专题映射¶
架构演进见 Flink § 定位(V1.0 批 → V1.2 流):
V1.0:connector → Airflow → dbt → ClickHouse(T+1)
V1.2:ingest-capi / Kafka → Flink → ClickHouse(分钟级)
| Flink 概念 | DDIA / 历史问题 | 项目对应 |
|---|---|---|
| Kafka Source | 持久 log | 埋点、CAPI 事件 |
| Event Time + Watermark | 乱序 | 转化窗口、归因 |
| Window | 无界→有界 | 滚动指标 |
| Keyed State | 有状态聚合 | 按 campaign/user 累计 |
| Checkpoint | 容错 | 作业升级不丢状态 |
| Sink | 派生数据写出 | ClickHouse 实时表 |
计费边界(待沉淀):哪些指标 必须日批对账,哪些可 流式近似 + 批修正。
批 vs 流(完整对照)¶
简表见 Ch.10。
| 维度 | 批处理 | 流处理 |
|---|---|---|
| 延迟 | 分钟~天 | 秒~分钟 |
| 输入 | 有界分区 | 无界事件流 |
| 正确性手段 | 重跑分区、幂等写 | Checkpoint、EOS、幂等 sink |
| 状态 | 每 job 冷启动或中间表 | 长期 Keyed State |
| 乱序 | 批前排序 / 分区已对齐 | Watermark + 迟到策略 |
| 成本 | 集中算、可错峰 | 7×24 算力、状态存储 |
| 适用 | 报表、对账、历史回填 | 大屏、风控、实时特征 |
同一份指标两套实现时:必须设计 对账(Ch.10 批为权威 or 流为近似)。
Flink 概念 ↔ DDIA 对照¶
| Flink | DDIA Ch.11 思想 |
|---|---|
DataStream |
无界数据 |
assignTimestampsAndWatermarks |
事件时间 |
Watermark |
进度与乱序 |
window |
有界计算 |
KeyedStream / State |
有状态算子 |
checkpoint |
容错与恢复 |
| Table API / SQL | 流表对偶 |
深潜 → flink-streaming.md
DDIA Ch.11 书内读法对照¶
| 建议读 | 可跳或扫读 |
|---|---|
| 传递事件流、消息代理 | 某单一 MQ 安装细节 |
| 分区与消费组 | — |
| 时间与时钟(事件/处理) | — |
| 窗口 | — |
| 流 join、连接 | 先懂概念,细节后补 |
| 容错、精确一次 | — |
| 批与流对比 | — |
读完本节,你应该能口述¶
- 流处理解决的历史问题:批太慢 + 数据持续到达。
- Kafka log → 流引擎 → Sink 三段各干什么。
- 事件时间、Watermark、窗口、Checkpoint 分别对付 乱序、何时出结果、无界、故障。
- 你们 V1.0 批 vs V1.2 流 边界;Flink 文档下一步读哪节。
下潜顺序:本节 → Part 2 分布式(EOS/对账)→ flink-streaming → Ch.10 批处理(对账视角)。
QA 疑问解答¶
格式:❌ 我的理解 + ✅ 纠正。关联正文锚点。
条目模板:
### Qn:标题(日期 可选)
**关联**:[正文锚点](#...)
❌ **我的理解(错 / 不完整)**
- …
✅ **纠正**
- …
(暂无条目 — 有疑问随学随记。)
待沉淀¶
- 流批指标对账方案
- 实时计费 vs 日批计费边界文档
- Lakehouse / 流批一体是否引入(链 ddia-framework §3.3)