//yield是一个挂起函数 覆写GeneratorScope类的方法
override suspend fun yield(value: T) = suspendCoroutine {
continuation ->
println(“yield======== s t a t e . j a v a C l a s s . s i m p l e N a m e v a l u e = {state.javaClass.simpleName} value= state.javaClass.simpleNamevalue={value}”)
state = when(state) {
is State.NotReady -> State.Ready(continuation, value) //调用yield(xx)方法使状态进入Ready状态
is State.Ready<*> -> throw IllegalStateException(“Cannot yield a value while ready.”)
State.Done -> throw IllegalStateException(“Cannot yield a value while done.”)
}
//这里continuation没有直接调用resume方法,在后面用户调用hasNext()或next()时调用resume()
}
private fun resume() {
println(“resume()====================”)
//val currentState = state之后调用.continuation会自动类型转换
when(val currentState = state) {
is State.NotReady -> {
println(“resume()====================when NotReady”)
currentState.continuation.resume(Unit) // NotReady时调用Continuation的resume方法恢复挂起点继续执行
}
}
}
override fun hasNext(): Boolean {
println(“hasNext()====================”)
resume()
return state != State.Done
}
//next方法返回yield存入的值
override fun next(): T {
println(“next()========${state.javaClass.simpleName}”)
return when(val currentState = state) {
is State.NotReady -> {
resume()
return next() //NotReady时调用下次的next
}
is State.Ready<*> -> {
state = State.NotReady(currentState.continuation) //state状态流转
println(“next()====return value”)
(currentState as State.Ready).nextValue //Ready时才取值返回
}
State.Done -> throw IndexOutOfBoundsException(“No value left.”) //Done状态调用next()抛异常
}
}
//协程体执行完毕
override fun resumeWith(result: Result<Any?>) {
println(“resumeWith====================”)
state = State.Done
result.getOrThrow()
}
}
//这个是定义一个receiver类,保证yield()方法只能在lambda表达式的大括号内使用
abstract class GeneratorScope internal constructor(){
protected abstract val parameter: T
abstract suspend fun yield(value: T)
}
//返回值具有迭代器功能
fun generator(block: suspend GeneratorScope.(T) -> Unit): (T) -> Generator {
return { parameter: T ->
println(“parameter = $parameter”) // parameter = 10 这个是generator接收的start函数,nums(10)
GeneratorImpl(block, parameter)
}
}
fun main() {
val nums = generator { start: Int ->
for (i in 0…5) {
yield(start + i) //yield会挂起函数调用处
}
}
val seq = nums(10)
//println(seq.iterator().next())
for (j in seq) {
println(j)
}
//kotlin官方提供的sequence序列中有yield方法
val sequence = sequence {
yield(1)
yield(2)
yield(3)
yield(4)
yieldAll(listOf(1,2,3,4))
}
for(xx in sequence){
println(xx)
}
}
第一次看这个例子还是比较绕比较难理解的,我添加了log输出,打印执行的顺序:

从log输出的顺序可以看出是每次for循环会先调用hasNext()
方法,hasNext()
中会调用resume()
方法,第一次调用resume()
相当于启动协程,这时协程会执行到yield调用处的代码yield(start + i)
这行并挂起(实际上是一个lambda表达式,它是一个Continuation的实现类SuspendLambad的包装),那yield方法中就会向state中存入值,并同时保存当前的Continuation对象,然后流传状态变化,变成ready, 紧接着for循环里取值操作会调用next
方法,然后在下一次的for循环中又会调用resume()
方法,这时就是恢复前面一次挂起函数调用处的代码继续执行,也就是执行下一次for循环里yield(start + i)
会继续放下一个值,又挂起,next()
方法又取值,hasNext()
方法又resume()
恢复, 继续执行yeild…循环往复。
如果不调用for循环打印,直接调用next获取值呢
fun main() {
val nums = generator { start: Int ->
for (i in 0…5) {
yield(start + i) //yield会挂起函数调用处
}
}
val seq = nums(10)
println(seq.iterator().next())
}
这时next方法中也会先resume, 相当于启动协程,协程体里包含的lambda表达式开始执行,yield方法存值并挂起,next()方法取值,但这时好像没有调用resumeWith方法。。但还是能正常执行完毕。
这个例子比较绕的一点就是协程体中的代码并不会立即被执行,也就是下面的代码:
fun main() {
val nums = generator { start: Int ->
for (i in 0…5) {
yield(start + i) //yield会挂起函数调用处
}
}
val seq = nums(10) //这样并不会执行上面的for循环里面的代码
// for (j in seq) {
// println(j)
// }
你会发现把下面调用的for循环代码注释掉,上面的lambda表达式里面的代码并不会执行,不信看log输出:
只输出了一个parameter=10,yield方法里面的log一个也没有打印。很神奇,就是说只有调用获取值的时候,才会执行yield方法给你发送值,调用一次发送一次,kotlin官方提供的sequence
序列大括号中的yield方法调用也是如此,sequence
本身可以当做迭代器使用。
实例:仿 Lua 协程实现非对称协程 API
sealed class Status {
class Created(val continuation: Continuation): Status()
class Yielded
(val continuation: Continuation
): Status()
class Resumed(val continuation: Continuation): Status()
object Dead: Status()
}
class Coroutine<P, R> (
override val context: CoroutineContext = EmptyCoroutineContext,
private val block: suspend Coroutine<P, R>.CoroutineBody.§ -> R //receiver是Coroutine<P, R>.CoroutineBody 内部类
): Continuation {
companion object {
fun <P, R> create(
context: CoroutineContext = EmptyCoroutineContext,
block: suspend Coroutine<P, R>.CoroutineBody.§ -> R
): Coroutine<P, R> {
return Coroutine(context, block)
}
}
//内部类,保证yield()方法不能在外部调用 只能在lambda当中调用
inner class CoroutineBody {
var parameter: P? = null
suspend fun yield(value: R): P = suspendCoroutine { continuation ->
val previousStatus = status.getAndUpdate {
when(it) {
is Status.Created -> throw IllegalStateException(“Never started!”)
is Status.Yielded<*> -> throw IllegalStateException(“Already yielded!”)
is Status.Resumed<*> -> Status.Yielded(continuation)
Status.Dead -> throw IllegalStateException(“Already dead!”)
}
}
(previousStatus as? Status.Resumed)?.continuation?.resume(value)
}
}
private val body = CoroutineBody()
private val status: AtomicReference
val isActive: Boolean
get() = status.get() != Status.Dead
init {
val coroutineBlock: suspend CoroutineBody.() -> R = { block(parameter!!) }
val start = coroutineBlock.createCoroutine(body, this)
status = AtomicReference(Status.Created(start))
}
override fun resumeWith(result: Result) {
val previousStatus = status.getAndUpdate {
when(it) {
is Status.Created -> throw IllegalStateException(“Never started!”)
is Status.Yielded<*> -> throw IllegalStateException(“Already yielded!”)
is Status.Resumed<*> -> {
Status.Dead
}
Status.Dead -> throw IllegalStateException(“Already dead!”)
}
}
(previousStatus as? Status.Resumed)?.continuation?.resumeWith(result)
}
suspend fun resume(value: P): R = suspendCoroutine { continuation ->
val previousStatus = status.getAndUpdate {
when(it) {
is Status.Created -> {
body.parameter = value
Status.Resumed(continuation)
}
is Status.Yielded<*> -> {
Status.Resumed(continuation)
}
is Status.Resumed<*> -> throw IllegalStateException(“Already resumed!”)
Status.Dead -> throw IllegalStateException(“Already dead!”)
}
}
when(previousStatus){
is Status.Created -> {
previousStatus.continuation.resume(Unit)
}
is Status.Yielded<*> -> {
(previousStatus as Status.Yielded
).continuation.resume(value)
}
}
}
suspend fun SymCoroutine.yield(value: R): P {
return body.yield(value)
}
}
class Dispatcher: ContinuationInterceptor {
override val key = ContinuationInterceptor
private val executor = Executors.newSingleThreadExecutor()
//拦截器方法
override fun interceptContinuation(continuation: Continuation): Continuation {
return DispatcherContinuation(continuation, executor)
}
}
class DispatcherContinuation(val continuation: Continuation, val executor: Executor): Continuation by continuation { //接口代理
//切换线程 再resumeWith
override fun resumeWith(result: Result) {
executor.execute {
continuation.resumeWith(result)
}
}
}
suspend fun main() {
//生产者 接收Unit,返回Int类型
val producer = Coroutine.create<Unit, Int>(Dispatcher()) {
log(“producer start”)
for(i in 0…3){
log(“send”, i)
yield(i)
}
200
}
//消费者 接收Int类型,返回Unit
val consumer = Coroutine.create<Int, Unit>(Dispatcher()) { param: Int ->
log(“consumer start”, param)
for(i in 0…3){
val value = yield(Unit)
log(“receive”, value)
}
}
while (producer.isActive && consumer.isActive){
val result = producer.resume(Unit) //生产
consumer.resume(result) //消费
}
}
輸出log:
实例:模仿实现JS中的async/await
语法
import android.os.Handler
import android.os.Looper
import com.bennyhuo.kotlin.coroutinebasics.api.githubApi
import com.bennyhuo.kotlin.coroutinebasics.common.Dispatcher
import com.bennyhuo.kotlin.coroutinebasics.common.DispatcherContext
import com.bennyhuo.kotlin.coroutinebasics.utils.log
import retrofit2.Call
import retrofit2.Callback
import retrofit2.HttpException
import retrofit2.Response
import kotlin.coroutines.*
interface AsyncScope
suspend fun AsyncScope.await(block: () -> Call) = suspendCoroutine {
continuation ->
val call = block()
//okhttp的回调异步处理 分别调用continuation.resume处理
call.enqueue(object : Callback{
override fun onFailure(call: Call, t: Throwable) {
continuation.resumeWithException(t)
}
override fun onResponse(call: Call, response: Response) {
if(response.isSuccessful){
response.body()?.let(continuation::resume) ?: continuation.resumeWithException(NullPointerException())
} else {
continuation.resumeWithException(HttpException(response))
}
}
})
}
fun async(context: CoroutineContext = EmptyCoroutineContext, block: suspend AsyncScope.() -> Unit) {
val completion = AsyncCoroutine(context)
block.startCoroutine(completion, completion)
}
class AsyncCoroutine(override val context: CoroutineContext = EmptyCoroutineContext): Continuation, AsyncScope {
override fun resumeWith(result: Result) {
result.getOrThrow()
}
}
fun main() {
Looper.prepare()
val handlerDispatcher = DispatcherContext(object : Dispatcher {
val handler = Handler()
override fun dispatch(block: () -> Unit) {
handler.post(block) //线程切换,block会切换到handler所在的线程执行,就是main所在的线程
}
})
async(handlerDispatcher) {
val user = await { githubApi.getUserCallback(“bennyhuo”) }
log(user)
}
Looper.loop()
}
其中Handler和Looper是作者移植的android的代码。
输出:
可挂起的main函数:
suspend fun main() {
…
}
编译器会为suspend的main函数生成下面等价的代码:
fun main(continuation: Continuation): Any? {
return println(“Hello”)
}
实际上,源码中是在main()函数中调用了runSuspend
方法,runSuspend
当中创建并启动了一个协程体,只不过我看不到这个代码。
fun main() {
runSuspend(::main1 as suspend () -> Unit)
}
/**
- Wrapper for
suspend fun main
and@Test suspend fun testXXX
functions.
*/
@SinceKotlin(“1.3”)
internal fun runSuspend(block: suspend () -> Unit) {
val run = RunSuspend()
block.startCoroutine(run)
run.await()
}
private class RunSuspend : Continuation {
override val context: CoroutineContext
get() = EmptyCoroutineContext
//-Xallow-result-return-type
var result: Result? = null
override fun resumeWith(result: Result) = synchronized(this) {
this.result = result
@Suppress(“PLATFORM_CLASS_MAPPED_TO_KOTLIN”) (this as Object).notifyAll()
}
fun await() = synchronized(this) {
while (true) {
when (val result = this.result) {
null -> @Suppress(“PLATFORM_CLASS_MAPPED_TO_KOTLIN”) (this as Object).wait()
else -> {
result.getOrThrow() // throw up failure