下面的代码是在Android studio最新版北极狐版本中运行并输出的debug结果信息。想在新建的Android项目中调用kotlin协程,需要先导入依赖:
dependencies {
implementation "org.jetbrains.anko:anko-common:$anko_version"
implementation "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version"
implementation 'androidx.core:core-ktx:1.3.2'
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.5.1"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.5.1"
implementation "androidx.constraintlayout:constraintlayout:2.0.4"
implementation "androidx.lifecycle:lifecycle-extensions:2.2.0"
implementation "androidx.appcompat:appcompat:1.2.0"
implementation "androidx.legacy:legacy-support-v4:1.0.0"
implementation "androidx.recyclerview:recyclerview:1.2.0"
implementation "androidx.lifecycle:lifecycle-viewmodel-ktx:2.3.1"
implementation "androidx.lifecycle:lifecycle-livedata-ktx:2.3.1"
implementation "androidx.lifecycle:lifecycle-runtime-ktx:2.3.1"
implementation "androidx.collection:collection-ktx:1.1.0"
implementation "androidx.activity:activity-ktx:1.2.3"
implementation "androidx.fragment:fragment-ktx:1.3.4"
implementation "androidx.localbroadcastmanager:localbroadcastmanager:1.0.0"
implementation 'androidx.appcompat:appcompat:1.2.0'
implementation 'com.google.android.material:material:1.3.0'
implementation 'androidx.constraintlayout:constraintlayout:2.0.4'
testImplementation 'junit:junit:4.+'
androidTestImplementation 'androidx.test.ext:junit:1.1.2'
androidTestImplementation 'androidx.test.espresso:espresso-core:3.3.0'
}
在MainActivity的布局文件中加入一个TextView按钮,点击这个按钮时,调用我们想要调试的方法、协程、Flow等函数、操作。为了只看单个方法的输出结果,其他方法的调用被我注释掉了。你可以逐一放开运行调用,已查看结果。很多英文注释的理解我已经写在方法上。
package com.xw.inlinefunctiondemo
import androidx.appcompat.app.AppCompatActivity
import android.os.Bundle
import android.util.Log
import android.view.View
import android.widget.TextView
import kotlinx.coroutines.*;
import kotlinx.coroutines.flow.*
/**
* 来自LQS的文章,转载请说明出处!
* 常用方法:
* yield(i):向正在生成的迭代器生成一个值并挂起,直到请求下一个值;
*/
class MainActivity : AppCompatActivity() {
private val TAG="MainActivity"
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
val tips = findViewById<TextView>(R.id.tips);
tips.let {
it.text = "欢迎:开始"
it.textSize = 20.0f
it.setTextColor(0x70aa45cc);
it.setOnClickListener(View.OnClickListener {
/**协程的启动与返回值**/
//start();
/**创建简单的序列**/
//testSimpleSequence()
/**序列的创建方式之一:通过函数创建序列**/
//createSequence()
/**通过yield()、yieldAll()函数和generateSequence(7)函数方法创建一个序列**/
//createSequenceTestYield()
/**通过⼀个示例来看 Iterable 与 Sequence 之间的区别**/
//operateSequence()
/**通过⼀个示例来看 Iterable 与 Sequence 之间的区别**/
//operateIterable()
/**创建一个简单的Flow流**/
//startSimpleFlow()
/**验证Flow流是冷的直到被收集的时候才启动**/
//theFlowIsColdStart()
/**Flow流的取消**/
//cancelFlowStart()
/**流拥有的组合操作符zip**/
//zipStart()
/**流的值通过变换函数transform使用每个流最新发射的数据进行组合产生**/
//combineStart()
/**展平流操作符:flattenConcat()**/
//flattenConcatStart()
/**展平流操作符:flattenConcat() 加入延迟处理后的**/
//flattenConcatStart1()
/**展平流操作符:flattenMerge(),当concurrency=1,输出结果与加入延迟处理flattenConcat()完全相同**/
//flattenMergeStart1()
/**展平流操作符:flattenMerge()**/
//flattenMergeStart2()
/**
* 展平流操作符:flatMapConcat
* 每个值都会触发对另⼀个值序列的请求。
* 在调用flatMapConcat后,collect函数在收集新值之前会等待flatMapConcat{}内部的flow完成。
**/
//flatMapConcatStart1()
/**
* 展平流操作符:flatMapMerge
* 并发收集所有传⼊的流,并将它们的值合并到⼀个单独的流,以便尽快的发射值。
**/
//flatMapMergeStart()
/**“最新”展平模式**/
//flatMapLatestStart1()
//flatMapLatestStart2()
/**
* 末端操作符launchIn
* 使⽤流表示来⾃⼀些源的异步事件是很简单的,如果我们在 onEach之后使⽤collect末端操作符,那么后⾯的代码会⼀直等待(阻塞)直⾄流被收集。
* 这时使⽤launchIn替换collect我们可以在单独的协程中启动流的收集,这样就可以⽴即继续进⼀步执⾏代码**/
//launchInStart()
/**
* cancellable操作符
* 为⽅便起⻅,流构建器对每个发射值执⾏附加的ensureActive检测以进⾏取消。 这意味着从flow { ... } 发出的繁忙循环是可以取消的.
* 在协程处于繁忙循环的情况下,必须明确检测是否取消。 可以添加 .onEach {currentCoroutineContext().ensureActive() } ,
* 但是这⾥提供了⼀个现成的cancellable操作符来执⾏此操作。
*/
cancellableStart()
});
}
}
//launch{}启动一个协程
private fun start(){
GlobalScope.launch {
val withStr = withContext(Dispatchers.Default){
"a"
}
val awaitStr = async {
"b"
}
Log.d("test","withStr :$withStr")
Log.d("test","awaitStr :${awaitStr.await()}")
}
}
//通过函数创建序列
private fun createSequence(){
/**
* public fun <T : Any> generateSequence(seed: T?, nextFunction: (T) -> T?): Sequence<T>
* 要使⽤ generateSequence() 创建有限序列,请提供⼀个函数,该函数在需要的最后⼀个元素之后返回 null
* **/
val oddNumbersLessThan10 = generateSequence(1) {
if (it < 8) {
it + 2
} else {
null
}
}
/**count(): Returns the number of elements in this sequence.**/
println(oddNumbersLessThan10.count())
val sequenceIterator = oddNumbersLessThan10.toList().iterator()
while (sequenceIterator.hasNext()) {
println(sequenceIterator.next())
}
Log.d(TAG, "value :${oddNumbersLessThan10.count()}")
/**
* 输出结果:
* 通过函数创建序列,显示指定的generateSequence(1)函数参数seed包含在序列内
* I/System.out: 1
* I/System.out: 3
* I/System.out: 5
* I/System.out: 7
* I/System.out: 9
*/
}
private fun createSequenceTestYield(){
val oddNumbers = sequence {
yield(1)
yieldAll(listOf(3, 5))
yieldAll(generateSequence(7) {
if(it<12){it + 2}else{null}
})
}
println(oddNumbers.toList())
/**
* 输出结果:
* com.xw.inlinefunctiondemo I/System.out: [1, 3, 5, 7, 9, 11, 13]
*/
}
/**
* 通过⼀个示例来看 Iterable 与 Sequence 之间的区别
* 代码过滤⻓于三个字符的单词,并输出前四个单词的⻓度
* 迭代器Iterable实例
*/
private fun operateIterable(){
val words = "The quick brown fox jumps over the lazy dog".split(" ")
val lengthsList = words.filter { println("filter: $it"); it.length > 3 }
.map { println("length: ${it.length}"); it.length }
.take(4)
println("Lengths of first 4 words longer than 3 chars:")
println(lengthsList)
/**
* I/System.out: filter: The
* I/System.out: filter: quick
* I/System.out: filter: brown
* I/System.out: filter: fox
* I/System.out: filter: jumps
* I/System.out: filter: over
* I/System.out: filter: the
* I/System.out: filter: lazy
* I/System.out: filter: dog
* I/System.out: length: 5
* I/System.out: length: 5
* I/System.out: length: 4
* I/System.out: length: 4
* I/System.out: Lengths of first 4 words longer than 3 chars:
* I/System.out: [5, 5, 5, 4]
*/
}
/**
* 通过⼀个示例来看 Iterable 与 Sequence 之间的区别
* 代码过滤⻓于三个字符的单词,并输出前四个单词的⻓度
* 序列Sequence实例:
*/
private fun operateSequence(){
val words = "The quick brown fox jumps over the lazy dog".split(" ")
// 将列表转换为序列
val wordsSequence = words.asSequence()
val lengthsSequence = wordsSequence.filter { println("filter: $it"); it.length>3}
.map { println("length: ${it.length}"); it.length }
.take(4)
println("Lengths of first 4 words longer than 3 chars")
// 末端操作:以列表形式获取结果。
println(lengthsSequence.toList())
/**
* I/System.out: Lengths of first 4 words longer than 3 chars
* I/System.out: filter: The
* I/System.out: filter: quick
* I/System.out: length: 5
* I/System.out: filter: brown
* I/System.out: length: 5
* I/System.out: filter: fox
* I/System.out: filter: jumps
* I/System.out: length: 5
* I/System.out: filter: over
* I/System.out: length: 4
* I/System.out: [5, 5, 5, 4]
*/
}
/**
* sequence():构建一个序列,缓慢地一个接一个地生成值。
* public fun <T> sequence(@BuilderInference block: suspend SequenceScope<T>.() -> Unit): Sequence<T>
* = Sequence { iterator(block) }
* 在sequence()方法中是不可使用delay挂起函数来做延时的,只能使用Thread.sleep,因为sequence接收的是一个SequenceScope的扩展函数,而在SequenceScope类上使用了RestrictsSuspension注解。
* 此注解标记的类和接口在用作扩展挂起函数的接收器时受到限制,只能调用特定接收器上的其他成员或扩展挂起函数,并且不能调用任意的挂起函数。
*/
private fun createSimpleSequence(): Sequence<Int> = sequence {
for (i in 1..3) {
//不可使用delay挂起函数来做延时的,只能使用Thread.sleep;
Thread.sleep(100)
//yield(i):向正在生成的迭代器生成一个值并挂起,直到请求下一个值;
yield(i)
}
}
fun testSimpleSequence() {
createSimpleSequence().forEach { value ->
Log.d(TAG, "value :${value}")
}
}
/**================================Flow流================================**/
/** 创建简单的Flow流
* public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T>
* = SafeFlow(block)
* **/
private fun createSimpleFlow(): Flow<Int> = flow { // 流构建器
for (i in 1..3) {
delay(100) // 假装我们在这⾥做了⼀些有⽤的事情
emit(i) // 发送下⼀个值
}
}
/**
* 这段代码在不阻塞主线程的情况下每等待 100 毫秒打印⼀个数字。
* 在主线程中运⾏⼀个 单独的协程每 100 毫秒打印⼀次 “I'm not blocked” 已经经过了验证。
*/
private fun startSimpleFlow(){
runBlocking {
// 启动另外一个新的并发的协程以验证主线程并未阻塞
launch {
for (k in 1..3) {
println("I'm not blocked $k")
delay(100)
}
}
// 收集这个流
createSimpleFlow().collect { value -> println(value) }
/**
* 输出结果:
* I/System.out: I'm not blocked 1
* I/System.out: 1
* I/System.out: I'm not blocked 2
* I/System.out: 2
* I/System.out: I'm not blocked 3
* I/System.out: 3
*/
}
}
/**Flow流是冷的,直到流被收集的时候才运⾏**/
private fun flowIsCold(): Flow<Int> = flow {
println("Flow started")
for (i in 1..3) {
delay(100)
emit(i)
}
}
/**
* Flow是⼀种类似于序列的冷流 — 这段flow构建器中的代码直到流被收集的时候才运⾏
*/
private fun theFlowIsColdStart(){
runBlocking {
println("Calling flowIsCold() function...")
val flow = flowIsCold()
println("Calling collect...")
flow.collect { value -> println(value) }
println("Calling collect again...")
flow.collect { value -> println(value) }
}
/**
* 输出结果:
* I/System.out: Calling flowIsCold() function...
* I/System.out: Calling collect...
* I/System.out: Flow started 注意:Flow流是冷的直到调用collect{}收集时才运行
* I/System.out: 1
* I/System.out: 2
* I/System.out: 3
* I/System.out: Calling collect again...
* I/System.out: Flow started
* I/System.out: 1
* I/System.out: 2
* I/System.out: 3
*/
}
/**
* 超时取消Flow流
*/
private fun cancelFlow(): Flow<Int> = flow {
for (i in 1..3) {
delay(100)
println("Emitting $i")
emit(i)
}
}
/**
* 在 250 毫秒后超时后,自动取消Flow流
* 流采⽤与协程同样的协作取消。
* 像往常⼀样,流的收集可以在当流在⼀个可取消的挂起函数中挂起的时候取消(例如 delay)。
* 以下示例展示了当 withTimeoutOrNull 块中代码在运⾏的时候流是如何在超时的情况下取消并停⽌执⾏其代码的
*/
private fun cancelFlowStart(){
runBlocking {
withTimeoutOrNull(250) { // 在 250 毫秒后超时
cancelFlow().collect { value -> println(value) }
}
println("Done")
}
/**
* 输出结果:流在发射到2的时候超时取消了
* I/System.out: Emitting 1
* I/System.out: 1
* I/System.out: Emitting 2
* I/System.out: 2
* I/System.out: Done
*/
}
/**
* 流Flow拥有⼀个 zip 操作符⽤于组合两个流中的相关值:
* public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R>
* = zipImpl(this, other, transform)
*/
private fun zipStart(){
runBlocking {
val nums = (1..3).asFlow().onEach { delay(300) } // 发射数字 1..3,间隔 300 毫秒
val strs = flowOf("one", "two", "three").onEach { delay(400) } // 每 400 毫秒发射
val startTime = System.currentTimeMillis() // 记录开始的时间
nums.zip(strs) { a, b -> "$a -> $b" } // 使⽤“zip”组合单个字符串
.collect { value -> // 收集并打印
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
/**
* 输出结果:
* I/System.out: 1 -> one at 419 ms from start
* I/System.out: 2 -> two at 820 ms from start
* I/System.out: 3 -> three at 1223 ms from start
*/
}
}
/**
* Returns a Flow whose values are generated with transform function by combining the most recently emitted values by each flow.
* 返回一个流,该流的值是使用transform函数通过组合每个流最近发出的值而生成的。
* public fun <T1, T2, R> Flow<T1>.combine(flow: Flow<T2>, transform: suspend (a: T1, b: T2) -> R): Flow<R> = flow {
* combineInternal(arrayOf(this@combine, flow), nullArrayFactory(), { emit(transform(it[0] as T1, it[1] as T2)) })
* }
*/
private fun combineStart(){
runBlocking {
val nums = (1..3).asFlow().onEach { delay(300) } // 发射数字 1..3,间隔 300 毫秒
val strs = flowOf("one", "two", "three").onEach { delay(400) } // 每 400 毫秒发射
val startTime = System.currentTimeMillis() // 记录开始的时间
nums.combine(strs) { a, b -> "$a -> $b" } // 使⽤“zip”组合单个字符串
.collect { value -> // 收集并打印
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
/**
* 输出结果:
* I/System.out: 1 -> one at 431 ms from start
* I/System.out: 2 -> one at 635 ms from start
* I/System.out: 2 -> two at 834 ms from start
* I/System.out: 3 -> two at 938 ms from start
* I/System.out: 3 -> three at 1238 ms from start
*/
}
}
/*************************展平流************************
* 第一种分类概述
* flattenConcat:顺序的收集流,等价于flattenMerge(1)
* /
* flatten
* \
* flattenMerge:flattenMerge(concurrency),当concurrency>1时并发收集流。
*
*
* flatMapConcat:flatMapConcat是由map(transform)和flattenConcat()操作符实现。在调用flatMapConcat后,collect函数在收集新值之前会等待flatMapConcat内部的flow完成。
* /
* flatMap
* \
* flatMapMerge:flatMapMerge 由 map()、flattenMerge(concurrency) 操作符实现。flattenMerge():操作符将给定的流展平为单个流,并在流的数量上设置[concurrency]限制同时收集的流,
* 当[concurrency]=1时,此运算符与[flattenConcat]相同,当concurrency>1时并发收集流。
*
* 第二:也可以按照是否并发收集流进行归类:流展平操作符:flatMapMerge、flattenMerge
* flattenMerge:接收concurrency参数。解释见上。
* /
* Merge
* \
* flatMapMerge:接收concurrency参数。解释见上。
*
* 展平模式是并发收集所有传⼊的流,并将它们的值合并到⼀个单独的流,以便尽快的发射值。它由flattenMerge与flatMapMerge操作符实现。
* 它们都接收可选的⽤于限制并发收集的流的个数的concurrency参数(默认情况下,它等于DEFAULT_CONCURRENCY)。
* */
/**流表示异步接收的值序列,所以很容易遇到这样的情况:每个值都会触发对另⼀个值序列(多个值)的请求。
* ⽐如说,我们可以拥有下⾯这样⼀个返回间隔500毫秒的两个字符串流的函数requestFlow(i:Int):Flow<String>:
* 这个requestFlow的函数类型:suspend (value: T) -> Flow<R>
*/
private fun requestFlow(i:Int):Flow<String> = flow {
emit("$i:First")
delay(500)
emit("$i:Second")
}
/**
* 将给定流(flows)按顺序展平为单个流(a single flow),而不交错嵌套流。此flattenConcat操作符方法在概念上与“flattenMerge(concurrency = 1)”完全相同,但实现速度更快。
* 内部流由flattenConcat该操作符"按顺序"收集。
*/
private fun flattenConcatStart(){
runBlocking {
val flowA = (1..5).asFlow()
val flowB = flowOf("one", "two", "three","four","five")
flowOf(flowA,flowB).flattenConcat().collect{ println("flattenConcat()-> $it") }
}
/**
* 顺序收集流的结果,无交差、嵌套
* I/System.out: flattenConcat()-> 1
* I/System.out: flattenConcat()-> 2
* I/System.out: flattenConcat()-> 3
* I/System.out: flattenConcat()-> 4
* I/System.out: flattenConcat()-> 5
* I/System.out: flattenConcat()-> one
* I/System.out: flattenConcat()-> two
* I/System.out: flattenConcat()-> three
* I/System.out: flattenConcat()-> four
* I/System.out: flattenConcat()-> five
*/
}
private fun flattenConcatStart1(){
runBlocking {
val flowA = (1..5).asFlow().onEach { delay(100) }
val flowB = flowOf("one", "two", "three","four","five").onEach { delay(200) }
flowOf(flowA,flowB).flattenConcat().collect{ println("flattenConcat()-> $it") }
}
/**
* 加入延迟处理后的流的收集结果顺序
* I/System.out: flattenConcat()-> 1
* I/System.out: flattenConcat()-> 2
* I/System.out: flattenConcat()-> 3
* I/System.out: flattenConcat()-> 4
* I/System.out: flattenConcat()-> 5
* I/System.out: flattenConcat()-> one
* I/System.out: flattenConcat()-> two
* I/System.out: flattenConcat()-> three
* I/System.out: flattenConcat()-> four
* I/System.out: flattenConcat()-> five
*/
}
/**
* 为了能更清楚地看到flowA、flowB作为单个流的执行,对上面的的代码稍作改动,使用flattenMerge(concurrency)函数,
* 将concurrency参数设置为1、2来看收集流的效果(顺序收集和并发收集),flattenMerge(1),flattenMerge(2)。
**/
/**将concurrency参数设置为1来看flattenMerge(1)的收集流的效果(顺序收集)**/
private fun flattenMergeStart1(){
runBlocking {
val flowA = (1..5).asFlow().onEach { delay(100) }
val flowB = flowOf("one", "two", "three","four","five").onEach { delay(200) }
flowOf(flowA,flowB).flattenMerge(1).collect{ println("flattenMerge(1) -> $it") }
}
/**
* I/System.out: flattenMerge(1) -> 1
* I/System.out: flattenMerge(1) -> 2
* I/System.out: flattenMerge(1) -> 3
* I/System.out: flattenMerge(1) -> 4
* I/System.out: flattenMerge(1) -> 5
* I/System.out: flattenMerge(1) -> one
* I/System.out: flattenMerge(1) -> two
* I/System.out: flattenMerge(1) -> three
* I/System.out: flattenMerge(1) -> four
* I/System.out: flattenMerge(1) -> five
*/
}
/**将concurrency参数设置为2来看flattenMerge(2)的收集流的效果(并发收集)**/
private fun flattenMergeStart2(){
runBlocking {
val flowA = (1..5).asFlow().onEach { delay(100) }
val flowB = flowOf("one", "two", "three","four","five").onEach { delay(200) }
flowOf(flowA,flowB).flattenMerge(2).collect{ println("flattenMerge(2) -> $it") }
}
/**两个两个的进行并发收集
* I/System.out: flattenMerge(2) -> 1
* I/System.out: flattenMerge(2) -> one
* I/System.out: flattenMerge(2) -> 2
* I/System.out: flattenMerge(2) -> 3
* I/System.out: flattenMerge(2) -> two
* I/System.out: flattenMerge(2) -> 4
* I/System.out: flattenMerge(2) -> 5
* I/System.out: flattenMerge(2) -> three
* I/System.out: flattenMerge(2) -> four
* I/System.out: flattenMerge(2) -> five
*/
}
/**
* 每个值都会触发对另⼀个值序列的请求。
* ⼀个包含三个整数的流(1..3),并为每个整数调⽤ requestFlow(i:Int):Flow<String>字符串流,这样就得到了一个包含流的流( Flow<Flow<String>> ),
* 以进⾏下⼀步处理,需要将其进⾏展平为单个流。
*/
/** 流展平操作符:flatMapConcat
* 重点理解:在调用flatMapConcat后,collect函数在收集新值之前会等待flatMapConcat内部的flow完成。
* 我们来看它的源码定义:map(transform).flattenConcat(),它通过map应用transform(返回另一个流)转换原始流发出的元素,然后连接并展平这些流。
*
* Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>) 源码
* @FlowPreview
* public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R> = map(transform).flattenConcat()
*
* 那么这个map()函数又是什么呢?
* Flow<T>.map(crossinline transform: suspend (value: T) -> R) 源码:
* Returns a flow containing the results of applying the given [transform] function to each value of the original flow.*
*
* 返回一个流,这个流包含将给定的[transform]函数应用于原始流的每个值的结果。说白了就是对原始流的每个值逐一进行[transform]函数变换操作形成一个新的流。
*
* public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R> = transform { value ->
* return@transform emit(transform(value))
* }
*
* 注意,尽管这个操作符看起来非常熟悉,但我们不鼓励在常规的特定于应用程序的流中使用它。
**/
private fun flatMapConcatStart1(){
runBlocking {
val startTime = System.currentTimeMillis() // 记录开始时间
(1..3).asFlow().onEach { delay(100) } // 每 100 毫秒发射⼀个数字
/**在调用flatMapConcat后,collect函数在收集新值之前会等待flatMapConcat内部的flow完成。**/
.flatMapConcat { requestFlow(it) }
// 收集并打印
.collect { value ->
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
/**
* 输出结果:
* I/System.out: 1:First at 118 ms from start
* I/System.out: 1:Second at 620 ms from start
* I/System.out: 2:First at 722 ms from start
* I/System.out: 2:Second at 1223 ms from start
* I/System.out: 3:First at 1325 ms from start
* I/System.out: 3:Second at 1826 ms from start
*/
}
/**与flatMapConcatStart1()相同函数不同写法**/
private fun flatMapConcatStart2(){
runBlocking {
val start =System.currentTimeMillis()
(1..5).asFlow()
.onEach { delay(100) }
/**在调用flatMapConcat后,collect函数在收集新值之前会等待flatMapConcat内部的flow完成。**/
.flatMapConcat {
flow {
emit("$it: First")
delay(500)
emit("$it: Second")
}
}.collect {
println("$it at ${System.currentTimeMillis() - start} ms from start")
}
}
}
/**
* 流展平操作符:flatMapMerge、flattenMerge
* 另⼀种展平模式是并发收集所有传⼊的流,并将它们的值合并到⼀个单独的流,以便尽快的发射值。它由flatMapMerge与flattenMerge操作符实现。
* 它们都接收可选的⽤于限制并发收集的流的个数的concurrency参数(默认情况下,它等于DEFAULT_CONCURRENCY)
*
* flatMapMerge 由 map()、flattenMerge() 操作符实现。
* flattenMerge():操作符将给定的流展平为单个流,并在流的数量上设置[concurrency]限制同时收集的流,当[concurrency]=1时,此运算符与[flattenConcat]相同。
* @FlowPreview
* public fun <T, R> Flow<T>.flatMapMerge(
* concurrency: Int = DEFAULT_CONCURRENCY,
* transform: suspend (value: T) -> Flow<R>
* ): Flow<R> = map(transform).flattenMerge(concurrency)
*
***/
private fun flatMapMergeStart(){
runBlocking {
val startTime =System.currentTimeMillis()
(1..3).asFlow().onEach { delay(100) }
/**flatMapMerge 由 map()、flattenMerge() 操作符实现。**/
.flatMapMerge{
flow {
emit("$it:First")
delay(500)
emit("$it:Second")
}
}.collect { value->
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
/**输出结果:上游流通过onEach延迟100ms后发射整数1,进入展平流flatMapMerge函数体,执行 emit("$it:First")发射数据流"1:First",
* 执行delay(500)延迟500ms后,执行 emit("$it:Second")发射 "1:Second";
* 上游流通过onEach再延迟100ms后发射整数2,进入展平流flatMapMerge函数体,执行 emit("$it:First")发射数据流"2:First",
* 执行delay(500)延迟500ms后,执行 emit("$it:Second")发射 "2:Second";
* 上游流通过onEach再延迟100ms后发射整数3,进入展平流flatMapMerge函数体,执行 emit("$it:First")发射数据流"3:First",
* 执行delay(500)延迟500ms后,执行 emit("$it:Second")发射 "3:Second";
* 在前300ms(实际上是309ms)内,collect()函数按照时间顺序收集了"1:First"、"2:First"和"3:First",
* 在分别延迟的500ms结束后,按照时间顺序又收集了"1:Second"、"2:Second"和"3:Second"。
* 所以在时间顺序上结果如下:
* I/System.out: 1:First at 105 ms from start
* I/System.out: 2:First at 206 ms from start
* I/System.out: 3:First at 309 ms from start
* I/System.out: 1:Second at 606 ms from start
* I/System.out: 2:Second at 708 ms from start
* I/System.out: 3:Second at 811 ms from start
*/
}
/**
* “最新”展平模式
* 当原始流发出新值时,“transform”块生成的前一个流将被取消。
*/
private fun flatMapLatestStart1(){
runBlocking {
val startTime = System.currentTimeMillis() // 记录开始时间
flow {
emit("a")
delay(100)
emit("b")
}.flatMapLatest { value ->
flow {
emit(value)
delay(500)
emit(value + "_last")
}
}.collect {
println("$it at ${System.currentTimeMillis() - startTime} ms from start")
}
}
/**
* 输出结果:
* I/System.out: a at 77 ms from start
* I/System.out: b at 181 ms from start
* I/System.out: b_last at 683 ms from start
**/
}
/**
* “最新”展平模式
* 当原始流发出新值时,“transform”块生成的前一个流将被取消。
*/
private fun flatMapLatestStart2(){
runBlocking<Unit> {
/**收集器的try与catch,这段代码成功的在末端操作符 collect 中捕获了异常**/
try {
val startTime = System.currentTimeMillis() // 记录开始时间
(1..3).asFlow().onEach { delay(100) } // 每 100 毫秒发射⼀个数字
/**当原始流发出新值时,“transform”块生成的前一个流将被取消。**/
.flatMapLatest {
flow {
emit("$it:First")
delay(500)
emit("$it:Second")
}
}
.collect { value -> // 收集并打印
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
} catch (e: Throwable){
e.printStackTrace()
println("Caught $e")
}
/**
* 输出结果:
* I/System.out: 1:First at 134 ms from start
* I/System.out: 2:First at 243 ms from start
* I/System.out: 3:First at 348 ms from start
* I/System.out: 3:Second at 849 ms from start
*/
}
}
private fun events(): Flow<Int> = (1..20).asFlow().onEach { delay(100) }
private fun collectStart(){
runBlocking<Unit> {
events().onEach { event -> println("Event: $event") }.collect() // <--- 等待流收集
println("Done")
}
/***
* I/System.out: Event: 1
* I/System.out: Event: 2
* I/System.out: Event: 3
* I/System.out: Event: 4
* I/System.out: Event: 5
* I/System.out: Event: 6
* I/System.out: Event: ...
* I/System.out: Event: 19
* I/System.out: Event: 20
* I/System.out: Done
*/
}
/**这时使⽤launchIn替换collect我们可以在单独的协程中启动流的收集,这样就可以⽴即继续进⼀步执⾏代码**/
private fun launchInStart(){
runBlocking<Unit> {
events().onEach { event -> println("Event: $event") }.launchIn(GlobalScope) // <---在开启的新协程中收集流,无等待流收集完成就可以输出println("Done")
println("Done")
}
/****
* I/System.out: Done
* I/System.out: Event: 1
* I/System.out: Event: 2
* I/System.out: Event: 3
* I/System.out: Event: 4
* I/System.out: Event: 5
* I/System.out: Event: 6
* I/System.out: Event: ...
* I/System.out: Event: 19
* I/System.out: Event: 20
*/
}
/**
* 流取消操作符:cancellable()
* 让繁忙的流可以取消
*/
private fun cancellableStart(){
runBlocking<Unit> {
(1..5).asFlow().cancellable().collect { value ->
if (value == 3) cancel()
println(value)
}
}
}
}
遇见即是缘分!愿岁月静好,彼此成就心中的那个我--LQS!