一 协程的基础设施
这是协程系列第一篇,目录:
Kotlin coroutines 协程 学习笔记
协程的基础设施
1.1 协程的创建
首先先来用最简单的代码创建一个最基础的协程:
fun main(){
// s 为一个 Continuation 类型的对象
val s = suspend { // suspend 方法体
// 协程执行的内容
println(2)
1 // 返回值
}.createCoroutine(object : Continuation<Int> {
override val context: CoroutineContext
get() = EmptyCoroutineContext
override fun resumeWith(result: Result<Int>) {
println(3)
}
})
println(1)
// 当调用 Continuation 的 resume 方法时开始执行协程
s.resume(Unit)
println(4)
}
打印结果为 1234,这里还没有发挥出协程的任何作用,我们注意到 s 是一个 Continuation 类型的对象,我们先来看看该类:
public interface Continuation<in T> {
public val context: CoroutineContext
public fun resumeWith(result: Result<T>)
}
可以看到就是一个简单的接口,有一个 context 作为协程上下文,这个之后会说明,以及一个 resumeWith 方法,当我们一个协程挂起的时候,可以通过调用该方法继续执行该协程并传入结果。
kotlin coroutines 这个库的设计思路和线程不一致,在 Java 的线程模型中,有一个类 Thread 与其对应,我们可以较方便的使用和学习线程。但是协程中并没有提供一个类型,而是提供了一些基础设施,我们可以用这些基础设施设计出自己的异步程序,在之后的学习中会更加体现这一点。
关于 result 就是一个简单的实体类,存放结果与异常,以及是否运行成功的标记,具体 api 这里不给出。
当我们调用 createCoroutine 之后,会返回一个 Continuation 对象,我们需要调用其 resumeWith 方法才会执行,一般我们创建协程都是直接执行,我们可以调用 startCoroutine 方法 。
此外,以上 api 还有方法重载,可以传入协程体的 reveiver:
fun <R, T> (suspend R.()-> T).createCoroutine(
receiver: R,
completion: Continuation<T>
): Continuation<Unit>
fun <R, T> (suspend R.()-> T).startCoroutine(
receiver: R,
completion: Continuation<T>
)
1.2 协程的挂起
除了创建的方法,kotlin coroutines 还提供了一个挂起当前协程的基础方法,suspendCoroutine ,该方法是一个 suspend 方法,因此必须在另一个 suspend 方法体中调用,这里为了方便编码,我们将 main 方法加入 suspend 关键字。
首先我们先明白带 suspend 的 main 函数,在 1.3 版本之后,kotlin 支持带 suspend 的 main 方法。其效果可以等价于以下代码(实际上不一样,只不过效果等价)
fun main(){
::suspendMain.startCoroutine(object: Continuation<Unit> {
override val context: CoroutineContext
get() = EmptyCoroutineContext
override fun resumeWith(result: Result<Unit>) {
result.getOrThrow()
}
})
}
suspend fun suspendMain(){
// 协程中执行
}
// 等价 =======================================================
suspend fun main(){
// 协程中执行
}
接下来介绍 suspendCoroutine 方法 我们先来快看以下代码:
suspend fun main(){
// 协程开始执行
println(1)
// 挂起自己
suspendCoroutine<Unit> { co: Continuation<Unit> -> // 可控制协程恢复的变量
// 挂起之前要执行的代码
println(2)
// 开异步线程
thread{
// 休眠
println(4)
Thread.sleep(1000)
// 恢复 协程运行
// 是一个拓展方法,直接用成功的 Result 包装传入的内容并调用 resumeWith
co.resume(Unit)
// 协程继续运行完继续运行
println(6)
}
println(3)
}
// 挂起恢复之后
println(5)
}
输出顺序为 123456,调用 suspendCoroutine 后,会执行传入的方法体,方法体的参数会传入一个可控制协程恢复的变量,然后当前协程会被挂起,这里开了一个新的线程,线程中会调用 co.resume 方法恢复协程运行。除此之外,suspendCoroutine 方法还具有返回值,并且返回值就是 co.resume 的参数,如之后的代码。
1.3 挂起点
还记得我们刚刚说的 Continuation 对象,这里指的挂起点,指的就是该对象,而我们调用 suspendCoroutine 传入的方法体的参数也是该对象,结合刚刚的协程的挂起,这就是 kotlin coroutines 的最核心的东西。让我们改一下刚刚的代码,打印一下当前线程:这里删去注释
suspend fun main(){
println("1 ${Thread.currentThread()}")
val i = suspendCoroutine<Int> { co: Continuation<Int> ->
println("2 ${Thread.currentThread()}")
thread{
println("4 ${Thread.currentThread()}")
Thread.sleep(1000)
co.resume(15)
println("6 ${Thread.currentThread()}")
}
println("3 ${Thread.currentThread()}")
}
println("5 ${Thread.currentThread()} $i")
}
以下是打印结果:
1 Thread[main,5,main]
2 Thread[main,5,main]
3 Thread[main,5,main]
4 Thread[Thread-0,5,main]
5 Thread[Thread-0,5,main] 15
6 Thread[Thread-0,5,main]
可以注意到输出 5,虽然写在 Thread 外面但是执行其代码的线程却是 Thread-0,也就是和调用 resume 相同的线程。
因此,与其说挂起点挂起的是协程,不如说挂起的是方法,将某个方法体执行一半之后挂起,然后当任何地方调用挂起点的恢复方法后,方法体会在调用恢复的所在线程继续执行,并且挂起方法的返回值就是恢复方法的参数。
让我们再看一个代码:
suspend fun getFromInternet() : Int{
return suspendCoroutine {
thread {
// 耗时操作
Thread.sleep(1000)
it.resume(10)
}
}
}
suspend fun main(){
println("${Thread.currentThread()}")
val i = getFromInternet()
println("${Thread.currentThread()} $i")
}
来看看打印结果:
Thread[main,5,main]
Thread[Thread-0,5,main] 10
可以看到在调用 getFromInnternet 之后,执行的线程发生了变化,如果使用传统的回调,我们需要这么写:
fun getFromInternet(callback: (Int) -> Unit){
thread {
Thread.sleep(1000)
callback(10)
}
}
fun main(){
println("${Thread.currentThread()}")
getFromInternet { i ->
println("${Thread.currentThread()} $i")
}
}
这些代码打印结果是一样的,不过同步我们需要一层回调,这也是协程可以消除回调地狱的原因。
1.4 CPS 变换
CPS 变换是通过传递 Continuation 来控制异步调用流程的,实际上,也正如我们刚刚的演示,suspendCoroutine 需要在协程中才能调用,也就是我们需要再 suspend 方法中调用(包括 suspend 方法体)。再退一步,普通的方法并无法调用 suspend 的方法,接下来我们在 java 中看看 kotlin 中的 suspend 方法:
suspend fun getFromInternet() : Int{
return suspendCoroutine {
thread {
// 耗时操作
Thread.sleep(1000)
it.resume(10)
}
}
}
然后我们在 java 中调用:
public static void main(String[] args){
Object o = TopKt.getFromInternet(new Continuation<Integer>() {
@NotNull
@Override
public CoroutineContext getContext() {
return EmptyCoroutineContext.INSTANCE;
}
@Override
public void resumeWith(@NotNull Object o) {
}
});
}
可以看到 suspend 默认就有一个参数需要传入一个挂起点,同时方法也有返回值。在 java 中该类型无法确定直接为 Object,这里采用 Kotlin 反射来执行:
suspend fun getFromInternet() : Int{
return suspendCoroutine {
thread {
// 耗时操作
Thread.sleep(1000)
it.resume(10)
}
}
}
fun main(){
val method = ::getFromInternet
val result: Any = method.call(object: Continuation<Int> {
override val context: CoroutineContext
get() = EmptyCoroutineContext
override fun resumeWith(result: Result<Int>) {
println("2 ${Thread.currentThread()} ${result.getOrNull()}")
}
})
println("1 ${Thread.currentThread()} $result")
}
看一下输出:
1 Thread[main,5,main] COROUTINE_SUSPENDED
2 Thread[Thread-0,5,main] 10
可以看到直接返回的地方实际上还是主线程,然后返回了一个 OROUTINE_SUSPENDED 标记,并在回调中将结果返回,这里 OROUTINE_SUSPENDED 是一个常量,定义在 Intrinsics.kt 中
public val COROUTINE_SUSPENDED: Any
get() = CoroutineSingletons.COROUTINE_SUSPENDED
internal enum class CoroutineSingletons{
COROUTINE_SUSPENDED, UNDECIDED, RESUMED
}
我们做一下小小的改变,在 getFromInternet() 中不开新线程:
suspend fun getFromInternet() : Int{
return suspendCoroutine {
Thread.sleep(1000)
it.resume(10)
}
}
fun main(){
val method = ::getFromInternet
val result: Any = method.call(object: Continuation<Int> {
override val context: CoroutineContext
get() = EmptyCoroutineContext
override fun resumeWith(result: Result<Int>) {
println("2 ${Thread.currentThread()} ${result.getOrNull()}")
}
})
println("1 ${Thread.currentThread()} $result")
}
结果:
1 Thread[main,5,main] 10
可以看到直接同步输出了结果。
分点总结:
- suspend 方法实际上是一个语法糖,调用放需要传入一个挂起点 Continuation
- 当 suspend 方法中没有再次挂起(没有调用 suspendCoroutine 或调后直接在同个调用栈直接唤醒)时,结果直接同步返回,否则将结果传入传入的挂起点,同时返回一个 COROUTINE_SUSPENDED 标记
- 当 suspend 方法调用 suspend 后,剩下的代码会自动根据情况放到回调中还是同步代码中,同时也会自动进行 挂起点 Continuation 的传输和唤醒,这就是 cps 变换。
以上就是协程的基础设施,值得注意的是,我们上面的 挂起点 Continuation 除了 resumeWith 方法,还有一个 context 的变量,下一章将具体分析一下协程的上下文 CoroutineContext 。