三 简单协程设计

简单协程设计
这是协程系列第三篇,目录:
Kotlin coroutines 协程 学习笔记

本章来设计一些简单协程的小工具,帮助我们更好的理解协程的基础设施,当然这些许多在库中都有提供,将在下一章开始分析复杂协程设计,在之后开始分析库中自带的一些协程工具。

线程分发器 Dispatcher

以下是我们的目标:

fun main(){
    launch(Dispatchers.SINGLE){
        println("1 ${Thread.currentThread()}")

        dispatch(Dispatchers.NEW_THREAD)
        println("2 ${Thread.currentThread()}")

        dispatch(Dispatchers.NEW_THREAD)
        println("3 ${Thread.currentThread()}")

        val result = await(Dispatchers.NEW_THREAD){
            println("4 ${Thread.currentThread()}")
            3
        }

        println("5 ${Thread.currentThread()} $result")
    }
}

当我们调用 launch 时启动协程,然后再协程中提供 dispatch 方法分发以下的代码,以下是输出:

1 Thread[pool-2-thread-1,5,main]
2 Thread[Thread-0,5,main]
3 Thread[Thread-1,5,main]
4 Thread[Thread-2,5,main]
5 Thread[Thread-3,5,main] 3

这里的 dispatch 和切换线程的意思有点区别,指的是重新分发之后的代码,再讲一下效果之前,我们先来看看我们实现的 Dispatcher:

interface Dispatcher {
    fun dispatcher (block: ()->Unit)
}

就是一个接口,然后我们写几个默认实现,放到单例模式中:

object Dispatchers {
    
    // 不进行任何分发,直接执行
    val SYNC: Dispatcher = object: Dispatcher{
        override fun dispatcher(block: () -> Unit) {
            block()
        }
        override fun toString(): String {
            return "SYNC"
        }
    }

    // 将之后的代码分发到新线程
    val NEW_THREAD: Dispatcher = object: Dispatcher{
        override fun dispatcher(block: () -> Unit) {
            thread {
                block()
            }
        }
        override fun toString(): String {
            return "NEW_THREAD"
        }
    }

    // 将之后的代码分发到线程池
    private val executor = Executors.newFixedThreadPool(5)
    val IO : Dispatcher = object : Dispatcher{
        override fun dispatcher(block: () -> Unit) {
            executor.execute(block)
        }
        override fun toString(): String {
            return "IO"
        }
    }

    // 将之后的代码分发到单线程的线程池(保证串行化)
    private val singleThread = Executors.newSingleThreadExecutor()
    val SINGLE : Dispatcher = object : Dispatcher {
        override fun dispatcher(block: () -> Unit) {
            singleThread.execute(block)
        }

        override fun toString(): String {
            return "SINGLE"
        }
    }

}

然后我们来看我们的最终效果,来简化一下:

fun main(){
    launch(Dispatchers.SINGLE){
        
        // 将这一行以下的代码分发到新线程
        dispatch(Dispatchers.NEW_THREAD)
        println("1 ${Thread.currentThread()}")

        // 在 新线程中执行代码块中代码,然后将结果返回,在使用原分发器分发后面的代码
        val result = await(Dispatchers.NEW_THREAD){
            3
        }

        println("2 ${Thread.currentThread()} $result")
    }
}

相当于以下代码:

fun main(){
    Disaptchers.SINGLE.dispatch {
        // 默认使用 单线程 线程池分发
        
        Dispatchers.NEW_THREAD.dispatch {
            // 使用 dispatch 进行 新线程分发
            println("1 ${Thread.currentThread()}")
            
            // await 分发到新线程,然后之后使用原分发器再次分发 
            Dispatchers.NEW_THREAD.dispatch {
                val result = 3
                // await 执行结束,使用原线程分发器分发剩下代码,注,不是回到之前的线程
                // 当然如果是单线程池的分发会回到原线程,但此时 runnable 已经切换
                // 原分发器指的是之前切换过的最后一个分发器
                Dispatcher.NEW_THREAD.dispatch {
                    println("2 ${Thread.currentThread()} $result")
                }
            }
        }
    }
}

在这里我们可以清楚的看见协程在阻止回调地狱下的绝妙,他不是阻塞模型,例如在 await 的时候,不会阻塞原线程,而是在结束后通过分发器再次分发到同一个分发器,在安卓的 handler loop 机制中有很大的作用,可以实现俗称 回到主线程 的功能。

我们来实现一下该效果,首先是 launch 方法,要传入一个分发器和 方法块:

fun launch(
    dispatcher: Dispatcher = Dispatchers.SYNC,
    block: suspend CoroutineScope.()->Unit,
){

然后我们需要再分发器中执行该代码块,注意该代码块是 suspend 修饰,我们需要在分发器中先开一个协程然后再调用 block :

fun launch(
    dispatcher: Dispatcher = Dispatchers.SYNC,
    // 可以注意到代码块的 receiver 是 CoroutineScope 也就是我们需要实现的协程体 receiver 对象
    block: suspend CoroutineScope.()->Unit,
){
    dispatcher.dispatcher {
        // 分发器中启动协程
        block.startCoroutine(CoroutineScope(dispatcher), object: Continuation<Unit>{
            override val context: CoroutineContext
            get() = EmptyCoroutineContext

            override fun resumeWith(result: Result<Unit>) {
                // 有异常直接抛
                result.getOrThrow()
            }
        })
    }

}

然后是 CoroutineScope 对象,首先它必须有一个成员变量储存当前的分发器,其次他需要提供 await 方法和 dispatche 方法:

class CoroutineScope(@Volatile private var rootDispatcher: Dispatcher){
    suspend fun <T> await(dispatcher: Dispatcher, block: ()->T): T {
        // 挂起当前协程(不会阻塞当前线程)
        return suspendCoroutine<T> {
            // 使用新的分发器执行 block 的数据并获取结果
            dispatcher.dispatcher {
                val result = block()
                
                // 回到原分发器中调用 resume 方法,这将会使协程之后的代码使用 原分发器 再次分发
                rootDispatcher.dispatcher {
                    it.resume(result)
                }
            }
        }
    }

    suspend fun dispatch(dispatcher: Dispatcher) = suspendCoroutine<Unit> { // 挂起
        // 切换分发器
        dispatcher.dispatcher {
            // 切换分发器
            rootDispatcher = dispatcher
            
            // 在新的分发器中调用 resume 方法
            it.resume(Unit)
        }
    }
}

至此,我们实现了一个很简单的分发器协程,可喜可贺,这里重点在 suspendCoroutine 方法的使用,会将当前协程阻塞,执行代码块后挂起,在调用 resume 方法时将传入的结果作为方法返回值继续执行,并且和 resume 处于同一个线程中。

实际上安卓协程中的 withContext 也是类似的设计,相当于 await 方法 。

Lua 风格的异步

这个是 《深入理解 Kotlin 协程》中的一个例子,我这里讲一下该例子,然后再使用的时候通过再不同的线程运行来查看效果,帮助我们搞懂协程和线程的区别,通过该例子,我们我们可以更深入的理解协程,或者说这个库中的协程,以下是结论(个人感性分析,只是为了方便理解,不具有严密性),先放出,之后会继续放出:

协程是一个 suspend 代码块,在执行到后某条指令之后我们可以将该代码块挂起,然后生成一个挂起点,在特定时候继续执行。在此之间处理数据交互和异常处理。

这是目标:

suspend fun main(){
    val p = Coroutine.create<Int> {
        var n = 1
        while (n < 5){
            println("1 ${Thread.currentThread()}")
            // 每次调用 yield 之后都挂起当前协程,然后将参数作为 之后 resume 返回值
            yield(n ++)
        }
        println("1 ${Thread.currentThread()}")
        // 协程返回的最后一个值不用调用 yield ,直接返回,然后协程结束
        3 // 需要返回最后一个值
    }

    while(p.isActive){ // 当调用 isActive 判断协程是否执行完毕
        val r = p.resume() // 使用 resume 之后会挂起当前协程,之前的协程会继续执行,然后等待协程中执行 yield 方法或执行完毕最后一个值
        println("2 ${Thread.currentThread()} $r")
    }
}

结果:

1 Thread[main,5,main]		# 协程体中
2 Thread[main,5,main] 1		# 协程体外与当前结果
1 Thread[main,5,main]
2 Thread[main,5,main] 2
1 Thread[main,5,main]
2 Thread[main,5,main] 3
1 Thread[main,5,main]
2 Thread[main,5,main] 4
1 Thread[main,5,main]
2 Thread[main,5,main] 3		# 循环后整个协程最后一个值是 3

可以看到这里运行的是同一个线程,然后实现了方法体的交替运行,接下来我们简化一下代码,将 resume 中加入线程调度,这里直接用上之前的分发器,实际上没区别:

suspend fun main(){
    val p = Coroutine.create<Int> {
        var n = 1
        while (n < 5){
            println("1 ${Thread.currentThread()}")
            // 每次调用 yield 之后都挂起当前协程,然后将参数作为 之后 resume 返回值
            yield(n ++)
        }
        println("1 ${Thread.currentThread()}")
        // 协程返回的最后一个值不用调用 yield ,直接返回,然后协程结束
        3 // 需要返回最后一个值
    }

    // 直接使用之前的切线程工具,避免重复创建协程
    launch (Dispatchers.NEW_THREAD){
        // 新线程
        var result = p.resume()
        println("2 ${Thread.currentThread()} $result")

        // IO 线程池
        dispatch(Dispatchers.IO)
        result = p.resume()
        println("2 ${Thread.currentThread()} $result")
    }
}

我们这里使用了之前的切线程协程工具,避免在分发器中重复创建协程,然后我们在不同线程中执行 p.resume() 方法,我们会发现,协程体中之后的代码在哪里执行,取决于 p.resume 执行所在的线程,运行到 yield 或者协程体执行完后在回到该代码中并作为 resume 的结果。

1 Thread[Thread-0,5,main] # yield 执行之前的代码跟随 resume 方法执行所在线程,这里是分发到新线程执行
2 Thread[Thread-0,5,main] 1
1 Thread[pool-1-thread-1,5,main]
2 Thread[pool-1-thread-1,5,main] 2

在这一个工具中,我们将会深入理解挂起点和 cps 变换,首先我们来分析挂起点:

挂起点

首先分析该工具中有几个挂起代码块,实际上是两个,首先我们调用 Coroutine.create 会传入一个 suspend 代码块,其次,我们的 p.resume 方法也需要在 suspend 代码块中执行,这里递归调用后总会回到一个 suspend 代码块,而挂起点就是在一个代码块中,执行到一半将代码块挂起,作为一个挂起点,在其他地方调用该挂起点的 resume 方法后,挂起代码块会继续执行,并且在 resume 通个线程执行。因此这里会产生两种挂起点,并且交替执行另一个挂起点的 resume 方法(内部的),并挂起自己。

CPS 变换

两种挂起点交替运行对方,这里需要一个类来保持当前的挂起点或者说状态,然后进行一些状态转换,CPS 变换就是通过不同挂起点的相互调用和挂起,来达到切换运行的挂起代码块的目的,因此我们需要实现一个类来持有这些数据,包括我们调用 create 传入的代码块(外面调用 resume 的代码块不用持有也无法持有)以及 代码块运行的 receiver (负责提供 yield 方法)。我们可以使用状态机的概念来设计这个类,不同的状态会持有不同的挂起点,或者刚创建或已经运行完毕时不持有挂起点。

为了方便,以后将 create 中传入的 挂起代码块称为 Yielded 挂起代码块,调用 resume 的代码块称为 Resume 挂起代码块,包括其对应挂起点。

状态机

以下是该协程的状态,这里使用密封类来方便 when 遍历

sealed class Status {
    class Created(val continuation: Continuation<Unit>) : Status()
    class Yielded(val continuation: Continuation<Unit>) : Status()
    class Resumed<R>(val continuation: Continuation<R>) : Status()
    object Dead : Status()
}

以下是状态转移图:

image20211113234757111.png

四个状态还是很清晰明了的,其中 Resumed 和 Yield 状态分别代表当前协程持有这两个 挂起代码块 的挂起点,调用对应的方法会挂起当前代码块形成挂起点,切换状态,并将原状态挂起点恢复运行。

设计

开始最终设计了,首先是 receiver 的接口:

interface CoroutineScope<R> {
    suspend fun yield(value: R)
}

然后是我们要实现的这个对象,首先这个对象必须要有一个状态的变量,还需要持有对应代码块,还需要持有一个 Scope ,这里为了防止在 resume 代码块中调用 yield 方法,以内部类的形式持有:

class Coroutine<R> (
    override val context: CoroutineContext = EmptyCoroutineContext,
    private val block: suspend CoroutineScope<R>.() -> R
): Continuation<R>{
    
    private val scope = object: CoroutineScope<R>{
        // 挂起当前协程
        override suspend fun yield(value: R): Unit = suspendCoroutine { con ->
            // 状态转移
            val previousStatus = status.getAndUpdate {
                // gau 操作
                when(it) {
                    is Status.Created -> throw IllegalStateException("Never started!")
                    is Status.Yielded -> throw IllegalStateException("Already yielded!")
                    // 只有 Resumed 状态才能调用 yield 方法,然后创建新的挂起点放入新的状态
                    is Status.Resumed<*> -> Status.Yielded(con)
                    Status.Dead -> throw IllegalStateException("Already dead!")
                }
            }
            // 恢复旧挂起点的运行
            (previousStatus as? Status.Resumed<R>)?.continuation?.resume(value)
            Unit
        }
    }


    // 使用原子类防止并发冲突
    private val status: AtomicReference<Status>

    // 判断是否死亡
    val isActive: Boolean
        get() = status.get() != Status.Dead

    init {
        // 创建协程
        val coroutineBlock: suspend CoroutineScope<R>.() -> R = { block() }
        // 这里传入的回调是 this
        val start = coroutineBlock.createCoroutine(receiver = scope, completion = this)
        status = AtomicReference(Status.Created(start))
    }
    
    // 表示传入的 Yield 代码块运行完毕(在 createCoroutine 中传入的是 this)
    // result 包装了 Yield 代码块的返回值
    override fun resumeWith(result: Result<R>) {
        val previousStatus = status.getAndUpdate {
            when(it) {
                is Status.Created -> throw IllegalStateException("Never started!")
                is Status.Yielded -> throw IllegalStateException("Already yielded!")
                is Status.Resumed<*> -> { // 状态转移为死亡
                    Status.Dead
                }
                Status.Dead -> throw IllegalStateException("Already dead!")
            }
        }
        // 恢复挂起点,最后一次恢复
        (previousStatus as? Status.Resumed<R>)?.continuation?.resumeWith(result)
    }
    
    // 由 Resume 代码块调用的 方法,获取  Yield 代码块中给出的下一个值
    // 直接挂起
    suspend fun resume(): R = suspendCoroutine { continuation ->
        	// 状态转移
            val previousStatus = status.getAndUpdate {
            when(it) {
                // 创建状态会切换到 resumed 状态
                is Status.Created -> {
                    Status.Resumed(continuation)
                }
                is Status.Yielded -> {
                    Status.Resumed(continuation)
                }
                is Status.Resumed<*> -> throw IllegalStateException("Already resumed!")
                Status.Dead -> throw IllegalStateException("Already dead!")
            }
        }

        // 有两个状态需要恢复挂起点的运行
        when(previousStatus){
            is Status.Created -> previousStatus.continuation.resume(Unit)
            is Status.Yielded -> previousStatus.continuation.resume(Unit)
            else ->  throw IllegalStateException("unknown error")
        }
    }
    
}

可以说在之前的铺垫之后,我们能更加方便的看懂源码,至此,我们明确了协程的基本概念和用法,下一章将会进行复杂协程设计,包括回调,取消等高级操作。

再给出之前的结论,看看有无更加深刻理解协程:

协程是一个 suspend 代码块,在执行到后某条指令之后我们可以将该代码块挂起,然后生成一个挂起点,在特定时候继续执行。在此之间处理数据交互和异常处理。