一 协程的基础设施

何言 2021年11月08日 84次浏览

这是协程系列第一篇,目录:
Kotlin coroutines 协程 学习笔记

协程的基础设施

1.1 协程的创建

首先先来用最简单的代码创建一个最基础的协程:

fun main(){
    // s 为一个 Continuation 类型的对象
    val s = suspend { // suspend 方法体
        // 协程执行的内容
        println(2) 
        1 // 返回值
    }.createCoroutine(object : Continuation<Int> {
        override val context: CoroutineContext
            get() = EmptyCoroutineContext

        override fun resumeWith(result: Result<Int>) {
            println(3)
        }
    })
    println(1)
    // 当调用 Continuation 的 resume 方法时开始执行协程
    s.resume(Unit)
    println(4)
}

打印结果为 1234,这里还没有发挥出协程的任何作用,我们注意到 s 是一个 Continuation 类型的对象,我们先来看看该类:

public interface Continuation<in T> {
    public val context: CoroutineContext
    public fun resumeWith(result: Result<T>)
}

可以看到就是一个简单的接口,有一个 context 作为协程上下文,这个之后会说明,以及一个 resumeWith 方法,当我们一个协程挂起的时候,可以通过调用该方法继续执行该协程并传入结果。

kotlin coroutines 这个库的设计思路和线程不一致,在 Java 的线程模型中,有一个类 Thread 与其对应,我们可以较方便的使用和学习线程。但是协程中并没有提供一个类型,而是提供了一些基础设施,我们可以用这些基础设施设计出自己的异步程序,在之后的学习中会更加体现这一点。

关于 result 就是一个简单的实体类,存放结果与异常,以及是否运行成功的标记,具体 api 这里不给出。

当我们调用 createCoroutine 之后,会返回一个 Continuation 对象,我们需要调用其 resumeWith 方法才会执行,一般我们创建协程都是直接执行,我们可以调用 startCoroutine 方法 。

此外,以上 api 还有方法重载,可以传入协程体的 reveiver:

fun <R, T> (suspend R.()-> T).createCoroutine(
	receiver: R,
    completion: Continuation<T>
): Continuation<Unit>

fun <R, T> (suspend R.()-> T).startCoroutine(
	receiver: R,
    completion: Continuation<T>
)

1.2 协程的挂起

除了创建的方法,kotlin coroutines 还提供了一个挂起当前协程的基础方法,suspendCoroutine ,该方法是一个 suspend 方法,因此必须在另一个 suspend 方法体中调用,这里为了方便编码,我们将 main 方法加入 suspend 关键字。

首先我们先明白带 suspend 的 main 函数,在 1.3 版本之后,kotlin 支持带 suspend 的 main 方法。其效果可以等价于以下代码(实际上不一样,只不过效果等价)

fun main(){
    ::suspendMain.startCoroutine(object: Continuation<Unit> {
        override val context: CoroutineContext
            get() = EmptyCoroutineContext

        override fun resumeWith(result: Result<Unit>) {
            result.getOrThrow()
        }
    })
}
suspend fun suspendMain(){
    // 协程中执行
}

// 等价 =======================================================

suspend fun main(){
    // 协程中执行
}

接下来介绍 suspendCoroutine 方法 我们先来快看以下代码:

suspend fun main(){
    // 协程开始执行
    println(1)
    
    // 挂起自己
    suspendCoroutine<Unit> { co: Continuation<Unit> -> // 可控制协程恢复的变量
        
        // 挂起之前要执行的代码
        println(2)
        
        // 开异步线程
        thread{
            
            // 休眠
            println(4)
            Thread.sleep(1000)
            
            // 恢复 协程运行
            // 是一个拓展方法,直接用成功的 Result 包装传入的内容并调用 resumeWith
            co.resume(Unit)
            
            // 协程继续运行完继续运行
            println(6)
        }
        println(3)
    }
    
    // 挂起恢复之后
    println(5)
}

输出顺序为 123456,调用 suspendCoroutine 后,会执行传入的方法体,方法体的参数会传入一个可控制协程恢复的变量,然后当前协程会被挂起,这里开了一个新的线程,线程中会调用 co.resume 方法恢复协程运行。除此之外,suspendCoroutine 方法还具有返回值,并且返回值就是 co.resume 的参数,如之后的代码。

1.3 挂起点

还记得我们刚刚说的 Continuation 对象,这里指的挂起点,指的就是该对象,而我们调用 suspendCoroutine 传入的方法体的参数也是该对象,结合刚刚的协程的挂起,这就是 kotlin coroutines 的最核心的东西。让我们改一下刚刚的代码,打印一下当前线程:这里删去注释

suspend fun main(){
    println("1 ${Thread.currentThread()}")
    val i = suspendCoroutine<Int> { co: Continuation<Int> ->
        println("2 ${Thread.currentThread()}")
        thread{
            println("4 ${Thread.currentThread()}")
            Thread.sleep(1000)
            co.resume(15)
            println("6 ${Thread.currentThread()}")
        }
        println("3 ${Thread.currentThread()}")
    }
    println("5 ${Thread.currentThread()} $i")
}

以下是打印结果:

1 Thread[main,5,main]
2 Thread[main,5,main]
3 Thread[main,5,main]
4 Thread[Thread-0,5,main]
5 Thread[Thread-0,5,main] 15
6 Thread[Thread-0,5,main]

可以注意到输出 5,虽然写在 Thread 外面但是执行其代码的线程却是 Thread-0,也就是和调用 resume 相同的线程。

因此,与其说挂起点挂起的是协程,不如说挂起的是方法,将某个方法体执行一半之后挂起,然后当任何地方调用挂起点的恢复方法后,方法体会在调用恢复的所在线程继续执行,并且挂起方法的返回值就是恢复方法的参数。

让我们再看一个代码:

suspend fun getFromInternet() : Int{
    return suspendCoroutine {
        thread {
            // 耗时操作
            Thread.sleep(1000)
            it.resume(10)
        }
    }
}

suspend fun main(){
    println("${Thread.currentThread()}")
    val i = getFromInternet()
    println("${Thread.currentThread()} $i")
}

来看看打印结果:

Thread[main,5,main]
Thread[Thread-0,5,main] 10

可以看到在调用 getFromInnternet 之后,执行的线程发生了变化,如果使用传统的回调,我们需要这么写:

fun getFromInternet(callback: (Int) -> Unit){
   thread {
       Thread.sleep(1000)
       callback(10)
   }
}

fun main(){
    println("${Thread.currentThread()}")
    getFromInternet { i ->
        println("${Thread.currentThread()} $i")
    }
}

这些代码打印结果是一样的,不过同步我们需要一层回调,这也是协程可以消除回调地狱的原因。

1.4 CPS 变换

CPS 变换是通过传递 Continuation 来控制异步调用流程的,实际上,也正如我们刚刚的演示,suspendCoroutine 需要在协程中才能调用,也就是我们需要再 suspend 方法中调用(包括 suspend 方法体)。再退一步,普通的方法并无法调用 suspend 的方法,接下来我们在 java 中看看 kotlin 中的 suspend 方法:

suspend fun getFromInternet() : Int{
    return suspendCoroutine {
        thread {
            // 耗时操作
            Thread.sleep(1000)
            it.resume(10)
        }
    }
}

然后我们在 java 中调用:

public static void main(String[] args){
        Object o = TopKt.getFromInternet(new Continuation<Integer>() {
            @NotNull
            @Override
            public CoroutineContext getContext() {
                return EmptyCoroutineContext.INSTANCE;
            }

            @Override
            public void resumeWith(@NotNull Object o) {

            }
        });

    }

可以看到 suspend 默认就有一个参数需要传入一个挂起点,同时方法也有返回值。在 java 中该类型无法确定直接为 Object,这里采用 Kotlin 反射来执行:

suspend fun getFromInternet() : Int{
    return suspendCoroutine {
        thread {
            // 耗时操作
            Thread.sleep(1000)
            it.resume(10)
        }
    }
}

fun main(){
    val method = ::getFromInternet
    val result: Any = method.call(object: Continuation<Int> {
        override val context: CoroutineContext
            get() = EmptyCoroutineContext

        override fun resumeWith(result: Result<Int>) {
            println("2 ${Thread.currentThread()} ${result.getOrNull()}")
        }
    })
    println("1 ${Thread.currentThread()} $result")
}

看一下输出:

1 Thread[main,5,main] COROUTINE_SUSPENDED
2 Thread[Thread-0,5,main] 10

可以看到直接返回的地方实际上还是主线程,然后返回了一个 OROUTINE_SUSPENDED 标记,并在回调中将结果返回,这里 OROUTINE_SUSPENDED 是一个常量,定义在 Intrinsics.kt 中

public val COROUTINE_SUSPENDED: Any
	get() = CoroutineSingletons.COROUTINE_SUSPENDED

internal enum class CoroutineSingletons{
    COROUTINE_SUSPENDED, UNDECIDED, RESUMED
}

我们做一下小小的改变,在 getFromInternet() 中不开新线程:

suspend fun getFromInternet() : Int{
    return suspendCoroutine {
        Thread.sleep(1000)
        it.resume(10)
    }
}

fun main(){
    val method = ::getFromInternet
    val result: Any = method.call(object: Continuation<Int> {
        override val context: CoroutineContext
            get() = EmptyCoroutineContext

        override fun resumeWith(result: Result<Int>) {
            println("2 ${Thread.currentThread()} ${result.getOrNull()}")
        }
    })
    println("1 ${Thread.currentThread()} $result")
}

结果:

1 Thread[main,5,main] 10

可以看到直接同步输出了结果。

分点总结:

  • suspend 方法实际上是一个语法糖,调用放需要传入一个挂起点 Continuation
  • 当 suspend 方法中没有再次挂起(没有调用 suspendCoroutine 或调后直接在同个调用栈直接唤醒)时,结果直接同步返回,否则将结果传入传入的挂起点,同时返回一个 COROUTINE_SUSPENDED 标记
  • 当 suspend 方法调用 suspend 后,剩下的代码会自动根据情况放到回调中还是同步代码中,同时也会自动进行 挂起点 Continuation 的传输和唤醒,这就是 cps 变换。

以上就是协程的基础设施,值得注意的是,我们上面的 挂起点 Continuation 除了 resumeWith 方法,还有一个 context 的变量,下一章将具体分析一下协程的上下文 CoroutineContext 。