安卓上协程需要的依赖
# build.gradle
// NOTE(review): presumably only needed for Kotlin versions where coroutines were still
// experimental (before 1.3) — confirm against the project's Kotlin version before keeping it.
kotlin {
experimental {
coroutines 'enable'
}
}
// Coroutines
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.2'
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.2'
// Test utilities belong on the test classpath only, so they are not packaged into the app.
testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-test:1.3.2'
作用域
- GlobalScope
- CoroutineScope
- MainScope
CoroutineContext(调度器 Dispatchers)
- Default
- Main
- Unconfined
- IO
ViewModel中的Scope作用域
// Key under which the ViewModel's coroutine scope is stored through getTag / setTagIfAbsent.
private const val JOB_KEY = "androidx.lifecycle.ViewModelCoroutineScope.JOB_KEY"
/**
 * The [CoroutineScope] associated with this [ViewModel].
 *
 * The scope is created on first access and stored on the ViewModel under [JOB_KEY]; later
 * accesses return the stored instance. It is cancelled when the ViewModel is cleared, i.e. when
 * [ViewModel.onCleared] is called.
 *
 * The scope runs on
 * [Dispatchers.Main.immediate][kotlinx.coroutines.MainCoroutineDispatcher.immediate]
 * and uses a [SupervisorJob].
 */
public val ViewModel.viewModelScope: CoroutineScope
    get() {
        // Reuse the scope that an earlier access already attached to this ViewModel.
        val cached: CoroutineScope? = getTag(JOB_KEY)
        return cached ?: setTagIfAbsent(
            JOB_KEY,
            CloseableCoroutineScope(SupervisorJob() + Dispatchers.Main.immediate)
        )
    }
/**
 * A [CoroutineScope] that is also [Closeable]: closing it cancels its [coroutineContext].
 *
 * @param context the context the scope runs in.
 */
internal class CloseableCoroutineScope(context: CoroutineContext) : Closeable, CoroutineScope {
    override val coroutineContext: CoroutineContext = context

    /** Cancels the scope's context. */
    override fun close() = coroutineContext.cancel()
}
一些验证
//test()
// foreach(0) //launch 会复用线程池
//test4()
// test5()
//test6()
//test7() // 优先执行主线程上的任务, 之后才执行协程
//test8() //创建多个协程, 两个 async 交叉运行
// test9() // coroutineScope 会创建一个新的作用域
// test10()
// test11()
//test12()//launch中的this只是当前launch
//test13()
//test14() //验证cancelchild
//test15() // 先取消上一个任务, 再进行下一个任务
//test16()
test17()
/**
 * Experiment: issues ten `joinPreviousOrRun` requests from one coroutine on a [MainScope].
 *
 * Every request logs eleven lines, 300 ms apart, and returns an empty string. The caller then
 * waits 600 ms before issuing the next request.
 *
 * NOTE(review): the requests are issued one after another from a single coroutine, and
 * `joinPreviousOrRun` only returns once its task has finished. There is therefore never an
 * in-flight task to join. Issue the requests from separate coroutines to observe joining.
 */
fun test17() {
    val controlledRunner = ControlledRunner<String>()
    MainScope().launch {
        var num = 0
        repeat(10) {
            num++
            controlledRunner.joinPreviousOrRun {
                for (i in 0..10) {
                    // The label names the API that is really being exercised.
                    log("$num controlledRunner.joinPreviousOrRun $i")
                    delay(300)
                }
                ""
            }
            delay(600)
        }
    }
}
/**
 * Experiment: runs ten tasks through `SingleRunner.afterPrevious` so they execute one at a
 * time, in order. Each task logs eleven lines, 300 ms apart.
 */
fun test16() {
    val singleRunner = SingleRunner()
    MainScope().launch {
        repeat(10) {
            singleRunner.afterPrevious {
                for (i in 0..10) {
                    // The label names the API that is really being exercised.
                    log("singleRunner.afterPrevious $i")
                    delay(300)
                }
            }
        }
    }
}
/**
 * Experiment for `cancelPreviousThenRun`: a new task is meant to cancel the previous one first.
 *
 * Ten requests are issued from one coroutine on a [MainScope]; each task logs up to 1001 lines,
 * 300 ms apart, and returns an empty string.
 */
fun test15() {
    val controlledRunner = ControlledRunner<String>()
    MainScope().launch {
        repeat(10) {
            controlledRunner.cancelPreviousThenRun {
                (0..1000).forEach { i ->
                    log("controlledRunner.cancelPreviousThenRun $i")
                    delay(300)
                }
                ""
            }
        }
    }
}
/**
 * Experiment: verifies `cancelChildren`.
 *
 * A parent coroutine starts one `launch` child and one `async` child, logs eleven lines of its
 * own (200 ms apart), then cancels its children and finishes. The parent's completion cause is
 * logged.
 */
fun test14() {
    val parent = vm.viewModelScope.launch {
        launch {
            for (i in 0..1000) {
                log("launch $i")
                delay(800)
            }
        }
        async {
            for (i in 0..1000) {
                log("async $i")
                delay(600)
            }
        }
        // The parent stops after its eleventh line (i == 10) and then cancels both children.
        for (i in 0..10) {
            delay(200)
            log("viewModelScope $i")
        }
        this.coroutineContext.job.cancelChildren()
    }
    parent.invokeOnCompletion { cause ->
        log("invokeOnCompletion $cause")
    }
}
/**
 * Experiment: checks whether cancelling a parent coroutine also cancels its children.
 *
 * Author's recorded conclusion: the children are cancelled as well.
 */
fun test13() {
    val parent = vm.viewModelScope.launch {
        val launchedChild = launch {
            (0..1000).forEach { i ->
                log("launch $i")
                delay(800)
            }
        }
        launchedChild.invokeOnCompletion { cause ->
            log("launch $cause")
        }
        async {
            (0..1000).forEach { i ->
                log("async $i")
                delay(600)
            }
        }
        (0..1000).forEach { i ->
            log("viewModelScope $i")
            delay(600)
            // Cancel the parent itself once the eleventh line has been logged.
            if (i == 10) cancel()
        }
    }
    parent.invokeOnCompletion { cause ->
        log("invokeOnCompletion $cause")
    }
}
/**
 * Experiment: cancels a running coroutine from inside itself.
 *
 * Author's note: `this` inside `launch` refers to that launch only. The second coroutine,
 * started on the same scope, is there to show whether it keeps running.
 */
fun test12() {
    val job: Job = vm.viewModelScope.launch {
        (0..1000).forEach { i ->
            log("viewModelScope $i")
            delay(500)
            if (i == 10) cancel()
        }
    }
    vm.viewModelScope.launch {
        (0..1000).forEach { i ->
            log("**** $i")
            delay(800)
        }
    }
}
/**
 * Experiment: starts a coroutine on the ViewModel scope, then keeps the calling thread busy
 * with eleven 6-second sleeps, to see when the coroutine's log lines appear.
 */
fun test11() {
    vm.viewModelScope.launch {
        repeat(11) {
            delay(6)
            log("viewModelScope ")
        }
    }
    (0..10).forEach { i ->
        log("test11 $i")
        Thread.sleep(6000)
    }
}
/**
 * Experiment: verifies that inside `coroutineScope` the failure of any one child stops the rest.
 *
 * One child throws [IllegalArgumentException] after two seconds while a sibling `async` keeps
 * logging every 500 ms. The exception handler logs the coroutine context it receives.
 */
fun test10() {
    log("test10 start")
    val handler = CoroutineExceptionHandler { context, _ ->
        log("$context")
    }
    MainScope().launch(handler) {
        log("MainScope start")
        coroutineScope {
            launch {
                log("luanch start")
                delay(2000)
                throw IllegalArgumentException("")
            }
            async {
                log("async start")
                repeat(101) { i ->
                    delay(500)
                    log("async 001 - $i")
                }
                log("async end")
            }
        }
        log("MainScope end")
    }
    log("test10 end")
}
/**
 * Experiment: runs a `launch` child (logging every second) and an `async` child (logging every
 * 500 ms) inside a `coroutineScope` block on a [MainScope].
 */
fun test9() {
    MainScope().launch {
        coroutineScope {
            launch {
                repeat(101) { i ->
                    delay(1000)
                    log("launch 001 - $i")
                }
            }
            async {
                repeat(101) { i ->
                    delay(500)
                    log("async 001 - $i")
                }
            }
        }
    }
}
/**
 * Experiment: shows that two `async` blocks started in the same coroutine run interleaved.
 *
 * Neither result is awaited; the first block logs every second, the second every 500 ms.
 */
fun test8() {
    log("test8 start")
    val scope = CoroutineScope(Dispatchers.Main)
    scope.launch {
        log("CoroutineScope start")
        async {
            repeat(101) { i ->
                delay(1000)
                log("async 001 - $i")
            }
        }
        async {
            repeat(101) { i ->
                delay(500)
                log("async 002 - $i")
            }
        }
        log("CoroutineScope end")
    }
    log("test8 end")
}
/**
 * Experiment: checks whether blocking work posted from another thread affects coroutines.
 *
 * Steps: block the calling thread with a sleep loop, start 101 coroutines on
 * [Dispatchers.Main] with staggered delays, then, five seconds later and from a background
 * thread, post another blocking sleep loop through `runOnUiThread`.
 */
fun test7() {
    (0..50).forEach { i ->
        log("$i start -> ")
        Thread.sleep(100)
        if (i == 26) Thread.sleep(5000)
    }
    (0..100).forEach { i ->
        CoroutineScope(Dispatchers.Main).launch {
            delay(200L * i)
            log("$i")
        }
    }
    val poster = Thread {
        // Wait five seconds before posting the blocking work.
        Thread.sleep(5000)
        runOnUiThread {
            (0..50).forEach { i ->
                log("$i end -> ")
                Thread.sleep(100)
                if (i == 26) Thread.sleep(5000)
            }
        }
    }
    poster.start()
}
/**
 * Experiment: creates many coroutines on [Dispatchers.Main] between two blocking sleep loops.
 *
 * Author's recorded conclusion: coroutines do not block the main thread's work.
 */
fun test6() {
    (0..50).forEach { i ->
        log("$i start -> ")
        Thread.sleep(100)
        if (i == 26) Thread.sleep(5000)
    }
    (0..100).forEach { i ->
        CoroutineScope(Dispatchers.Main).launch {
            delay(200L * i)
            log("$i")
        }
    }
    // Second blocking loop: does the other work get affected by the coroutines?
    (0..50).forEach { i ->
        log("$i end -> ")
        Thread.sleep(100)
        if (i == 26) Thread.sleep(5000)
    }
}
/**
 * Experiment: creates 101 coroutines on [Dispatchers.Main], then keeps the calling thread busy
 * with a sleep loop to see whether that other work is affected.
 */
fun test5() {
    (0..100).forEach { i ->
        CoroutineScope(Dispatchers.Main).launch {
            log("$i")
            delay(800)
        }
    }
    (0..100).forEach { i ->
        log("$i other -> ")
        Thread.sleep(200)
    }
}
/**
 * Experiment: logs the executing thread for a range of scope and dispatcher combinations.
 *
 * Author's recorded conclusion: work on Main runs on the main thread; the other combinations
 * each run on a separate thread.
 */
fun test4() {
    runBlocking { log("runBlocking") }

    GlobalScope.launch { log("GlobalScope.launch") }

    // Scopes built from a bare Job / SupervisorJob.
    val launch: Job = CoroutineScope(SupervisorJob()).launch {
        log("CoroutineScope(SupervisorJob()).launch")
    }
    CoroutineScope(Job()).launch { log("CoroutineScope(Job()).launch") }

    // Scopes built from an explicit dispatcher.
    CoroutineScope(Dispatchers.IO).launch { log("CoroutineScope(Dispatchers.IO).launch") }
    CoroutineScope(Dispatchers.Main).launch { log("CoroutineScope(Dispatchers.Main).launch") }
    CoroutineScope(Dispatchers.Default).launch { log("CoroutineScope(Dispatchers.Default).launch") }

    // The ViewModel scope, with and without a dispatcher override.
    vm.viewModelScope.launch { log("vm.viewModelScope.launch") }
    vm.viewModelScope.launch(Dispatchers.IO) { log("vm.viewModelScope.launch(Dispatchers.IO)") }
}
// ViewModel obtained through the viewModels() delegate; the experiments launch coroutines on its viewModelScope.
val vm: VM by viewModels()
/**
 * Logs [msg] at error level under the tag "SCOPE", followed by the current thread's name and
 * the current time in milliseconds.
 */
fun log(msg: String) {
    val threadName = Thread.currentThread().name
    val timestamp = System.currentTimeMillis()
    Log.e("SCOPE", "$msg - $threadName \t $timestamp")
}
// Counters keyed by thread name; foreach() increments the entry of the thread each coroutine runs on.
val map = hashMapOf<String, Int>()
/**
 * Increments the counter stored under [key] in the receiver map. A missing key counts as zero,
 * so the first call stores 1.
 *
 * The update is applied to the receiver itself and does not depend on the Android API level.
 * It is a plain read-modify-write with no synchronization.
 *
 * @param key the counter to increment.
 */
fun HashMap<String, Int>.put2(key: String) {
    this[key] = (this[key] ?: 0) + 1
}
/**
 * Logs every entry of the receiver map, one line per key, each prefixed with the same
 * timestamp taken once before iterating.
 */
fun HashMap<String, Int>.pritln() {
    val currentTimeMillis = System.currentTimeMillis()
    // Iterate the receiver so the function reports the map it was called on.
    for ((key, num) in this) {
        log("$currentTimeMillis \t\t $num \t $key")
    }
}
/**
 * Experiment: verifies that `launch` reuses pooled threads.
 *
 * For each level from [i] up to 10, a coroutine is launched on [GlobalScope]. It first launches
 * the next level, then counts the thread it runs on in [map], logs, waits 800 ms and logs the
 * per-thread counts.
 *
 * @param i the current level; nothing is launched once it exceeds 10.
 */
fun foreach(i: Int) {
    if (i <= 10) {
        GlobalScope.launch {
            foreach(i + 1)
            map.put2(Thread.currentThread().name)
            log("$i GlobalScope")
            delay(800)
            map.pritln()
        }
    }
}
/**
 * Experiment: logs the executing thread for six nested `GlobalScope.launch` calls, for
 * `withContext` switches to Main and IO inside the outermost one, and for a coroutine on the
 * ViewModel scope.
 */
fun test() {
    log("test()")

    GlobalScope.launch {
        log("1. GlobalScope")

        // Levels 2 to 6: each coroutine launches the next one.
        GlobalScope.launch {
            log("2. GlobalScope")
            GlobalScope.launch {
                log("3. GlobalScope")
                GlobalScope.launch {
                    log("4. GlobalScope")
                    GlobalScope.launch {
                        log("5. GlobalScope")
                        GlobalScope.launch { log("6. GlobalScope") }
                    }
                }
            }
        }

        // Back in level 1: hop to Main, then to IO.
        withContext(Dispatchers.Main) { log("withContext Main") }
        withContext(Dispatchers.IO) { log("withContext IO") }
    }

    vm.viewModelScope.launch { log("viewModelScope") }
}
}
/** Empty [ViewModel] subclass; the experiments use it only for its `viewModelScope`. */
class VM : ViewModel()
ConcurrencyHelpers.kt
package com.joyy.androidproject
import kotlinx.coroutines.CoroutineStart.LAZY
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.yield
import java.util.concurrent.atomic.AtomicReference
import kotlin.DeprecationLevel.ERROR
/**
 * Runs suspending tasks strictly one at a time.
 *
 * Every call to [afterPrevious] waits until all previously requested work has finished before
 * its block starts. Calls that arrive while a block is running wait for that block to complete.
 */
class SingleRunner {
    /** Coroutine mutex; only one coroutine can hold it at a time. */
    private val mutex = Mutex()

    /**
     * Runs [block] only after all previously submitted work has completed.
     *
     * Coroutines that call this at the same time queue up in call order and enter their blocks
     * one at a time.
     *
     * In the example below only one save operation (user or song) executes at any moment:
     *
     * ```
     * class UserAndSongSaver {
     *     val singleRunner = SingleRunner()
     *
     *     suspend fun saveUser(user: User) {
     *         singleRunner.afterPrevious { api.post(user) }
     *     }
     *
     *     suspend fun saveSong(song: Song) {
     *         singleRunner.afterPrevious { api.post(song) }
     *     }
     * }
     * ```
     *
     * @param block the code to run once previous work is complete.
     * @return whatever [block] returns.
     */
    suspend fun <T> afterPrevious(block: suspend () -> T): T =
        // withLock suspends while another block holds the mutex and releases it automatically
        // when this block finishes.
        mutex.withLock { block() }
}
/**
 * A controlled runner decides what happens when a new task is requested while another may
 * still be running.
 *
 * Note: this implementation is for illustration only. It does not work in the presence of
 * multi-threading and is not safe to call from Dispatchers.IO or Dispatchers.Default. In real
 * code use the thread-safe [ControlledRunner] listed below.
 *
 * [joinPreviousOrRun] discards the new task and returns the result of the previous one. This is
 * useful to keep requests for the same resource from flooding the network.
 *
 * [cancelPreviousThenRun] *always* cancels the old task and then runs the new one. This is useful
 * when a new event makes the previous work irrelevant, such as sorting or filtering a list.
 */
@Deprecated(
    "This code is not thread-safe and should not be used. Use " +
        "the ControlledRunner implementation below instead.", level = ERROR
)
class ControlledRunnerExampleImplementation<T> {
    /** The task currently tracked as running, or null when there is none. */
    private var activeTask: Deferred<T>? = null

    /**
     * Cancels the previous task, if any, before running [block].
     *
     * When several coroutines call this at the same time, only one will run and the others
     * will be cancelled.
     */
    @Deprecated(
        "This code is not thread-safe. Use ControlledRunner below instead.",
        level = ERROR
    )
    suspend fun cancelPreviousThenRun(block: suspend () -> T): T {
        // The previous result is no longer needed. cancelAndJoin waits for the cancellation to
        // finish, so the old task has stopped executing before we continue.
        activeTask?.cancelAndJoin()
        return startAndTrack(block)
    }

    /**
     * Waits for the previous task and returns its result if one is running; otherwise runs
     * [block].
     *
     * When several coroutines call this at the same time, only one will run and the others
     * return the winner's result.
     */
    @Deprecated(
        "This code is not thread-safe. Use ControlledRunner below instead.",
        level = ERROR
    )
    suspend fun joinPreviousOrRun(block: suspend () -> T): T {
        // A task is already tracked: hand back its result and never run the block.
        val running = activeTask
        if (running != null) return running.await()
        return startAndTrack(block)
    }

    /** Starts [block] as a new task, tracks it in [activeTask] until it completes, and awaits it. */
    private suspend fun startAndTrack(block: suspend () -> T): T =
        // coroutineScope lets a suspend function start a new coroutine safely.
        coroutineScope {
            val task = async { block() }
            // Runs on cancellation as well as on normal completion.
            task.invokeOnCompletion { activeTask = null }
            activeTask = task
            task.await()
        }
}
/**
 * A controlled runner decides what happens when a new task is requested while another may
 * still be running.
 *
 * [joinPreviousOrRun] discards the new task and returns the result of the previous one. This is
 * useful to keep requests for the same resource from flooding the network.
 *
 * [cancelPreviousThenRun] *always* cancels the old task and then runs the new one. This is useful
 * when a new event makes the previous work irrelevant, such as sorting or filtering a list.
 */
class ControlledRunner<T> {
    /**
     * The currently active task, or null when nothing is running.
     *
     * Held in an atomic reference so it can be updated safely by coroutines that run on
     * different threads at the same time.
     */
    private val activeTask = AtomicReference<Deferred<T>?>(null)

    /**
     * Cancels all previous tasks before running [block].
     *
     * When several coroutines call this at the same time, only one will run and the others
     * will be cancelled.
     *
     * In the example below only one sort executes and any previous sort is cancelled:
     *
     * ```
     * class Products {
     *     val controlledRunner = ControlledRunner<List<Product>>()
     *
     *     suspend fun sortAscending(): List<Product> {
     *         return controlledRunner.cancelPreviousThenRun { dao.loadSortedAscending() }
     *     }
     *
     *     suspend fun sortDescending(): List<Product> {
     *         return controlledRunner.cancelPreviousThenRun { dao.loadSortedDescending() }
     *     }
     * }
     * ```
     *
     * @param block the code to run after previous work is cancelled.
     * @return the result of block, if this call was not cancelled prior to returning.
     */
    suspend fun cancelPreviousThenRun(block: suspend () -> T): T {
        // Fast path: cancel a task we already know about right away.
        activeTask.get()?.cancelAndJoin()

        return coroutineScope {
            // LAZY: the coroutine is created now but only starts when await() is called on it,
            // i.e. once it is decided that this block should execute.
            val newTask = async(start = LAZY) { block() }

            // On completion, clear activeTask, but only if it still points at this task.
            newTask.invokeOnCompletion { activeTask.compareAndSet(newTask, null) }

            // Retry until newTask is installed as the active task. While another task holds
            // the slot, cancel it and wait for it to finish.
            while (!activeTask.compareAndSet(null, newTask)) {
                activeTask.get()?.cancelAndJoin()
                // Avoids a possible tight loop on a single-threaded dispatcher.
                yield()
            }

            // newTask owns the slot now; awaiting it starts it.
            newTask.await()
        }
    }

    /**
     * Does not run [block] if a previous task is running; instead waits for that task and
     * returns its result.
     *
     * When several coroutines call joinPreviousOrRun at the same time, only one will run and
     * the others return the winner's result.
     *
     * In the example below only one network operation executes at a time, and every other
     * request returns the result of the request already in flight:
     *
     * ```
     * class Products {
     *     val controlledRunner = ControlledRunner<List<Product>>()
     *
     *     suspend fun fetchProducts(): List<Product> {
     *         return controlledRunner.joinPreviousOrRun {
     *             val results = api.fetchProducts()
     *             dao.insert(results)
     *             results
     *         }
     *     }
     * }
     * ```
     *
     * @param block the code to run if and only if no other task is currently running
     * @return the result of block, or if another task was running the result of that task instead.
     */
    suspend fun joinPreviousOrRun(block: suspend () -> T): T {
        // Fast path: a task is already active, so wait for it and return its result.
        val running = activeTask.get()
        if (running != null) return running.await()

        return coroutineScope {
            // LAZY: the coroutine is created now but only starts when await() is called on it.
            val newTask = async(start = LAZY) { block() }

            // On completion, clear activeTask, but only if it still points at this task.
            newTask.invokeOnCompletion { activeTask.compareAndSet(newTask, null) }

            // The task whose result is returned: either newTask or one that got there first.
            // It is a val, so it is assigned exactly once before the loop exits.
            val winner: Deferred<T>
            while (true) {
                if (activeTask.compareAndSet(null, newTask)) {
                    // The slot was free and is now ours, so newTask is the one to run.
                    winner = newTask
                    break
                }
                // Another task took the slot first; check whether it is still there.
                val currentTask = activeTask.get()
                if (currentTask != null) {
                    // Join the other task and drop ours.
                    newTask.cancel()
                    winner = currentTask
                    break
                }
                // The other task completed before we could read it. Yield so other work can
                // run on a single-threaded dispatcher such as Dispatchers.Main, then retry.
                yield()
            }

            winner.await()
        }
    }
}
其它
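// 1. MainScope().launch 是异步的: launch 立即返回, "Step end" 会先于协程体内的日志打印
// 2. 同一个 MainScope 里的两个 launch 按启动顺序在主线程上开始执行, 遇到挂起点(delay / withContext)时让出主线程, 因此两者的日志会交替出现
// 3. withContext(Dispatchers.IO) 内的代码切换到 IO 调度器执行, 结束后回到主线程继续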
/**
 * Experiment: traces two coroutines launched on the same [MainScope].
 *
 * The first coroutine logs, waits two seconds and logs again. The second switches to
 * [Dispatchers.IO] for a two-second wait and then to [Dispatchers.Main], logging a numbered
 * step before and after each switch.
 */
private fun test() {
    logI { "Coroutines Step start" }
    val mainScope = MainScope()
    mainScope.launch {
        logI { "Coroutines MainScope Step 1" }
        delay(2000)
        logI { "Coroutines MainScope Step 2" }
    }
    mainScope.launch {
        logI { "Coroutines MainScope1 Step 1" }
        withContext(Dispatchers.IO) {
            logI { "Coroutines MainScope1 Step IO 2" }
            delay(2000)
            logI { "Coroutines MainScope1 Step IO 3" }
        }
        logI { "Coroutines MainScope1 Step 2" }
        withContext(Dispatchers.Main) {
            logI { "Coroutines MainScope1 Step Main 1" }
        }
        // This label used to repeat "Step 2"; a distinct step keeps the trace unambiguous.
        logI { "Coroutines MainScope1 Step 3" }
    }
    logI { "Coroutines Step end" }
}