0
点赞
收藏
分享

微信扫一扫

Kotlin学习手记——协程初步

素锦时年_1b00 2022-04-29 阅读 115

//yield是一个挂起函数 覆写GeneratorScope类的方法

override suspend fun yield(value: T) = suspendCoroutine {

continuation ->

println(“yield======== s t a t e . j a v a C l a s s . s i m p l e N a m e v a l u e = {state.javaClass.simpleName} value= state.javaClass.simpleNamevalue={value}”)

state = when(state) {

is State.NotReady -> State.Ready(continuation, value) //调用yield(xx)方法使状态进入Ready状态

is State.Ready<*> -> throw IllegalStateException(“Cannot yield a value while ready.”)

State.Done -> throw IllegalStateException(“Cannot yield a value while done.”)

}

//这里continuation没有直接调用resume方法,在后面用户调用hasNext()或next()时调用resume()

}

private fun resume() {

println(“resume()====================”)

//val currentState = state之后调用.continuation会自动类型转换

when(val currentState = state) {

is State.NotReady -> {

println(“resume()====================when NotReady”)

currentState.continuation.resume(Unit) // NotReady时调用Continuation的resume方法恢复挂起点继续执行

}

}

}

override fun hasNext(): Boolean {

println(“hasNext()====================”)

resume()

return state != State.Done

}

//next方法返回yield存入的值

override fun next(): T {

println(“next()========${state.javaClass.simpleName}”)

return when(val currentState = state) {

is State.NotReady -> {

resume()

return next() //NotReady时调用下次的next

}

is State.Ready<*> -> {

state = State.NotReady(currentState.continuation) //state状态流转

println(“next()====return value”)

(currentState as State.Ready).nextValue //Ready时才取值返回

}

State.Done -> throw IndexOutOfBoundsException(“No value left.”) //Done状态调用next()抛异常

}

}

//协程体执行完毕

override fun resumeWith(result: Result<Any?>) {

println(“resumeWith====================”)

state = State.Done

result.getOrThrow()

}

}

//这个是定义一个receiver类,保证yield()方法只能在lambda表达式的大括号内使用

abstract class GeneratorScope internal constructor(){

protected abstract val parameter: T

abstract suspend fun yield(value: T)

}

//返回值具有迭代器功能

fun generator(block: suspend GeneratorScope.(T) -> Unit): (T) -> Generator {

return { parameter: T ->

println(“parameter = $parameter”) // parameter = 10 这个是generator接收的start函数,nums(10)

GeneratorImpl(block, parameter)

}

}

fun main() {

val nums = generator { start: Int ->

for (i in 0…5) {

yield(start + i) //yield会挂起函数调用处

}

}

val seq = nums(10)

//println(seq.iterator().next())

for (j in seq) {

println(j)

}

//kotlin官方提供的sequence序列中有yield方法

val sequence = sequence {

yield(1)

yield(2)

yield(3)

yield(4)

yieldAll(listOf(1,2,3,4))

}

for(xx in sequence){

println(xx)

}

}

第一次看这个例子还是比较绕比较难理解的,我添加了log输出,打印执行的顺序:

![在这里插入图片描述](https://img-blog.csdnimg.cn/20210121211607164.png?x-oss-process=image/watermark,ty 《Android学习笔记总结+最新移动架构视频+大厂安卓面试真题+项目实战源码讲义》无偿开源 徽信搜索公众号【编程进阶路】 pe_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2x5YWJjMTIzNDU2,size_16,color_FFFFFF,t_70)

从log输出的顺序可以看出是每次for循环会先调用hasNext()方法,hasNext()中会调用resume()方法,第一次调用resume()相当于启动协程,这时协程会执行到yield调用处的代码yield(start + i)这行并挂起(实际上是一个lambda表达式,它是一个Continuation的实现类SuspendLambad的包装),那yield方法中就会向state中存入值,并同时保存当前的Continuation对象,然后流传状态变化,变成ready, 紧接着for循环里取值操作会调用next方法,然后在下一次的for循环中又会调用resume()方法,这时就是恢复前面一次挂起函数调用处的代码继续执行,也就是执行下一次for循环里yield(start + i)会继续放下一个值,又挂起,next()方法又取值,hasNext()方法又resume()恢复, 继续执行yeild…循环往复。

如果不调用for循环打印,直接调用next获取值呢

fun main() {

val nums = generator { start: Int ->

for (i in 0…5) {

yield(start + i) //yield会挂起函数调用处

}

}

val seq = nums(10)

println(seq.iterator().next())

}

在这里插入图片描述

这时next方法中也会先resume, 相当于启动协程,协程体里包含的lambda表达式开始执行,yield方法存值并挂起,next()方法取值,但这时好像没有调用resumeWith方法。。但还是能正常执行完毕。

这个例子比较绕的一点就是协程体中的代码并不会立即被执行,也就是下面的代码:

fun main() {

val nums = generator { start: Int ->

for (i in 0…5) {

yield(start + i) //yield会挂起函数调用处

}

}

val seq = nums(10) //这样并不会执行上面的for循环里面的代码

// for (j in seq) {

// println(j)

// }

你会发现把下面调用的for循环代码注释掉,上面的lambda表达式里面的代码并不会执行,不信看log输出:

在这里插入图片描述

只输出了一个parameter=10,yield方法里面的log一个也没有打印。很神奇,就是说只有调用获取值的时候,才会执行yield方法给你发送值,调用一次发送一次,kotlin官方提供的sequence序列大括号中的yield方法调用也是如此,sequence本身可以当做迭代器使用。

实例:仿 Lua 协程实现非对称协程 API

sealed class Status {

class Created(val continuation: Continuation): Status()

class Yielded

(val continuation: Continuation

): Status()

class Resumed(val continuation: Continuation): Status()

object Dead: Status()

}

class Coroutine<P, R> (

override val context: CoroutineContext = EmptyCoroutineContext,

private val block: suspend Coroutine<P, R>.CoroutineBody.§ -> R //receiver是Coroutine<P, R>.CoroutineBody 内部类

): Continuation {

companion object {

fun <P, R> create(

context: CoroutineContext = EmptyCoroutineContext,

block: suspend Coroutine<P, R>.CoroutineBody.§ -> R

): Coroutine<P, R> {

return Coroutine(context, block)

}

}

//内部类,保证yield()方法不能在外部调用 只能在lambda当中调用

inner class CoroutineBody {

var parameter: P? = null

suspend fun yield(value: R): P = suspendCoroutine { continuation ->

val previousStatus = status.getAndUpdate {

when(it) {

is Status.Created -> throw IllegalStateException(“Never started!”)

is Status.Yielded<*> -> throw IllegalStateException(“Already yielded!”)

is Status.Resumed<*> -> Status.Yielded(continuation)

Status.Dead -> throw IllegalStateException(“Already dead!”)

}

}

(previousStatus as? Status.Resumed)?.continuation?.resume(value)

}

}

private val body = CoroutineBody()

private val status: AtomicReference

val isActive: Boolean

get() = status.get() != Status.Dead

init {

val coroutineBlock: suspend CoroutineBody.() -> R = { block(parameter!!) }

val start = coroutineBlock.createCoroutine(body, this)

status = AtomicReference(Status.Created(start))

}

override fun resumeWith(result: Result) {

val previousStatus = status.getAndUpdate {

when(it) {

is Status.Created -> throw IllegalStateException(“Never started!”)

is Status.Yielded<*> -> throw IllegalStateException(“Already yielded!”)

is Status.Resumed<*> -> {

Status.Dead

}

Status.Dead -> throw IllegalStateException(“Already dead!”)

}

}

(previousStatus as? Status.Resumed)?.continuation?.resumeWith(result)

}

suspend fun resume(value: P): R = suspendCoroutine { continuation ->

val previousStatus = status.getAndUpdate {

when(it) {

is Status.Created -> {

body.parameter = value

Status.Resumed(continuation)

}

is Status.Yielded<*> -> {

Status.Resumed(continuation)

}

is Status.Resumed<*> -> throw IllegalStateException(“Already resumed!”)

Status.Dead -> throw IllegalStateException(“Already dead!”)

}

}

when(previousStatus){

is Status.Created -> {

previousStatus.continuation.resume(Unit)

}

is Status.Yielded<*> -> {

(previousStatus as Status.Yielded

).continuation.resume(value)

}

}

}

suspend fun SymCoroutine.yield(value: R): P {

return body.yield(value)

}

}

class Dispatcher: ContinuationInterceptor {

override val key = ContinuationInterceptor

private val executor = Executors.newSingleThreadExecutor()

//拦截器方法

override fun interceptContinuation(continuation: Continuation): Continuation {

return DispatcherContinuation(continuation, executor)

}

}

class DispatcherContinuation(val continuation: Continuation, val executor: Executor): Continuation by continuation { //接口代理

//切换线程 再resumeWith

override fun resumeWith(result: Result) {

executor.execute {

continuation.resumeWith(result)

}

}

}

suspend fun main() {

//生产者 接收Unit,返回Int类型

val producer = Coroutine.create<Unit, Int>(Dispatcher()) {

log(“producer start”)

for(i in 0…3){

log(“send”, i)

yield(i)

}

200

}

//消费者 接收Int类型,返回Unit

val consumer = Coroutine.create<Int, Unit>(Dispatcher()) { param: Int ->

log(“consumer start”, param)

for(i in 0…3){

val value = yield(Unit)

log(“receive”, value)

}

}

while (producer.isActive && consumer.isActive){

val result = producer.resume(Unit) //生产

consumer.resume(result) //消费

}

}

輸出log:

在这里插入图片描述

实例:模仿实现JS中的async/await语法

import android.os.Handler

import android.os.Looper

import com.bennyhuo.kotlin.coroutinebasics.api.githubApi

import com.bennyhuo.kotlin.coroutinebasics.common.Dispatcher

import com.bennyhuo.kotlin.coroutinebasics.common.DispatcherContext

import com.bennyhuo.kotlin.coroutinebasics.utils.log

import retrofit2.Call

import retrofit2.Callback

import retrofit2.HttpException

import retrofit2.Response

import kotlin.coroutines.*

interface AsyncScope

suspend fun AsyncScope.await(block: () -> Call) = suspendCoroutine {

continuation ->

val call = block()

//okhttp的回调异步处理 分别调用continuation.resume处理

call.enqueue(object : Callback{

override fun onFailure(call: Call, t: Throwable) {

continuation.resumeWithException(t)

}

override fun onResponse(call: Call, response: Response) {

if(response.isSuccessful){

response.body()?.let(continuation::resume) ?: continuation.resumeWithException(NullPointerException())

} else {

continuation.resumeWithException(HttpException(response))

}

}

})

}

fun async(context: CoroutineContext = EmptyCoroutineContext, block: suspend AsyncScope.() -> Unit) {

val completion = AsyncCoroutine(context)

block.startCoroutine(completion, completion)

}

class AsyncCoroutine(override val context: CoroutineContext = EmptyCoroutineContext): Continuation, AsyncScope {

override fun resumeWith(result: Result) {

result.getOrThrow()

}

}

fun main() {

Looper.prepare()

val handlerDispatcher = DispatcherContext(object : Dispatcher {

val handler = Handler()

override fun dispatch(block: () -> Unit) {

handler.post(block) //线程切换,block会切换到handler所在的线程执行,就是main所在的线程

}

})

async(handlerDispatcher) {

val user = await { githubApi.getUserCallback(“bennyhuo”) }

log(user)

}

Looper.loop()

}

其中Handler和Looper是作者移植的android的代码。

输出:

在这里插入图片描述

可挂起的main函数:

suspend fun main() {

}

编译器会为suspend的main函数生成下面等价的代码:

fun main(continuation: Continuation): Any? {

return println(“Hello”)

}

实际上,源码中是在main()函数中调用了runSuspend方法,runSuspend当中创建并启动了一个协程体,只不过我看不到这个代码。

fun main() {

runSuspend(::main1 as suspend () -> Unit)

}

/**

  • Wrapper for suspend fun main and @Test suspend fun testXXX functions.

*/

@SinceKotlin(“1.3”)

internal fun runSuspend(block: suspend () -> Unit) {

val run = RunSuspend()

block.startCoroutine(run)

run.await()

}

private class RunSuspend : Continuation {

override val context: CoroutineContext

get() = EmptyCoroutineContext

//-Xallow-result-return-type

var result: Result? = null

override fun resumeWith(result: Result) = synchronized(this) {

this.result = result

@Suppress(“PLATFORM_CLASS_MAPPED_TO_KOTLIN”) (this as Object).notifyAll()

}

fun await() = synchronized(this) {

while (true) {

when (val result = this.result) {

null -> @Suppress(“PLATFORM_CLASS_MAPPED_TO_KOTLIN”) (this as Object).wait()

else -> {

result.getOrThrow() // throw up failure

举报

相关推荐

0 条评论