理解 Kotlin Flow:冷热流与背压处理 lxx 2025年10月16日 目次 使用场景 # 1. 持续输出数据(如定时器、进度条) # 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
fun timerFlow (): Flow < Int > = flow {
for ( i in 1. . 5 ) {
delay ( 1000 )
emit ( i ) // 每秒发出一个进度
}
}
fun main () = runBlocking {
timerFlow ()
. onEach { println ( "Progress: $it /5" ) }
. onCompletion { println ( "Done!" ) }
. collect ()
}
// 输出:
// Progress: 1/5
// Progress: 2/5
// Progress: 3/5
// Progress: 4/5
// Progress: 5/5
// Done!
2. 在不同线程中执行 # 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
fun fetchData (): Flow < String > = flow {
println ( "Running in thread: ${Thread.currentThread().name} " )
delay ( 1000 )
emit ( "Data from network" )
}. flowOn ( Dispatchers . IO ) // 在 IO 线程执行
fun main () = runBlocking {
fetchData ()
. onEach { println ( "Collect on thread: ${Thread.currentThread().name} " ) }
. collect { println ( "Received: $it " ) }
}
// 输出:
// Running in thread: DefaultDispatcher-worker-1
// Collect on thread: main
// Received: Data from network
3. 合并多个异步请求 # 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
fun getUser (): Flow < String > = flow {
delay ( 500 )
emit ( "User: Alice" )
}
fun getMessagesCount (): Flow < Int > = flow {
delay ( 800 )
emit ( 42 )
}
fun main () = runBlocking {
combine ( getUser (), getMessagesCount ()) { user , count ->
" $user , Unread messages: $count "
}. collect { println ( it ) }
}
// 输出:
// User: Alice, Unread messages: 42
这些特性让人想起了 RxJava…
冷热流 # Flow 官方文档中区分为冷流 和热流 两种类型,指的是数据的发送特性。
类型 生活类比 特征 冷流 (Cold Flow) 点外卖 :每次点餐,餐厅才开始做菜每次订阅(collect)都会重新触发一次数据生产 热流 (Hot Flow) 看直播 :主播持续直播,新进来的观众只能看到当前及之后的内容数据持续发送,不会重放过去的数据
另一个比喻是将 Flow 看作一条传送带:冷流是按需求启动传送,将所有商品传送给你;热流则是始终在运行,你只能捕获你到达时之后的商品。
实现原理 # Flow 的基础接口 # Flow 是一个接口,只定义了一个方法用于收集数据。FlowCollector
负责发送数据:
1
2
3
4
5
6
7
8
9
// Flow.kt
public interface Flow < out T > {
public suspend fun collect ( collector : FlowCollector < T >)
}
// FlowCollector.kt
public fun interface FlowCollector < in T > {
public suspend fun emit ( value : T )
}
冷流的执行流程 # 以下面的代码为例:
1
2
3
4
5
CoroutineScope ( Job ()). launch {
flowOf ( 1 , 2 , 3 ). collect {
print ( it )
}
}
冷流的发送分为两个步骤:定义数据生产逻辑,然后触发数据收集。
代码实际上执行了 unsafeFlow
方法:
1
2
3
4
5
6
7
8
9
internal inline fun < T > unsafeFlow (
@BuilderInference crossinline block : suspend FlowCollector < T >.() -> Unit
): Flow < T > {
return object : Flow < T > {
override suspend fun collect ( collector : FlowCollector < T >) {
collector . block ()
}
}
}
其核心逻辑是:使用 FlowCollector
执行外部传入的 block
。FlowCollector
来自哪里?正是 collect()
方法参数传入的对象。由于 FlowCollector
是函数式接口,可以直接写成 collect { ... }
的形式。
小结 :冷流先定义要让某个 FlowCollector
做什么事,再通过 collect()
找到具体执行者。这就是为什么只有调用 collect()
才会收集到所有数据。
热流:SharedFlow 和 StateFlow # 热流包括 StateFlow
和 SharedFlow
两种,其中 StateFlow
是 SharedFlow
的子类。不妨先理解 SharedFlow
,再看 StateFlow
会更清晰。
热流的特点是持续发送数据,不关心是否有订阅者。示例如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
runBlocking {
// 创建一个 SharedFlow,支持多个订阅者
val eventFlow = MutableSharedFlow < String >()
// 启动两个收集者
launch {
eventFlow . collect { value ->
println ( "收集者A 收到事件: $value " )
}
}
launch {
eventFlow . collect { value ->
println ( "收集者B 收到事件: $value " )
}
}
// 模拟发送事件
delay ( 100 )
eventFlow . emit ( "用户点击按钮" )
eventFlow . emit ( "加载完成" )
delay ( 500 )
}
// 输出:
// 收集者A 收到事件:用户点击按钮
// 收集者B 收到事件:用户点击按钮
// 收集者A 收到事件:加载完成
// 收集者B 收到事件:加载完成
所有订阅者都会收到消息。默认情况下不会保留历史值,除非创建 SharedFlow
时设置了 replay
参数,将上一个值和最新值先后发送。
缓冲溢出处理 # SharedFlow
内部实现了缓冲机制,通过 replay
参数控制缓存容量。当缓冲区满时,通过 BufferOverflow
枚举决定处理策略:
SUSPEND
:挂起等待直到有消费者处理缓冲区数据DROP_OLDEST
:丢弃最早的数据DROP_LATEST
:丢弃最新的数据emit 发送流程 # SharedFlow 的数据发送分为两个主要阶段:快速通道和常规通道。
快速通道 # 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private fun tryEmitLocked ( value : T ): Boolean {
// 快速路径:无订阅者 -> 无需缓冲
if ( nCollectors == 0 ) return tryEmitNoCollectorsLocked ( value )
// 有订阅者,需要检查缓冲区
if ( bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex ) {
when ( onBufferOverflow ) {
BufferOverflow . SUSPEND -> return false // 需要挂起
BufferOverflow . DROP_LATEST -> return true // 丢弃新值
BufferOverflow . DROP_OLDEST -> {} // 强制加入队列并丢弃旧值
}
}
enqueueLocked ( value )
bufferSize ++ // 值已加入缓冲区
// 如果缓冲区超过容量,删除最旧的值
if ( bufferSize > bufferCapacity ) dropOldestLocked ()
// 保持 replaySize 不超过必要值
if ( replaySize > replay ) {
updateBufferLocked ( replayIndex + 1 , minCollectorIndex , bufferEndIndex , queueEndIndex )
}
return true
}
快速通道的条件包括:
无订阅者 :只需更新缓冲区,无需发送数据有订阅者 :缓冲区未满:直接加入队列 缓冲区已满且有未处理数据:DROP_LATEST
策略:直接丢弃新值DROP_OLDEST
策略:新值加入,旧值被丢弃 常规通道(挂起等待) # 当无法走快速通道(缓冲区满且策略为 SUSPEND
)时,系统会:
再次检查是否满足快速通道条件 若仍不满足,将发送操作封装为 CancellableContinuation
通过协程状态机机制,待订阅者消费后唤醒该操作 collect 收集流程 # collect()
时会将每个订阅者封装成 AbstractSharedFlowSlot
。Slot 记录:
订阅者的 Continuation
(协程中断点) 当前收集到的缓冲数据索引 执行流程如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
collect()
│
├─ allocateSlot() // 为当前订阅者分配 slot,跟踪其读取进度
│
├─ 无限循环处理数据:
│ ├─ tryTakeValue(slot) // 尝试快速取值(非阻塞)
│ │ ├─ 缓冲区有新值 → 立即返回该值
│ │ └─ 缓冲区无新值 → awaitValue(slot) 挂起等待
│ │
│ ├─ ensureActive() // 检查协程是否被取消
│ └─ emit(value) // 将值发送给下游
│
└─ finally → freeSlot() // 收集完成或异常时释放 slot
StateFlow # StateFlow
是 SharedFlow
的特殊版本,主要区别:
增加了 value
属性存储当前状态 禁止修改 replay
参数,只能缓存一个值 内部逻辑简化为纯粹的状态更新 每次调用 emit()
都会触发状态更新,订阅者同样被封装成 slot。状态更新通过 CAS(Compare-And-Set)原子操作实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
fun makePending () {
_state . loop { state ->
when {
state == null -> return // slot 已释放,跳过
state === PENDING -> return // 已标记为待发送,无需操作
state === NONE -> { // 标记为待发送
if ( _state . compareAndSet ( state , PENDING )) return
}
else -> { // slot 中存储着挂起的 Continuation
if ( _state . compareAndSet ( state , NONE )) {
( state as CancellableContinuationImpl < Unit >). resume ( Unit )
return
}
}
}
}
}
StateFlow
的 collect()
比 SharedFlow
简化,因为只需处理单个值的更新,无需考虑 replay 机制。
背压处理 # 背压问题本质上是数据生产速率高于消费速率 。Flow 框架通过 BufferOverflow
策略应对:
SUSPEND :生产者等待,直到消费者追上DROP_OLDEST :丢弃最早未被消费的数据DROP_LATEST :丢弃最新生成的数据选择哪种策略取决于具体场景的需求。
总结 # Kotlin Flow 是一套完整的异步数据流解决方案,它继承了 RxJava 的思想,同时又利用协程的优势,使得代码更简洁易懂。
核心要点回顾:
冷流 vs 热流:冷流按需生产,每次订阅都独立执行;热流持续输出,订阅者共享同一数据源 Flow 家族:基础 Flow 用于异步序列,SharedFlow 支持多订阅者广播,StateFlow 简化为单值状态管理 性能优化:FastPath 快速通道避免不必要的协程调度,大多数情况下可直接返回结果 背压策略:通过 BufferOverflow 灵活应对生产消费速率不匹配的场景 内部机制:Slot + Continuation 的组合,实现高效的事件分发和挂起恢复 应用建议:
数据流、事件系统、UI 状态管理等场景优先考虑 Flow 需要保留最新状态的地方用 StateFlow 需要事件广播的场景用 SharedFlow 合理配置缓冲大小和溢出策略,避免内存压力 下篇:Kotlin 编译器插件(译)