理解 Kotlin Flow:冷热流与背压处理 lxx 2025年10月16日 目次 使用场景 # 1. 持续输出数据(如定时器、进度条) # 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
fun timerFlow (): Flow < Int > = flow {
for ( i in 1. . 5 ) {
delay ( 1000 )
emit ( i ) // 每秒发出一个进度
}
}
fun main () = runBlocking {
timerFlow ()
. onEach { println ( "Progress: $it /5" ) }
. onCompletion { println ( "Done!" ) }
. collect ()
}
// 输出:
// Progress: 1/5
// Progress: 2/5
// Progress: 3/5
// Progress: 4/5
// Progress: 5/5
// Done!
2. 在不同线程中执行 # 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
fun fetchData (): Flow < String > = flow {
println ( "Running in thread: ${Thread.currentThread().name} " )
delay ( 1000 )
emit ( "Data from network" )
}. flowOn ( Dispatchers . IO ) // 在 IO 线程执行
fun main () = runBlocking {
fetchData ()
. onEach { println ( "Collect on thread: ${Thread.currentThread().name} " ) }
. collect { println ( "Received: $it " ) }
}
// 输出:
// Running in thread: DefaultDispatcher-worker-1
// Collect on thread: main
// Received: Data from network
3. 合并多个异步请求 # 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
fun getUser (): Flow < String > = flow {
delay ( 500 )
emit ( "User: Alice" )
}
fun getMessagesCount (): Flow < Int > = flow {
delay ( 800 )
emit ( 42 )
}
fun main () = runBlocking {
combine ( getUser (), getMessagesCount ()) { user , count ->
" $user , Unread messages: $count "
}. collect { println ( it ) }
}
// 输出:
// User: Alice, Unread messages: 42
这些特性让人想起了 RxJava…
冷热流 # Flow 官方文档中区分为冷流 和热流 两种类型,指的是数据的发送特性。
类型 生活类比 特征 冷流 (Cold Flow) 点外卖 :每次点餐,餐厅才开始做菜每次订阅(collect)都会重新触发一次数据生产 热流 (Hot Flow) 看直播 :主播持续直播,新进来的观众只能看到当前及之后的内容数据持续发送,不会重放过去的数据
另一个比喻是将 Flow 看作一条传送带:冷流是按需求启动传送,将所有商品传送给你;热流则是始终在运行,你只能捕获你到达时之后的商品。
实现原理 # Flow 的基础接口 # Flow 是一个接口,只定义了一个方法用于收集数据。FlowCollector 负责发送数据:
1
2
3
4
5
6
7
8
9
// Flow.kt
public interface Flow < out T > {
public suspend fun collect ( collector : FlowCollector < T >)
}
// FlowCollector.kt
public fun interface FlowCollector < in T > {
public suspend fun emit ( value : T )
}
冷流的执行流程 # 以下面的代码为例:
1
2
3
4
5
CoroutineScope ( Job ()). launch {
flowOf ( 1 , 2 , 3 ). collect {
print ( it )
}
}
冷流的发送分为两个步骤:定义数据生产逻辑,然后触发数据收集。
代码实际上执行了 unsafeFlow 方法:
1
2
3
4
5
6
7
8
9
internal inline fun < T > unsafeFlow (
@BuilderInference crossinline block : suspend FlowCollector < T >.() -> Unit
): Flow < T > {
return object : Flow < T > {
override suspend fun collect ( collector : FlowCollector < T >) {
collector . block ()
}
}
}
其核心逻辑是:使用 FlowCollector 执行外部传入的 block。FlowCollector 来自哪里?正是 collect() 方法参数传入的对象。由于 FlowCollector 是函数式接口,可以直接写成 collect { ... } 的形式。
小结 :冷流先定义要让某个 FlowCollector 做什么事,再通过 collect() 找到具体执行者。这就是为什么只有调用 collect() 才会收集到所有数据。
热流:SharedFlow 和 StateFlow # 热流包括 StateFlow 和 SharedFlow 两种,其中 StateFlow 是 SharedFlow 的子类。不妨先理解 SharedFlow,再看 StateFlow 会更清晰。
热流的特点是持续发送数据,不关心是否有订阅者。示例如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
runBlocking {
// 创建一个 SharedFlow,支持多个订阅者
val eventFlow = MutableSharedFlow < String >()
// 启动两个收集者
launch {
eventFlow . collect { value ->
println ( "收集者A 收到事件: $value " )
}
}
launch {
eventFlow . collect { value ->
println ( "收集者B 收到事件: $value " )
}
}
// 模拟发送事件
delay ( 100 )
eventFlow . emit ( "用户点击按钮" )
eventFlow . emit ( "加载完成" )
delay ( 500 )
}
// 输出:
// 收集者A 收到事件:用户点击按钮
// 收集者B 收到事件:用户点击按钮
// 收集者A 收到事件:加载完成
// 收集者B 收到事件:加载完成
所有订阅者都会收到消息。默认情况下不会保留历史值,除非创建 SharedFlow 时设置了 replay 参数,将上一个值和最新值先后发送。
缓冲溢出处理 # SharedFlow 内部实现了缓冲机制,通过 replay 参数控制缓存容量。当缓冲区满时,通过 BufferOverflow 枚举决定处理策略:
SUSPEND:挂起等待直到有消费者处理缓冲区数据DROP_OLDEST:丢弃最早的数据DROP_LATEST:丢弃最新的数据emit 发送流程 # SharedFlow 的数据发送分为两个主要阶段:快速通道和常规通道。
快速通道 # 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private fun tryEmitLocked ( value : T ): Boolean {
// 快速路径:无订阅者 -> 无需缓冲
if ( nCollectors == 0 ) return tryEmitNoCollectorsLocked ( value )
// 有订阅者,需要检查缓冲区
if ( bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex ) {
when ( onBufferOverflow ) {
BufferOverflow . SUSPEND -> return false // 需要挂起
BufferOverflow . DROP_LATEST -> return true // 丢弃新值
BufferOverflow . DROP_OLDEST -> {} // 强制加入队列并丢弃旧值
}
}
enqueueLocked ( value )
bufferSize ++ // 值已加入缓冲区
// 如果缓冲区超过容量,删除最旧的值
if ( bufferSize > bufferCapacity ) dropOldestLocked ()
// 保持 replaySize 不超过必要值
if ( replaySize > replay ) {
updateBufferLocked ( replayIndex + 1 , minCollectorIndex , bufferEndIndex , queueEndIndex )
}
return true
}
快速通道的条件包括:
无订阅者 :只需更新缓冲区,无需发送数据有订阅者 :缓冲区未满:直接加入队列 缓冲区已满且有未处理数据:DROP_LATEST 策略:直接丢弃新值DROP_OLDEST 策略:新值加入,旧值被丢弃 常规通道(挂起等待) # 当无法走快速通道(缓冲区满且策略为 SUSPEND)时,系统会:
再次检查是否满足快速通道条件 若仍不满足,将发送操作封装为 CancellableContinuation 通过协程状态机机制,待订阅者消费后唤醒该操作 collect 收集流程 # collect() 时会将每个订阅者封装成 AbstractSharedFlowSlot。Slot 记录:
订阅者的 Continuation(协程中断点) 当前收集到的缓冲数据索引 执行流程如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
collect()
│
├─ allocateSlot() // 为当前订阅者分配 slot,跟踪其读取进度
│
├─ 无限循环处理数据:
│ ├─ tryTakeValue(slot) // 尝试快速取值(非阻塞)
│ │ ├─ 缓冲区有新值 → 立即返回该值
│ │ └─ 缓冲区无新值 → awaitValue(slot) 挂起等待
│ │
│ ├─ ensureActive() // 检查协程是否被取消
│ └─ emit(value) // 将值发送给下游
│
└─ finally → freeSlot() // 收集完成或异常时释放 slot
StateFlow # StateFlow 是 SharedFlow 的特殊版本,主要区别:
增加了 value 属性存储当前状态 禁止修改 replay 参数,只能缓存一个值 内部逻辑简化为纯粹的状态更新 每次调用 emit() 都会触发状态更新,订阅者同样被封装成 slot。状态更新通过 CAS(Compare-And-Set)原子操作实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
fun makePending () {
_state . loop { state ->
when {
state == null -> return // slot 已释放,跳过
state === PENDING -> return // 已标记为待发送,无需操作
state === NONE -> { // 标记为待发送
if ( _state . compareAndSet ( state , PENDING )) return
}
else -> { // slot 中存储着挂起的 Continuation
if ( _state . compareAndSet ( state , NONE )) {
( state as CancellableContinuationImpl < Unit >). resume ( Unit )
return
}
}
}
}
}
StateFlow 的 collect() 比 SharedFlow 简化,因为只需处理单个值的更新,无需考虑 replay 机制。
背压(Backpressure) # 背压是什么 # “背压”指的是上游发射数据的速度超过下游处理能力导致的积压问题。
在 Flow 中,背压常见于:
热流:数据持续产生,下游来不及消费。 冷流:虽然每次重新生产,但如果中间操作符不处理,也可能积压。 Flow 的内建背压:挂起机制 # Flow 的 emit() 是挂起函数,如果下游还没处理完,上游会自动挂起,不会像 RxJava 那样“暴走式”地发:
1
2
3
4
5
6
7
8
flow {
repeat ( 1000 ) {
emit ( it ) // 下游慢的话,这里会挂起
}
}. collect {
delay ( 100 ) // 模拟慢消费
println ( it )
}
这就是 Flow 的“自带背压控制”:天然限速。
SharedFlow 的背压策略:BufferOverflow # 热流场景下,SharedFlow 的背压控制靠 BufferOverflow:
1
2
3
4
5
val flow = MutableSharedFlow < Int >(
replay = 0 ,
extraBufferCapacity = 2 ,
onBufferOverflow = BufferOverflow . DROP_OLDEST
)
策略 行为 适用场景 SUSPEND挂起发送者等待消费完成 对数据完整性要求高 DROP_OLDEST丢掉最老的值,保留最新 UI 状态、最新数据为准 DROP_LATEST丢掉当前值,保留已有 已有值重要性更高
冷流的外部背压:操作符控制 # 冷流本身挂起就能控制背压,但还能通过操作符增强控制:
1
2
3
4
flow {
repeat ( 1000 ) { emit ( it ) }
}. buffer ( 100 )
. collect { delay ( 100 ) }
1
2
3
4
flow {
repeat ( 1000 ) { emit ( it ) }
}. conflate ()
. collect { delay ( 100 ) }
collectLatest():取消未完成的处理,只处理最新值。1
2
3
4
5
6
flow {
repeat ( 1000 ) { emit ( it ) }
}. collectLatest { value ->
delay ( 100 )
println ( "Processing $value " )
}
这些操作符本质上是“外部背压策略”,和 SharedFlow 的内部策略互补。
总结:Flow 的设计哲学与实战建议 # Flow 的设计哲学可以用一句话概括:“用挂起取代背压,用语义取代样板。”
它不像 RxJava 那样通过复杂的背压策略来管理数据流速,而是直接通过 suspend 的天然特性让上游“等一等”下游,简化了响应式流的心智负担。同时,Flow 的 API 更接近 Kotlin 本身的语义风格,与协程深度融合,让异步代码更直观、更易读。
在实际项目中:
数据有限、按需生产时优先用冷流,例如网络请求结果、数据库查询等场景; 数据持续产生、需要广播时使用热流,例如 UI 状态同步、事件分发; 涉及频繁事件时关注背压,灵活使用 buffer()、conflate()、collectLatest() 或 BufferOverflow 策略,避免性能问题; 复杂流组合场景下善用操作符(如 combine、flatMapLatest),可以极大简化数据流处理逻辑。 Flow 并不是 RxJava 的“轻量替代品”,而是站在协程语义之上的一套响应式思想实现。
理解了它的背压机制和冷热流模型之后,才可能用它构建出既优雅又高效的数据流管道。
上篇:一次 Android APK 体积优化实录
下篇:Kotlin 编译器插件(译)