opoojkk

理解 Kotlin Flow:冷热流与背压处理

lxx
目次

使用场景 #

1. 持续输出数据(如定时器、进度条) #

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
fun timerFlow(): Flow<Int> = flow {
    for (i in 1..5) {
        delay(1000)
        emit(i) // 每秒发出一个进度
    }
}

fun main() = runBlocking {
    timerFlow()
        .onEach { println("Progress: $it/5") }
        .onCompletion { println("Done!") }
        .collect()
}

// 输出:
// Progress: 1/5
// Progress: 2/5
// Progress: 3/5
// Progress: 4/5
// Progress: 5/5
// Done!

2. 在不同线程中执行 #

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
fun fetchData(): Flow<String> = flow {
    println("Running in thread: ${Thread.currentThread().name}")
    delay(1000)
    emit("Data from network")
}.flowOn(Dispatchers.IO) // 在 IO 线程执行

fun main() = runBlocking {
    fetchData()
        .onEach { println("Collect on thread: ${Thread.currentThread().name}") }
        .collect { println("Received: $it") }
}

// 输出:
// Running in thread: DefaultDispatcher-worker-1
// Collect on thread: main
// Received: Data from network

3. 合并多个异步请求 #

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
fun getUser(): Flow<String> = flow {
    delay(500)
    emit("User: Alice")
}

fun getMessagesCount(): Flow<Int> = flow {
    delay(800)
    emit(42)
}

fun main() = runBlocking {
    combine(getUser(), getMessagesCount()) { user, count ->
        "$user, Unread messages: $count"
    }.collect { println(it) }
}

// 输出:
// User: Alice, Unread messages: 42

这些特性让人想起了 RxJava…


冷热流 #

Flow 官方文档中区分为冷流热流两种类型,指的是数据的发送特性。

类型生活类比特征
冷流 (Cold Flow)点外卖:每次点餐,餐厅才开始做菜每次订阅(collect)都会重新触发一次数据生产
热流 (Hot Flow)看直播:主播持续直播,新进来的观众只能看到当前及之后的内容数据持续发送,不会重放过去的数据

另一个比喻是将 Flow 看作一条传送带:冷流是按需求启动传送,将所有商品传送给你;热流则是始终在运行,你只能捕获你到达时之后的商品。


实现原理 #

Flow 的基础接口 #

Flow 是一个接口,只定义了一个方法用于收集数据。FlowCollector 负责发送数据:

1
2
3
4
5
6
7
8
9
// Flow.kt
public interface Flow<out T> {
    public suspend fun collect(collector: FlowCollector<T>)
}

// FlowCollector.kt
public fun interface FlowCollector<in T> {
    public suspend fun emit(value: T)
}

冷流的执行流程 #

以下面的代码为例:

1
2
3
4
5
CoroutineScope(Job()).launch {
    flowOf(1, 2, 3).collect {
        print(it)
    }
}

冷流的发送分为两个步骤:定义数据生产逻辑,然后触发数据收集。

代码实际上执行了 unsafeFlow 方法:

1
2
3
4
5
6
7
8
9
internal inline fun <T> unsafeFlow(
    @BuilderInference crossinline block: suspend FlowCollector<T>.() -> Unit
): Flow<T> {
    return object : Flow<T> {
        override suspend fun collect(collector: FlowCollector<T>) {
            collector.block()
        }
    }
}

其核心逻辑是:使用 FlowCollector 执行外部传入的 blockFlowCollector 来自哪里?正是 collect() 方法参数传入的对象。由于 FlowCollector 是函数式接口,可以直接写成 collect { ... } 的形式。

小结:冷流先定义要让某个 FlowCollector 做什么事,再通过 collect() 找到具体执行者。这就是为什么只有调用 collect() 才会收集到所有数据。

热流:SharedFlow 和 StateFlow #

热流包括 StateFlowSharedFlow 两种,其中 StateFlowSharedFlow 的子类。不妨先理解 SharedFlow,再看 StateFlow 会更清晰。

热流的特点是持续发送数据,不关心是否有订阅者。示例如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
runBlocking {
    // 创建一个 SharedFlow,支持多个订阅者
    val eventFlow = MutableSharedFlow<String>()

    // 启动两个收集者
    launch {
        eventFlow.collect { value ->
            println("收集者A 收到事件:$value")
        }
    }

    launch {
        eventFlow.collect { value ->
            println("收集者B 收到事件:$value")
        }
    }

    // 模拟发送事件
    delay(100)
    eventFlow.emit("用户点击按钮")
    eventFlow.emit("加载完成")

    delay(500)
}

// 输出:
// 收集者A 收到事件:用户点击按钮
// 收集者B 收到事件:用户点击按钮
// 收集者A 收到事件:加载完成
// 收集者B 收到事件:加载完成

所有订阅者都会收到消息。默认情况下不会保留历史值,除非创建 SharedFlow 时设置了 replay 参数,将上一个值和最新值先后发送。

缓冲溢出处理 #

SharedFlow 内部实现了缓冲机制,通过 replay 参数控制缓存容量。当缓冲区满时,通过 BufferOverflow 枚举决定处理策略:


emit 发送流程 #

SharedFlow 的数据发送分为两个主要阶段:快速通道和常规通道。

快速通道 #

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private fun tryEmitLocked(value: T): Boolean {
    // 快速路径:无订阅者 -> 无需缓冲
    if (nCollectors == 0) return tryEmitNoCollectorsLocked(value)
    
    // 有订阅者,需要检查缓冲区
    if (bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex) {
        when (onBufferOverflow) {
            BufferOverflow.SUSPEND -> return false // 需要挂起
            BufferOverflow.DROP_LATEST -> return true // 丢弃新值
            BufferOverflow.DROP_OLDEST -> {} // 强制加入队列并丢弃旧值
        }
    }
    
    enqueueLocked(value)
    bufferSize++ // 值已加入缓冲区
    
    // 如果缓冲区超过容量,删除最旧的值
    if (bufferSize > bufferCapacity) dropOldestLocked()
    
    // 保持 replaySize 不超过必要值
    if (replaySize > replay) {
        updateBufferLocked(replayIndex + 1, minCollectorIndex, bufferEndIndex, queueEndIndex)
    }
    
    return true
}

快速通道的条件包括:

  1. 无订阅者:只需更新缓冲区,无需发送数据
  2. 有订阅者
    • 缓冲区未满:直接加入队列
    • 缓冲区已满且有未处理数据:
      • DROP_LATEST 策略:直接丢弃新值
      • DROP_OLDEST 策略:新值加入,旧值被丢弃

常规通道(挂起等待) #

当无法走快速通道(缓冲区满且策略为 SUSPEND)时,系统会:

  1. 再次检查是否满足快速通道条件
  2. 若仍不满足,将发送操作封装为 CancellableContinuation
  3. 通过协程状态机机制,待订阅者消费后唤醒该操作

collect 收集流程 #

collect() 时会将每个订阅者封装成 AbstractSharedFlowSlot。Slot 记录:

执行流程如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
collect()
├─ allocateSlot()                // 为当前订阅者分配 slot,跟踪其读取进度
├─ 无限循环处理数据:
│   ├─ tryTakeValue(slot)        // 尝试快速取值(非阻塞)
│   │   ├─ 缓冲区有新值 → 立即返回该值
│   │   └─ 缓冲区无新值 → awaitValue(slot) 挂起等待
│   │
│   ├─ ensureActive()            // 检查协程是否被取消
│   └─ emit(value)               // 将值发送给下游
└─ finally → freeSlot()          // 收集完成或异常时释放 slot

StateFlow #

StateFlowSharedFlow 的特殊版本,主要区别:

每次调用 emit() 都会触发状态更新,订阅者同样被封装成 slot。状态更新通过 CAS(Compare-And-Set)原子操作实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
fun makePending() {
    _state.loop { state ->
        when {
            state == null -> return // slot 已释放,跳过
            state === PENDING -> return // 已标记为待发送,无需操作
            state === NONE -> { // 标记为待发送
                if (_state.compareAndSet(state, PENDING)) return
            }
            else -> { // slot 中存储着挂起的 Continuation
                if (_state.compareAndSet(state, NONE)) {
                    (state as CancellableContinuationImpl<Unit>).resume(Unit)
                    return
                }
            }
        }
    }
}

StateFlowcollect()SharedFlow 简化,因为只需处理单个值的更新,无需考虑 replay 机制。


背压处理 #

背压问题本质上是数据生产速率高于消费速率。Flow 框架通过 BufferOverflow 策略应对:

选择哪种策略取决于具体场景的需求。

总结 #

Kotlin Flow 是一套完整的异步数据流解决方案,它继承了 RxJava 的思想,同时又利用协程的优势,使得代码更简洁易懂。

核心要点回顾:

应用建议:

标签:
Categories: