0
点赞
收藏
分享

微信扫一扫

如何在android开发中使用Kotlin Flow(一)


Kotlin 的Flow可以对数据流进行建模,类似LiveData、RxJava的数据流。

Flow也是用观察者模式实现的。

观察者模式包括了可观察对象(Observable,生产者、发射者、源这些称呼都是指可观察对象,可以被观察)、观察对象(Observers,订阅者、收集者、接收者这些称呼都是指观察对象,可以观察Observable)。当有什么状态(数据)变化时,Observable会自动通知Observers。

Observable(可观察者)可以是hot(热)或者是cold(冷)的。

  • Hot Observables 可以在没有Observers(观察者)观察它们时,照常发送(发射)数据。
  • Cold Observables只可以在有Observers(观察者)观察它们时,才发送(发射)数据。

而Flow是cold(冷)的,而它发送数据的时机是当有Terminal operator(末端操作符)应用到这个Flow时。

创建Flow

  • flow操作符

fun fibonacci(): Flow<BigInteger> = flow {
var x = BigInteger.ZERO
var y = BigInteger.ONE
while (true) {
emit(x)
x = y.also {
y += x
}
}
}

上面的代码是不会使用flow发射数据的,因为flow是冷流,需要加上末端操作符才能使flow发射数据,如collect是最常见的末端操作符:

fibonacci().take(100).collect { println(it) }

flow操作符的源码是:

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

// Named anonymous object
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
override suspend fun collectSafely(collector: FlowCollector<T>) {
collector.block()
}
}

由此可知flow操作符返回的是SafeFlow,意安全的流。因为flow支持coroutine,所以在flow操作符中不能使用其他coroutine context,在编译时,它会做这个检查。下面这个例子就是在flow内使用了不同的协程上下文,导致报错:
​​​Caused by: java.lang.IllegalStateException: Flow invariant is violated:​

flow {
emit(1) // Ok
withContext(IO) {
emit(2) // 报错
}
}.collect {
println(it)
}

如果想在改变coroutine context,可以使用flowOn操作符:

withContext(Dispatchers.Main) {
val singleValue = flow {
this.emit(1)
} // 如果之间没有指定它的上下文那么就用下面指定的IO上下文
.map { ... } // 在 IO中执行
.flowOn(Dispatchers.IO) // 它指定的上下文只会作用在前面没有指定上下文的操作符上,对后面的操作符没有影响
.filter { ... } // 在Default上下文中运行,因为下面的flowOn指定了
.flowOn(Dispatchers.Default)
.single() // 在 Main上下文中执行
}

这个coroutine context是flow执行的上下文。flowOn实际上是这样实现的,所以它只是把flow里lamda部分的代码放到一个新的context里去执行,方式如下:

如何在android开发中使用Kotlin Flow(一)_rxjava

  • flowOf操作符
    这个操作符其实只是在flow操作符基础上帮我们做了数据发射的工作:

如何在android开发中使用Kotlin Flow(一)_数据_02

如何在android开发中使用Kotlin Flow(一)_kotlin_03

如何在android开发中使用Kotlin Flow(一)_操作符_04


但是跟flow操作符不同的是,flowOf操作符不会检查执行的上下文,flow则会检查。

例子:

flowOf(1, 2, 3).collect{
println(it)
}

  • asFlow操作符,与flowOf是一样的,只是对列表操作方便一些而已:

如何在android开发中使用Kotlin Flow(一)_rxjava_05

listOf(1, 2, 3, 5).asFlow().collect{
println(it)
}

flow是冷的

所以我们要让它流动起来,就要使用一个末端操作符,最常见的就是collect,应用到这个flow上。 上面的例子我们已有展示。除了collect还有一些其他末端操作符。

​toCollection()​

这个操作符接受一个​​MutableCollection​​实例,Kotlin官方提供了两个toList和toSet。所以参考它们的使用,不难得出toCollection()的使用,即flow发射出来的数据都收集起来,然后一起返回。

如何在android开发中使用Kotlin Flow(一)_操作符_06


例子:

val myList = flow{
emit(1)
emit(2)
emit(5)
}.toList()

println(myList)

​first()和last()​

它们分别是取第一个和取后一个。例子:

val firstItem = flow{
emit(1)
emit(2)
emit(5)
}.first()
val lastItem = flow{
emit(1)
emit(2)
emit(5)
}.last()

​single()​

这个操作符,有点特别,它只希望flow只发射一个数据,发多一个就报错。

val singleValue = flow {
this.emit(1)
}
.map {
it + 2
}
.flowOn(IO)
.filter { it > 0 }
.flowOn(Default)
.single()
println(singleValue)

​reduce() 和 fold()​

这两个操作符都是将flow处理成一个值。

val singleValue = flow {
this.emit(1)
this.emit(2)
this.emit(3)
}
.reduce { accumulator, value ->
accumulator + value
}

println(singleValue) // 6

fold()有一个初始值,而reduce()则没有:

val singleValue = flow {
this.emit(1)
this.emit(2)
this.emit(3)
}
.fold(4) { acc: Int, value: Int ->
acc + value
}

println(singleValue) // 10

还记得我们说过,flow是在协程的基础上的技术,所以这些操作符都是是suspend函数,是可以被暂时挂起的,必须在协程中调用这些操作符。

​launchIn()​

这个末端操作符通常与​​onEach​​​, ​​onCompletion​​​, ​​catch​​操作符一起使用。

如何在android开发中使用Kotlin Flow(一)_rxjava_07


使用这个操作符的效果与下面的代码是等价的:

scope.launch { flow.collect() }

这个操作符会返回一个Job实例,意味着我们可以使用这个job来取消当前flow的数据采集工作,避免它工作太长时间,把当前的coroutine scope挂起了。

例子:

val job = flow {
this.emit(1)
this.emit(2)
kotlinx.coroutines.delay(3000)
this.emit(3)
}
.onEach { value -> println("$value ***") }
.onCompletion { cause -> println(cause) }
.catch { cause -> println("Exception: $cause") }
.launchIn(this)

job.cancel()


举报

相关推荐

0 条评论