三 简单协程设计
简单协程设计
这是协程系列第三篇,目录:
Kotlin coroutines 协程 学习笔记
本章来设计一些简单协程的小工具,帮助我们更好的理解协程的基础设施,当然这些许多在库中都有提供,将在下一章开始分析复杂协程设计,在之后开始分析库中自带的一些协程工具。
线程分发器 Dispatcher
以下是我们的目标:
fun main(){
launch(Dispatchers.SINGLE){
println("1 ${Thread.currentThread()}")
dispatch(Dispatchers.NEW_THREAD)
println("2 ${Thread.currentThread()}")
dispatch(Dispatchers.NEW_THREAD)
println("3 ${Thread.currentThread()}")
val result = await(Dispatchers.NEW_THREAD){
println("4 ${Thread.currentThread()}")
3
}
println("5 ${Thread.currentThread()} $result")
}
}
当我们调用 launch 时启动协程,然后再协程中提供 dispatch 方法分发以下的代码,以下是输出:
1 Thread[pool-2-thread-1,5,main]
2 Thread[Thread-0,5,main]
3 Thread[Thread-1,5,main]
4 Thread[Thread-2,5,main]
5 Thread[Thread-3,5,main] 3
这里的 dispatch 和切换线程的意思有点区别,指的是重新分发之后的代码,再讲一下效果之前,我们先来看看我们实现的 Dispatcher:
interface Dispatcher {
fun dispatcher (block: ()->Unit)
}
就是一个接口,然后我们写几个默认实现,放到单例模式中:
object Dispatchers {
// 不进行任何分发,直接执行
val SYNC: Dispatcher = object: Dispatcher{
override fun dispatcher(block: () -> Unit) {
block()
}
override fun toString(): String {
return "SYNC"
}
}
// 将之后的代码分发到新线程
val NEW_THREAD: Dispatcher = object: Dispatcher{
override fun dispatcher(block: () -> Unit) {
thread {
block()
}
}
override fun toString(): String {
return "NEW_THREAD"
}
}
// 将之后的代码分发到线程池
private val executor = Executors.newFixedThreadPool(5)
val IO : Dispatcher = object : Dispatcher{
override fun dispatcher(block: () -> Unit) {
executor.execute(block)
}
override fun toString(): String {
return "IO"
}
}
// 将之后的代码分发到单线程的线程池(保证串行化)
private val singleThread = Executors.newSingleThreadExecutor()
val SINGLE : Dispatcher = object : Dispatcher {
override fun dispatcher(block: () -> Unit) {
singleThread.execute(block)
}
override fun toString(): String {
return "SINGLE"
}
}
}
然后我们来看我们的最终效果,来简化一下:
fun main(){
launch(Dispatchers.SINGLE){
// 将这一行以下的代码分发到新线程
dispatch(Dispatchers.NEW_THREAD)
println("1 ${Thread.currentThread()}")
// 在 新线程中执行代码块中代码,然后将结果返回,在使用原分发器分发后面的代码
val result = await(Dispatchers.NEW_THREAD){
3
}
println("2 ${Thread.currentThread()} $result")
}
}
相当于以下代码:
fun main(){
Disaptchers.SINGLE.dispatch {
// 默认使用 单线程 线程池分发
Dispatchers.NEW_THREAD.dispatch {
// 使用 dispatch 进行 新线程分发
println("1 ${Thread.currentThread()}")
// await 分发到新线程,然后之后使用原分发器再次分发
Dispatchers.NEW_THREAD.dispatch {
val result = 3
// await 执行结束,使用原线程分发器分发剩下代码,注,不是回到之前的线程
// 当然如果是单线程池的分发会回到原线程,但此时 runnable 已经切换
// 原分发器指的是之前切换过的最后一个分发器
Dispatcher.NEW_THREAD.dispatch {
println("2 ${Thread.currentThread()} $result")
}
}
}
}
}
在这里我们可以清楚的看见协程在阻止回调地狱下的绝妙,他不是阻塞模型,例如在 await 的时候,不会阻塞原线程,而是在结束后通过分发器再次分发到同一个分发器,在安卓的 handler loop 机制中有很大的作用,可以实现俗称 回到主线程 的功能。
我们来实现一下该效果,首先是 launch 方法,要传入一个分发器和 方法块:
fun launch(
dispatcher: Dispatcher = Dispatchers.SYNC,
block: suspend CoroutineScope.()->Unit,
){
然后我们需要再分发器中执行该代码块,注意该代码块是 suspend 修饰,我们需要在分发器中先开一个协程然后再调用 block :
fun launch(
dispatcher: Dispatcher = Dispatchers.SYNC,
// 可以注意到代码块的 receiver 是 CoroutineScope 也就是我们需要实现的协程体 receiver 对象
block: suspend CoroutineScope.()->Unit,
){
dispatcher.dispatcher {
// 分发器中启动协程
block.startCoroutine(CoroutineScope(dispatcher), object: Continuation<Unit>{
override val context: CoroutineContext
get() = EmptyCoroutineContext
override fun resumeWith(result: Result<Unit>) {
// 有异常直接抛
result.getOrThrow()
}
})
}
}
然后是 CoroutineScope 对象,首先它必须有一个成员变量储存当前的分发器,其次他需要提供 await 方法和 dispatche 方法:
class CoroutineScope(@Volatile private var rootDispatcher: Dispatcher){
suspend fun <T> await(dispatcher: Dispatcher, block: ()->T): T {
// 挂起当前协程(不会阻塞当前线程)
return suspendCoroutine<T> {
// 使用新的分发器执行 block 的数据并获取结果
dispatcher.dispatcher {
val result = block()
// 回到原分发器中调用 resume 方法,这将会使协程之后的代码使用 原分发器 再次分发
rootDispatcher.dispatcher {
it.resume(result)
}
}
}
}
suspend fun dispatch(dispatcher: Dispatcher) = suspendCoroutine<Unit> { // 挂起
// 切换分发器
dispatcher.dispatcher {
// 切换分发器
rootDispatcher = dispatcher
// 在新的分发器中调用 resume 方法
it.resume(Unit)
}
}
}
至此,我们实现了一个很简单的分发器协程,可喜可贺,这里重点在 suspendCoroutine 方法的使用,会将当前协程阻塞,执行代码块后挂起,在调用 resume 方法时将传入的结果作为方法返回值继续执行,并且和 resume 处于同一个线程中。
实际上安卓协程中的 withContext 也是类似的设计,相当于 await 方法 。
Lua 风格的异步
这个是 《深入理解 Kotlin 协程》中的一个例子,我这里讲一下该例子,然后再使用的时候通过再不同的线程运行来查看效果,帮助我们搞懂协程和线程的区别,通过该例子,我们我们可以更深入的理解协程,或者说这个库中的协程,以下是结论(个人感性分析,只是为了方便理解,不具有严密性),先放出,之后会继续放出:
协程是一个 suspend 代码块,在执行到后某条指令之后我们可以将该代码块挂起,然后生成一个挂起点,在特定时候继续执行。在此之间处理数据交互和异常处理。
这是目标:
suspend fun main(){
val p = Coroutine.create<Int> {
var n = 1
while (n < 5){
println("1 ${Thread.currentThread()}")
// 每次调用 yield 之后都挂起当前协程,然后将参数作为 之后 resume 返回值
yield(n ++)
}
println("1 ${Thread.currentThread()}")
// 协程返回的最后一个值不用调用 yield ,直接返回,然后协程结束
3 // 需要返回最后一个值
}
while(p.isActive){ // 当调用 isActive 判断协程是否执行完毕
val r = p.resume() // 使用 resume 之后会挂起当前协程,之前的协程会继续执行,然后等待协程中执行 yield 方法或执行完毕最后一个值
println("2 ${Thread.currentThread()} $r")
}
}
结果:
1 Thread[main,5,main] # 协程体中
2 Thread[main,5,main] 1 # 协程体外与当前结果
1 Thread[main,5,main]
2 Thread[main,5,main] 2
1 Thread[main,5,main]
2 Thread[main,5,main] 3
1 Thread[main,5,main]
2 Thread[main,5,main] 4
1 Thread[main,5,main]
2 Thread[main,5,main] 3 # 循环后整个协程最后一个值是 3
可以看到这里运行的是同一个线程,然后实现了方法体的交替运行,接下来我们简化一下代码,将 resume 中加入线程调度,这里直接用上之前的分发器,实际上没区别:
suspend fun main(){
val p = Coroutine.create<Int> {
var n = 1
while (n < 5){
println("1 ${Thread.currentThread()}")
// 每次调用 yield 之后都挂起当前协程,然后将参数作为 之后 resume 返回值
yield(n ++)
}
println("1 ${Thread.currentThread()}")
// 协程返回的最后一个值不用调用 yield ,直接返回,然后协程结束
3 // 需要返回最后一个值
}
// 直接使用之前的切线程工具,避免重复创建协程
launch (Dispatchers.NEW_THREAD){
// 新线程
var result = p.resume()
println("2 ${Thread.currentThread()} $result")
// IO 线程池
dispatch(Dispatchers.IO)
result = p.resume()
println("2 ${Thread.currentThread()} $result")
}
}
我们这里使用了之前的切线程协程工具,避免在分发器中重复创建协程,然后我们在不同线程中执行 p.resume() 方法,我们会发现,协程体中之后的代码在哪里执行,取决于 p.resume 执行所在的线程,运行到 yield 或者协程体执行完后在回到该代码中并作为 resume 的结果。
1 Thread[Thread-0,5,main] # yield 执行之前的代码跟随 resume 方法执行所在线程,这里是分发到新线程执行
2 Thread[Thread-0,5,main] 1
1 Thread[pool-1-thread-1,5,main]
2 Thread[pool-1-thread-1,5,main] 2
在这一个工具中,我们将会深入理解挂起点和 cps 变换,首先我们来分析挂起点:
挂起点
首先分析该工具中有几个挂起代码块,实际上是两个,首先我们调用 Coroutine.create 会传入一个 suspend 代码块,其次,我们的 p.resume 方法也需要在 suspend 代码块中执行,这里递归调用后总会回到一个 suspend 代码块,而挂起点就是在一个代码块中,执行到一半将代码块挂起,作为一个挂起点,在其他地方调用该挂起点的 resume 方法后,挂起代码块会继续执行,并且在 resume 通个线程执行。因此这里会产生两种挂起点,并且交替执行另一个挂起点的 resume 方法(内部的),并挂起自己。
CPS 变换
两种挂起点交替运行对方,这里需要一个类来保持当前的挂起点或者说状态,然后进行一些状态转换,CPS 变换就是通过不同挂起点的相互调用和挂起,来达到切换运行的挂起代码块的目的,因此我们需要实现一个类来持有这些数据,包括我们调用 create 传入的代码块(外面调用 resume 的代码块不用持有也无法持有)以及 代码块运行的 receiver (负责提供 yield 方法)。我们可以使用状态机的概念来设计这个类,不同的状态会持有不同的挂起点,或者刚创建或已经运行完毕时不持有挂起点。
为了方便,以后将 create 中传入的 挂起代码块称为 Yielded 挂起代码块,调用 resume 的代码块称为 Resume 挂起代码块,包括其对应挂起点。
状态机
以下是该协程的状态,这里使用密封类来方便 when 遍历
sealed class Status {
class Created(val continuation: Continuation<Unit>) : Status()
class Yielded(val continuation: Continuation<Unit>) : Status()
class Resumed<R>(val continuation: Continuation<R>) : Status()
object Dead : Status()
}
以下是状态转移图:
四个状态还是很清晰明了的,其中 Resumed 和 Yield 状态分别代表当前协程持有这两个 挂起代码块 的挂起点,调用对应的方法会挂起当前代码块形成挂起点,切换状态,并将原状态挂起点恢复运行。
设计
开始最终设计了,首先是 receiver 的接口:
interface CoroutineScope<R> {
suspend fun yield(value: R)
}
然后是我们要实现的这个对象,首先这个对象必须要有一个状态的变量,还需要持有对应代码块,还需要持有一个 Scope ,这里为了防止在 resume 代码块中调用 yield 方法,以内部类的形式持有:
class Coroutine<R> (
override val context: CoroutineContext = EmptyCoroutineContext,
private val block: suspend CoroutineScope<R>.() -> R
): Continuation<R>{
private val scope = object: CoroutineScope<R>{
// 挂起当前协程
override suspend fun yield(value: R): Unit = suspendCoroutine { con ->
// 状态转移
val previousStatus = status.getAndUpdate {
// gau 操作
when(it) {
is Status.Created -> throw IllegalStateException("Never started!")
is Status.Yielded -> throw IllegalStateException("Already yielded!")
// 只有 Resumed 状态才能调用 yield 方法,然后创建新的挂起点放入新的状态
is Status.Resumed<*> -> Status.Yielded(con)
Status.Dead -> throw IllegalStateException("Already dead!")
}
}
// 恢复旧挂起点的运行
(previousStatus as? Status.Resumed<R>)?.continuation?.resume(value)
Unit
}
}
// 使用原子类防止并发冲突
private val status: AtomicReference<Status>
// 判断是否死亡
val isActive: Boolean
get() = status.get() != Status.Dead
init {
// 创建协程
val coroutineBlock: suspend CoroutineScope<R>.() -> R = { block() }
// 这里传入的回调是 this
val start = coroutineBlock.createCoroutine(receiver = scope, completion = this)
status = AtomicReference(Status.Created(start))
}
// 表示传入的 Yield 代码块运行完毕(在 createCoroutine 中传入的是 this)
// result 包装了 Yield 代码块的返回值
override fun resumeWith(result: Result<R>) {
val previousStatus = status.getAndUpdate {
when(it) {
is Status.Created -> throw IllegalStateException("Never started!")
is Status.Yielded -> throw IllegalStateException("Already yielded!")
is Status.Resumed<*> -> { // 状态转移为死亡
Status.Dead
}
Status.Dead -> throw IllegalStateException("Already dead!")
}
}
// 恢复挂起点,最后一次恢复
(previousStatus as? Status.Resumed<R>)?.continuation?.resumeWith(result)
}
// 由 Resume 代码块调用的 方法,获取 Yield 代码块中给出的下一个值
// 直接挂起
suspend fun resume(): R = suspendCoroutine { continuation ->
// 状态转移
val previousStatus = status.getAndUpdate {
when(it) {
// 创建状态会切换到 resumed 状态
is Status.Created -> {
Status.Resumed(continuation)
}
is Status.Yielded -> {
Status.Resumed(continuation)
}
is Status.Resumed<*> -> throw IllegalStateException("Already resumed!")
Status.Dead -> throw IllegalStateException("Already dead!")
}
}
// 有两个状态需要恢复挂起点的运行
when(previousStatus){
is Status.Created -> previousStatus.continuation.resume(Unit)
is Status.Yielded -> previousStatus.continuation.resume(Unit)
else -> throw IllegalStateException("unknown error")
}
}
}
可以说在之前的铺垫之后,我们能更加方便的看懂源码,至此,我们明确了协程的基本概念和用法,下一章将会进行复杂协程设计,包括回调,取消等高级操作。
再给出之前的结论,看看有无更加深刻理解协程:
协程是一个 suspend 代码块,在执行到后某条指令之后我们可以将该代码块挂起,然后生成一个挂起点,在特定时候继续执行。在此之间处理数据交互和异常处理。