0
点赞
收藏
分享

微信扫一扫

Coroutine 协程


安卓上协程需要的依赖

# build.gradle
kotlin {
    experimental {
        coroutines 'enable'
    }
}

    // Coroutines
    implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.2'
    implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.2'
    implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-test:1.3.2'

作用域

  • GlobalScope
  • CoroutineScope
  • MainScope

CoroutineContext

  • Default
  • Main
  • Unconfined
  • IO

ViewModel中的Scope作用域

private const val JOB_KEY = "androidx.lifecycle.ViewModelCoroutineScope.JOB_KEY"

/**
 * [CoroutineScope] tied to this [ViewModel].
 * This scope will be canceled when ViewModel will be cleared, i.e [ViewModel.onCleared] is called
 *
 * This scope is bound to
 * [Dispatchers.Main.immediate][kotlinx.coroutines.MainCoroutineDispatcher.immediate]
 */
public val ViewModel.viewModelScope: CoroutineScope
    get() {
        val scope: CoroutineScope? = this.getTag(JOB_KEY)
        if (scope != null) {
            return scope
        }
        return setTagIfAbsent(
            JOB_KEY,
            CloseableCoroutineScope(SupervisorJob() + Dispatchers.Main.immediate)
        )
    }

internal class CloseableCoroutineScope(context: CoroutineContext) : Closeable, CoroutineScope {
    override val coroutineContext: CoroutineContext = context

    override fun close() {
        coroutineContext.cancel()
    }
}

一些验证

//test()

// foreach(0) //launch 会复用线程池
 //test4()
 // test5()
 //test6()
 //test7() // 优先主线, 协程才做
 //test8() //创建多个协程, 两个 async 交叉运行
 // test9() // coroutineScope会一个新的作用域
 // test10()
 // test11()
 //test12()//launch中的this只是当前launch
 //test13()
 //test14() //验证cancelchild
 //test15() // 先取消上一个任务,在进行下一个任务
 //test16()
 test17()

fun test17() {
        // 任务一个一个排队执行
//        val controlledRunner = ControlledRunner<String>() // 放到公共区
        val controlledRunner = ControlledRunner<String>() // 排队

        MainScope().launch {
            var num: Int = 0
            repeat(10) {
                num++
                controlledRunner.joinPreviousOrRun {
                    for (i in 0..10) {
                        log("$num controlledRunner.cancelPreviousThenRun  $i")
                        delay(300)
                    }
                    ""
                }
                delay(600)
            }
        }
    }

    fun test16() {
        // 任务一个一个排队执行
//        val controlledRunner = ControlledRunner<String>() // 放到公共区
        val singleRunner = SingleRunner() // 排队

        MainScope().launch {
            repeat(10) {
                singleRunner.afterPrevious {
                    for (i in 0..10) {
                        log("controlledRunner.cancelPreviousThenRun  $i")
                        delay(300)
                    }
                }
            }
        }
    }


    fun test15() {
        // 下一个任务, 先取消前一个任务
        val controlledRunner = ControlledRunner<String>() // 放到公共区

        MainScope().launch {
            repeat(10) {
                controlledRunner.cancelPreviousThenRun {
                    for (i in 0..1000) {
                        log("controlledRunner.cancelPreviousThenRun  $i")
                        delay(300)
                    }
                    ""
                }
            }
        }
    }


    fun test14() { // 验证cancelchild
        vm.viewModelScope.launch {
            val child1 = launch {
                for (i in 0..1000) {
                    log("launch $i")
                    delay(800)
                }
            }
            val child2 = async {
                for (i in 0..1000) {
                    log("async $i")
                    delay(600)
                }
            }


            for (i in 0..100000) {
                delay(200)
                log("viewModelScope $i")
                if (i == 10) {
                    this.coroutineContext.job.cancelChildren()
//                    child1.cancel()
//                    child2.cancel()
                    break
                }
            }

        }.invokeOnCompletion {
            log("invokeOnCompletion $it")
        }
    }

    fun test13() { // 验证取消job,子协程是否能取消
        // 结论:子协程也被取消
        vm.viewModelScope.launch {
            launch {
                for (i in 0..1000) {
                    log("launch $i")
                    delay(800)
                }
            }.invokeOnCompletion {
                log("launch $it")
            }
            async {
                for (i in 0..1000) {
                    log("async $i")
                    delay(600)
                }
            }

            for (i in 0..1000) {
                log("viewModelScope $i")
                delay(600)
                if (i == 10) this.cancel()
            }
        }.invokeOnCompletion {
            log("invokeOnCompletion $it")
        }
    }

    fun test12() { // 取消正在运行的协程
        // launch中的this只是当前launch
        val job: Job = vm.viewModelScope.launch {
            for (i in 0..1000) {
                log("viewModelScope $i")
                delay(500)
                if (i == 10) this.cancel()
            }
        }
        vm.viewModelScope.launch {
            for (i in 0..1000) {
                log("****  $i")
                delay(800)
            }
        }
    }

    fun test11() {
        vm.viewModelScope.launch {
            for (i in 0..10) {
                delay(6)
                log("viewModelScope ")
            }

        }
        for (i in 0..10) {
            log("test11 $i")
            Thread.sleep(6000)
        }
    }


    fun test10() { //验证在corouteinScope中,任意一个子失败,就会停止
        log("test10 start")
        MainScope().launch(CoroutineExceptionHandler { c: CoroutineContext, t: Throwable ->
//            log("$t")
            log("$c")
        }) {
            log("MainScope start")
            coroutineScope {
                launch {
                    log("luanch start")
                    delay(2000)
                    throw IllegalArgumentException("")
                }
                async {
                    log("async start")
                    for (i in 0..100) {
                        delay(500)
                        log("async 001 - $i")
                    }
                    log("async end")
                }
            }
            log("MainScope end")
        }
        log("test10 end")
    }


    fun test9() {
        MainScope().launch {
            coroutineScope {
                launch {
                    for (i in 0..100) {
                        delay(1000)
                        log("launch 001 - $i")
                    }
                }
                async {
                    for (i in 0..100) {
                        delay(500)
                        log("async 001 - $i")
                    }
                }
            }
        }
    }

    fun test8() {
        // 揭露, async两个交叉运行
        log("test8 start")
        CoroutineScope(Dispatchers.Main).launch { // 作用域
            log("CoroutineScope start")
            async {
                for (i in 0..100) {
                    delay(1000)
                    log("async 001 - $i")
                }
            }
            async {
                for (i in 0..100) {
                    delay(500)
                    log("async 002 - $i")
                }
            }
//            a1.await()
            log("CoroutineScope end")
        }
        log("test8 end")
    }


    fun test7() { // 测试异步会不会影响协程
        for (i in 0..50) {
            log("$i start -> ")
            Thread.sleep(100)
            if (i == 26) Thread.sleep(5000)
        }
        for (i in 0..100) {
            CoroutineScope(Dispatchers.Main).launch {
                delay((200 * i).toLong())
                log("$i")
            }
        }

        Thread {
            Thread.sleep(5000) //休眠5秒
            runOnUiThread {
                // 干别的事情会不会被影响到
                for (i in 0..50) {
                    log("$i end -> ")
                    Thread.sleep(100)
                    if (i == 26) Thread.sleep(5000)
                }
            }
        }.start()


    }

    fun test6() { // 测试在main中创建多个协程
        // 结论,协程不会阻塞主线程的工作
        // 干别的事情会不会被影响到
        for (i in 0..50) {
            log("$i start -> ")
            Thread.sleep(100)
            if (i == 26) Thread.sleep(5000)
        }
        for (i in 0..100) {
            CoroutineScope(Dispatchers.Main).launch {
                delay((200 * i).toLong())
                log("$i")
            }
        }

        // 干别的事情会不会被影响到
        for (i in 0..50) {
            log("$i end -> ")
            Thread.sleep(100)
            if (i == 26) Thread.sleep(5000)
        }
    }

    fun test5() { // 测试在main中创建多个协程
        for (i in 0..100) {
            CoroutineScope(Dispatchers.Main).launch {
                log("$i")
                delay(800)
            }
        }

        // 干别的事情会不会被影响到
        for (i in 0..100) {
            log("$i other -> ")
            Thread.sleep(200)
        }
    }

    fun test4() {
        // 结论: Main中是在main里面
        // 其他: 单独创建单独的线程
        runBlocking {
            log("runBlocking")
        }
        GlobalScope.launch {
            log("GlobalScope.launch")
        }
        val launch: Job = CoroutineScope(SupervisorJob()).launch {
            log("CoroutineScope(SupervisorJob()).launch")
        }
        CoroutineScope(Job()).launch {
            log("CoroutineScope(Job()).launch")
        }
        CoroutineScope(Dispatchers.IO).launch {
            log("CoroutineScope(Dispatchers.IO).launch")
        }
        CoroutineScope(Dispatchers.Main).launch {
            log("CoroutineScope(Dispatchers.Main).launch")
        }
        CoroutineScope(Dispatchers.Default).launch {
            log("CoroutineScope(Dispatchers.Default).launch")
        }
        vm.viewModelScope.launch {
            log("vm.viewModelScope.launch")
        }
        vm.viewModelScope.launch(Dispatchers.IO) {
            log("vm.viewModelScope.launch(Dispatchers.IO)")
        }
    }

    val vm: VM by viewModels()
    fun log(msg: String) {
        Log.e("SCOPE", "$msg - ${Thread.currentThread().name} \t ${System.currentTimeMillis()}")
    }

    val map = hashMapOf<String, Int>()
    fun HashMap<String, Int>.put2(key: String) {
        if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N) {
            val num = map.getOrDefault(key, 0) + 1
            map[key] = num
        }
    }

    fun HashMap<String, Int>.pritln() {
        val currentTimeMillis = System.currentTimeMillis()
        map.forEach { (key, num) ->
            log("$currentTimeMillis \t\t $num \t $key")
        }
    }

    fun foreach(i: Int) {
        // 验证 : launch也是会复用线程池
        if (i > 10) return
        GlobalScope.launch {
            foreach(i + 1)
            map.put2(Thread.currentThread().name)
            log("$i GlobalScope")
            delay(800)

            map.pritln()
        }
        // 统计个数
    }

    fun test() {
        log("test()")
        GlobalScope.launch {
            log("1. GlobalScope")
            GlobalScope.launch {
                log("2. GlobalScope")
                GlobalScope.launch {
                    log("3. GlobalScope")
                    GlobalScope.launch {
                        log("4. GlobalScope")
                        GlobalScope.launch {
                            log("5. GlobalScope")
                            GlobalScope.launch {
                                log("6. GlobalScope")
                            }
                        }
                    }
                }
            }
            withContext(Dispatchers.Main) {
                log("withContext Main")
            }
            withContext(Dispatchers.IO) {
                log("withContext IO")
            }
        }
        vm.viewModelScope.launch {
            log("viewModelScope")
        }
    }
}

class VM : ViewModel()

ConcurrencyHelpers.kt

package com.joyy.androidproject

import kotlinx.coroutines.CoroutineStart.LAZY
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.yield
import java.util.concurrent.atomic.AtomicReference
import kotlin.DeprecationLevel.ERROR

/**
 * A helper class to execute tasks sequentially in coroutines.
 *
 * Calling [afterPrevious] will always ensure that all previously requested work completes prior to
 * calling the block passed. Any future calls to [afterPrevious] while the current block is running
 * will wait for the current block to complete before starting.
 */
class SingleRunner {
    /**
     * A coroutine mutex implements a lock that may only be taken by one coroutine at a time.
     */
    private val mutex = Mutex()

    /**
     * Ensure that the block will only be executed after all previous work has completed.
     *
     * When several coroutines call afterPrevious at the same time, they will queue up in the order
     * that they call afterPrevious. Then, one coroutine will enter the block at a time.
     *
     * In the following example, only one save operation (user or song) will be executing at a time.
     *
     * ```
     * class UserAndSongSaver {
     *    val singleRunner = SingleRunner()
     *
     *    fun saveUser(user: User) {
     *        singleRunner.afterPrevious { api.post(user) }
     *    }
     *
     *    fun saveSong(song: Song) {
     *        singleRunner.afterPrevious { api.post(song) }
     *    }
     * }
     * ```
     *
     * @param block the code to run after previous work is complete.
     */
    suspend fun <T> afterPrevious(block: suspend () -> T): T {
        // Before running the block, ensure that no other blocks are running by taking a lock on the
        // mutex.

        // The mutex will be released automatically when we return.

        // If any other block were already running when we get here, it will wait for it to complete
        // before entering the `withLock` block.
        mutex.withLock {
            return block()
        }
    }
}

/**
 * A controlled runner decides what to do when new tasks are run.
 *
 * Note: This implementation is for example only. It will not work in the presence of
 *       multi-threading and is not safe to call from Dispatchers.IO or Dispatchers.Default. In
 *       real code use the thread-safe implementation of [ControlledRunner] code listed below.
 *
 * By calling [joinPreviousOrRun], the new task will be discarded and the result of the previous task
 * will be returned. This is useful when you want to ensure that a network request to the same
 * resource does not flood.
 *
 * By calling [cancelPreviousThenRun], the old task will *always* be cancelled and then the new task will
 * be run. This is useful in situations where a new event implies that the previous work is no
 * longer relevant such as sorting or filtering a list.
 */
@Deprecated(
    "This code is not thread-safe and should not be used. Use " +
            "the ControlledRunner implementation below instead.", level = ERROR
)
class ControlledRunnerExampleImplementation<T> {
    private var activeTask: Deferred<T>? = null

    /**
     * Cancel all previous tasks before calling block.
     *
     * When several coroutines call cancelPreviousThenRun at the same time, only one will run and
     * the others will be cancelled.
     */
    @Deprecated(
        "This code is not thread-safe. Use ControlledRunner below instead.",
        level = ERROR
    )
    suspend fun cancelPreviousThenRun(block: suspend () -> T): T {
        // If there is an activeTask, cancel it because it's result is no longer needed
        //
        // By waiting for the cancellation to complete with `cancelAndJoin` we know that activeTask
        // has stopped executing before continuing.
        activeTask?.cancelAndJoin()

        // use a coroutineScope builder to safely start a new coroutine in a suspend function
        return coroutineScope {
            // create a new task to call the block
            val newTask = async {
                block()
            }
            // when the new task completes, reset activeTask to null
            // this will be called by cancellation as well as normal completion
            newTask.invokeOnCompletion {
                activeTask = null
            }
            // save the newTask to activeTask, then wait for it to complete and return the result
            activeTask = newTask
            newTask.await()
        }
    }

    /**
     * Don't run the new block if a previous block is running, instead wait for the previous block
     * and return it's result.
     *
     * When several coroutines call joinPreviousOrRun at the same time, only one will run and
     * the others will return the result from the winner.
     */
    @Deprecated(
        "This code is not thread-safe. Use ControlledRunner below instead.",
        level = ERROR
    )
    suspend fun joinPreviousOrRun(block: suspend () -> T): T {
        // if there is an activeTask, return it's result and don't run the block
        activeTask?.let {
            return it.await()
        }

        // use a coroutineScope builder to safely start a new coroutine in a suspend function
        return coroutineScope {
            // create a new task to call the block
            val newTask = async {
                block()
            }
            // when the task completes, reset activeTask to null
            newTask.invokeOnCompletion {
                activeTask = null
            }
            // save newTask to activeTask, then wait for it to complete and return the result
            activeTask = newTask
            newTask.await()
        }
    }
}

/**
 * A controlled runner decides what to do when new tasks are run.
 *
 * By calling [joinPreviousOrRun], the new task will be discarded and the result of the previous task
 * will be returned. This is useful when you want to ensure that a network request to the same
 * resource does not flood.
 *
 * By calling [cancelPreviousThenRun], the old task will *always* be cancelled and then the new task will
 * be run. This is useful in situations where a new event implies that the previous work is no
 * longer relevant such as sorting or filtering a list.
 */
class ControlledRunner<T> {
    /**
     * The currently active task.
     *
     * This uses an atomic reference to ensure that it's safe to update activeTask on both
     * Dispatchers.Default and Dispatchers.Main which will execute coroutines on multiple threads at
     * the same time.
     */
    private val activeTask = AtomicReference<Deferred<T>?>(null)

    /**
     * Cancel all previous tasks before calling block.
     *
     * When several coroutines call cancelPreviousThenRun at the same time, only one will run and
     * the others will be cancelled.
     *
     * In the following example, only one sort operation will execute and any previous sorts will be
     * cancelled.
     *
     * ```
     * class Products {
     *    val controlledRunner = ControlledRunner<Product>()
     *
     *    fun sortAscending(): List<Product> {
     *        return controlledRunner.cancelPreviousThenRun { dao.loadSortedAscending() }
     *    }
     *
     *    fun sortDescending(): List<Product> {
     *        return controlledRunner.cancelPreviousThenRun { dao.loadSortedDescending() }
     *    }
     * }
     * ```
     *
     * @param block the code to run after previous work is cancelled.
     * @return the result of block, if this call was not cancelled prior to returning.
     */
    suspend fun cancelPreviousThenRun(block: suspend () -> T): T {
        // fast path: if we already know about an active task, just cancel it right away.
        activeTask.get()?.cancelAndJoin()

        return coroutineScope {
            // Create a new coroutine, but don't start it until it's decided that this block should
            // execute. In the code below, calling await() on newTask will cause this coroutine to
            // start.
            val newTask = async(start = LAZY) {
                block()
            }

            // When newTask completes, ensure that it resets activeTask to null (if it was the
            // current activeTask).
            newTask.invokeOnCompletion {
                activeTask.compareAndSet(newTask, null)
            }

            // Kotlin ensures that we only set result once since it's a val, even though it's set
            // inside the while(true) loop.
            val result: T

            // Loop until we are sure that newTask is ready to execute (all previous tasks are
            // cancelled)
            while (true) {
                if (!activeTask.compareAndSet(null, newTask)) {
                    // some other task started before newTask got set to activeTask, so see if it's
                    // still running when we call get() here. If so, we can cancel it.

                    // we will always start the loop again to see if we can set activeTask before
                    // starting newTask.
                    activeTask.get()?.cancelAndJoin()
                    // yield here to avoid a possible tight loop on a single threaded dispatcher
                    yield()
                } else {
                    // happy path - we set activeTask so we are ready to run newTask
                    result = newTask.await()
                    break
                }
            }

            // Kotlin ensures that the above loop always sets result exactly once, so we can return
            // it here!
            result
        }
    }

    /**
     * Don't run the new block if a previous block is running, instead wait for the previous block
     * and return it's result.
     *
     * When several coroutines call jonPreviousOrRun at the same time, only one will run and
     * the others will return the result from the winner.
     *
     * In the following example, only one network operation will execute at a time and any other
     * requests will return the result from the "in flight" request.
     *
     * ```
     * class Products {
     *    val controlledRunner = ControlledRunner<Product>()
     *
     *    fun fetchProducts(): List<Product> {
     *        return controlledRunner.joinPreviousOrRun {
     *            val results = api.fetchProducts()
     *            dao.insert(results)
     *            results
     *        }
     *    }
     * }
     * ```
     *
     * @param block the code to run if and only if no other task is currently running
     * @return the result of block, or if another task was running the result of that task instead.
     */
    suspend fun joinPreviousOrRun(block: suspend () -> T): T {
        // fast path: if there's already an active task, just wait for it and return the result
        activeTask.get()?.let {
            return it.await()
        }
        return coroutineScope {
            // Create a new coroutine, but don't start it until it's decided that this block should
            // execute. In the code below, calling await() on newTask will cause this coroutine to
            // start.
            val newTask = async(start = LAZY) {
                block()
            }

            newTask.invokeOnCompletion {
                activeTask.compareAndSet(newTask, null)
            }

            // Kotlin ensures that we only set result once since it's a val, even though it's set
            // inside the while(true) loop.
            val result: T

            // Loop until we figure out if we need to run newTask, or if there is a task that's
            // already running we can join.
            while (true) {
                if (!activeTask.compareAndSet(null, newTask)) {
                    // some other task started before newTask got set to activeTask, so see if it's
                    // still running when we call get() here. There is a chance that it's already
                    // been completed before the call to get, in which case we need to start the
                    // loop over and try again.
                    val currentTask = activeTask.get()
                    if (currentTask != null) {
                        // happy path - we found the other task so use that one instead of newTask
                        newTask.cancel()
                        result = currentTask.await()
                        break
                    } else {
                        // retry path - the other task completed before we could get it, loop to try
                        // setting activeTask again.

                        // call yield here in case we're executing on a single threaded dispatcher
                        // like Dispatchers.Main to allow other work to happen.
                        yield()
                    }
                } else {
                    // happy path - we were able to set activeTask, so start newTask and return its
                    // result
                    result = newTask.await()
                    break
                }
            }

            // Kotlin ensures that the above loop always sets result exactly once, so we can return
            // it here!
            result
        }
    }
}

其它

// 1. MainScope 是异步的
    // 2. 两个launch之间是同步执行的
    // 3.
    private fun test() {
        logI { "Coroutines Step start" }
        val mainScope = MainScope()
        mainScope.launch {
            logI { "Coroutines MainScope Step 1" }
            delay(2000)
            logI { "Coroutines MainScope Step 2" }
        }
        mainScope.launch {
            logI { "Coroutines MainScope1 Step 1" }
            withContext(Dispatchers.IO) {
                logI { "Coroutines MainScope1 Step IO 2" }
                delay(2000)
                logI { "Coroutines MainScope1 Step IO 3" }
            }
            logI { "Coroutines MainScope1 Step 2" }
            withContext(Dispatchers.Main) {
                logI { "Coroutines MainScope1 Step Main 1" }
            }
            logI { "Coroutines MainScope1 Step 2" }
        }
        logI { "Coroutines Step end" }
    }


举报

相关推荐

0 条评论