joy keeps flowin'

理解Kotlin协程

xx
目次

记下自己的一点浅薄的理解,想要更深入学习的朋友推荐阅读霍老的破解 Kotlin 协程

使用 #

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

启动时包含三个参数:

  1. 协程上下文
  2. 启动模式
  3. 执行方法

协程上下文 #

CoroutineContext中定义了get、plus、minusKey方法。类似list。

CoroutineContextList
get(Key)get(Int)
plus(CoroutineContext)plus(List)
minusKey(Key)removeAt(Int)

官方的注释也表明,这是一个set的接口类。

1
2
3
4
5
/**
 * Persistent context for the coroutine. It is an indexed set of [Element] instances.
 * An indexed set is a mix between a set and a map.
 * Every element in this set has a unique [Key].
 */

不用在意礼CoroutineContext类中是否有存储的数组或是其他存储数据的结构,这就是一个数组,怎么存储蛮可以交给子类实现。

key

1
public interface Key<E : Element>

element

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
public interface Element : CoroutineContext {
        /**
         * A key of this coroutine context element.
         */
        public val key: Key<*>

        public override operator fun <E : Element> get(key: Key<E>): E? =
            @Suppress("UNCHECKED_CAST")
            if (this.key == key) this as E else null

        public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
            operation(initial, this)

        public override fun minusKey(key: Key<*>): CoroutineContext =
            if (this.key == key) EmptyCoroutineContext else this
    }

默认情况在,CoroutineContext用的是EmptyCoroutineContext。几个协程上下文想加,实际上用的就是CombinedContext了。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
‍‍```c#
internal class CombinedContext(
    private val left: CoroutineContext,
    private val element: Element
) : CoroutineContext, Serializable {

    override fun <E : Element> get(key: Key<E>): E? {
        var cur = this
        while (true) {
            cur.element[key]?.let { return it }
            val next = cur.left
            if (next is CombinedContext) {
                cur = next
            } else {
                return next[key]
            }
        }
    }

	// 省略其他代码
}
‍‍```

可以在CombinedContext中看到,内部是类似链表的结构,一个是当前节点存储的值(Element),另一个是指向下一个节点的指针(left)。

到这里CoroutineContext的结构也就清晰了。响应的,EmptyCoroutineContext中都是空的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
@SinceKotlin("1.3")
public object EmptyCoroutineContext : CoroutineContext, Serializable {
    private const val serialVersionUID: Long = 0
    private fun readResolve(): Any = EmptyCoroutineContext

    public override fun <E : Element> get(key: Key<E>): E? = null
    public override fun <R> fold(initial: R, operation: (R, Element) -> R): R = initial
    public override fun plus(context: CoroutineContext): CoroutineContext = context
    public override fun minusKey(key: Key<*>): CoroutineContext = this
    public override fun hashCode(): Int = 0
    public override fun toString(): String = "EmptyCoroutineContext"
}

一个CoroutineContext+另一个CoroutineContext的写法很常见,实际上是CoroutineContext中定义了operator fun plus方法,可以用+调用。翻开源码可以注意到组合时总是要先把ContinuationInterceptor放到链表(CombinedContext)最前面的位置以保证最先执行。

因为CombinedContext的结构是类似3<-2<-1的,因此最先被拿到的是实际上是末尾的1这个元素。

CoroutineStart #

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
/**
 * Defines start options for coroutines builders.
 *
 * It is used in the `start` parameter of coroutine builder functions like
 * [launch][CoroutineScope.launch] and [async][CoroutineScope.async]
 * to describe when and how the coroutine should be dispatched initially.
 *
 * This parameter only affects how the coroutine behaves until the code of its body starts executing.
 * After that, cancellability and dispatching are defined by the behavior of the invoked suspending functions.
 *
 * The summary of coroutine start options is:
 * - [DEFAULT] immediately schedules the coroutine for execution according to its context.
 * - [LAZY] delays the moment of the initial dispatch until the result of the coroutine is needed.
 * - [ATOMIC] prevents the coroutine from being cancelled before it starts, ensuring that its code will start
 *   executing in any case.
 * - [UNDISPATCHED] immediately executes the coroutine until its first suspension point _in the current thread_.
 */

决定以哪种策略执行任务。

DEFAULT:默认,立即安排执行。如果执行前被取消,将不会被执行。

LAZY:需要的时机执行。调用Job.join或是start。如果执行前被取消,将不会被执行。

ATOMIC:与默认类似,如果执行前被取消,会继续执行。

UNDISPATCHED:立即执行,直到第一个挂起点。

详细策略说明参见代码注释。路径:kotlinx.coroutines.CoroutineStart

拦截器 #

拦截器是什么东西呢?

归根到底也是CoroutineContext的一个子类,在他的基础上额外实现了ContinuationInterceptor接口中interceptContinuation方法。Kotlin中现成的拦截器可以一直顺到Dispatchers的Default、Main、Unconfined、IO上,

除了上面几个常用的,你也可以自己定义拦截器。

以Dispatchers.Main为例,最后创建的是一个HandlerContext,Android开发的朋友看这个名字就猜到了,没错,就是Handler。

1
2
3
4
5
override fun dispatch(context: CoroutineContext, block: Runnable) {
        if (!handler.post(block)) {
            cancelOnRejection(context, block)
        }
    }

再回到launch时,根据CoroutineStart的策略,创建好Continuation,开始执行拦截逻辑。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// kotlinx.coroutines.intrinsics.Cancelable.kt

/**
 * Use this function to start coroutine in a cancellable way, so that it can be cancelled
 * while waiting to be dispatched.
 */
@InternalCoroutinesApi
public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {
    createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
}


// kotlin.coroutines.jvm.internal.ContinuationImpl.kt

public fun intercepted(): Continuation<Any?> =
    intercepted
        ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
            .also { intercepted = it }

这是是Dispatchers.Main的,中间绕了几弯,最终会执行到HandlerContext(基类是HandlerDispatcher)。

1
2
3
4
5
override fun dispatch(context: CoroutineContext, block: Runnable) {
    if (!handler.post(block)) {
        cancelOnRejection(context, block)
    }
}

都已经Handler了,还有啥好说的🤷‍♂️。

这是Dispatcher.Main可以切换线程的原因。

Continuation #

Continuation又是什么呢?

the fact of continuing or a thing that continues or follows from something else 繼續;持續;延續;繼續發生(或進行)之事;某事的後續結果(或影響)

Cambridge Dictionary

按照CoroutineStart为Default找了半天,其实是封装了执行的一个类。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
 * Creates unintercepted coroutine without receiver and with result type [T].
 * This function creates a new, fresh instance of suspendable computation every time it is invoked.
 *
 * To start executing the created coroutine, invoke `resume(Unit)` on the returned [Continuation] instance.
 * The [completion] continuation is invoked when coroutine completes with result or exception.
 *
 * This function returns unintercepted continuation.
 * Invocation of `resume(Unit)` starts coroutine immediately in the invoker's call stack without going through the
 * [ContinuationInterceptor] that might be present in the completion's [CoroutineContext].
 * It is the invoker's responsibility to ensure that a proper invocation context is established.
 * Note that [completion] of this function may get invoked in an arbitrary context.
 *
 * [Continuation.intercepted] can be used to acquire the intercepted continuation.
 * Invocation of `resume(Unit)` on intercepted continuation guarantees that execution of
 * both the coroutine and [completion] happens in the invocation context established by
 * [ContinuationInterceptor].
 *
 * Repeated invocation of any resume function on the resulting continuation corrupts the
 * state machine of the coroutine and may result in arbitrary behaviour or exception.
 */
@SinceKotlin("1.3")
public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
    completion: Continuation<T>
): Continuation<Unit> {
    val probeCompletion = probeCoroutineCreated(completion)
    return if (this is BaseContinuationImpl)
        create(probeCompletion)
    else
        createCoroutineFromSuspendFunction(probeCompletion) {
            (this as Function1<Continuation<T>, Any?>).invoke(it)
        }
}

执行过程中传递的都是Continuation对象。

async执行的流程大致相似,返回值是从await返回的,而不是launch方法。

协程Scope #

CoroutineScope是一个提供了协程上下文的接口。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// CoroutineScope.kt

//CoroutineScope
public interface CoroutineScope {
    public val coroutineContext: CoroutineContext
}

// MainScope
public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)
// ContextScope
internal class ContextScope(context: CoroutineContext) : CoroutineScope {
    override val coroutineContext: CoroutineContext = context
    // CoroutineScope is used intentionally for user-friendly representation
    override fun toString(): String = "CoroutineScope(coroutineContext=$coroutineContext)"
}

// GlobalScope
public object GlobalScope : CoroutineScope {
    /**
     * Returns [EmptyCoroutineContext].
     */
    override val coroutineContext: CoroutineContext
        get() = EmptyCoroutineContext
}

LifecycleScope也没忘,LifecycleScope实际的实现类是LifecycleCoroutineScopeImpl。再原基础上增加Lifecycle以实现监听声明周期变化。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
internal class LifecycleCoroutineScopeImpl(
    override val lifecycle: Lifecycle,
    override val coroutineContext: CoroutineContext
) : LifecycleCoroutineScope(), LifecycleEventObserver{
    ...
    fun register() {
        launch(Dispatchers.Main.immediate) {
            if (lifecycle.currentState >= Lifecycle.State.INITIALIZED) {
                lifecycle.addObserver(this@LifecycleCoroutineScopeImpl)
            } else {
                coroutineContext.cancel()
            }
        }
    }

    override fun onStateChanged(source: LifecycleOwner, event: Lifecycle.Event) {
        if (lifecycle.currentState <= Lifecycle.State.DESTROYED) {
            lifecycle.removeObserver(this)
            coroutineContext.cancel()
        }
    }
}

参考 #

破解 Kotlin 协程(3):协程调度篇

标签:
Categories: