五. Flow 异常处理
Flow 可以使用传统的 try...catch 来捕获异常:
fun main() = runBlocking {
flow {
emit(1)
try {
throw RuntimeException()
} catch (e: Exception) {
e.stackTrace
}
}.onCompletion { println("Done") }
.collect { println(it) }
}
另外,也可以使用 catch 操作符来捕获异常。
5.1 catch 操作符
上一篇文章Flow VS RxJava2曾讲述过 onCompletion 操作符。
但是 onCompletion 不能捕获异常,只能用于判断是否有异常。
fun main() = runBlocking {
flow {
emit(1)
throw RuntimeException()
}.onCompletion { cause ->
if (cause != null)
println("Flow completed exceptionally")
else
println("Done")
}.collect { println(it) }
}
执行结果:
1
Flow completed exceptionally
Exception in thread "main" java.lang.RuntimeException
......
catch 操作符可以捕获来自上游的异常
fun main() = runBlocking {
flow {
emit(1)
throw RuntimeException()
}
.onCompletion { cause ->
if (cause != null)
println("Flow completed exceptionally")
else
println("Done")
}
.catch{ println("catch exception") }
.collect { println(it) }
}
执行结果:
1
Flow completed exceptionally
catch exception
上面的代码如果把 onCompletion、catch 交换一下位置,则 catch 操作符捕获到异常后,不会影响到下游。因此,onCompletion 操作符不再打印"Flow completed exceptionally"
fun main() = runBlocking {
flow {
emit(1)
throw RuntimeException()
}
.catch{ println("catch exception") }
.onCompletion { cause ->
if (cause != null)
println("Flow completed exceptionally")
else
println("Done")
}
.collect { println(it) }
}
执行结果:
1
catch exception
Done
但是,catch 只是中间操作符不能捕获下游的异常,类似 collect 内的异常。
对于下游的异常,可以多次使用 catch 操作符来解决。
对于 collect 内的异常,除了传统的 try...catch 之外,还可以借助 onEach 操作符。把业务逻辑放到 onEach 操作符内,在 onEach 之后是 catch 操作符,最后是 collect()。
fun main() = runBlocking<Unit> {
flow {
......
}
.onEach {
......
}
.catch { ... }
.collect()
}
5.2 retry、retryWhen 操作符
像 RxJava 一样,Flow 也有重试的操作符。
如果上游遇到了异常,并使用了 retry 操作符,则 retry 会让 Flow 最多重试 retries 指定的次数。
public fun <T> Flow<T>.retry(
retries: Long = Long.MAX_VALUE,
predicate: suspend (cause: Throwable) -> Boolean = { true }
): Flow<T> {
require(retries > 0) { "Expected positive amount of retries, but had $retries" }
return retryWhen { cause, attempt -> attempt < retries && predicate(cause) }
}
例如,下面打印了三次"Emitting 1"、"Emitting 2",最后两次是通过 retry 操作符打印出来的。
fun main() = runBlocking {
(1..5).asFlow().onEach {
if (it == 3) throw RuntimeException("Error on $it")
}.retry(2) {
if (it is RuntimeException) {
return@retry true
}
false
}
.onEach { println("Emitting $it") }
.catch { it.printStackTrace() }
.collect()
}
执行结果:
Emitting 1
Emitting 2
Emitting 1
Emitting 2
Emitting 1
Emitting 2
java.lang.RuntimeException: Error on 3
......
retry 操作符最终调用的是 retryWhen 操作符。下面的代码跟刚才的执行结果一致:
fun main() = runBlocking {
(1..5).asFlow().onEach {
if (it == 3) throw RuntimeException("Error on $it")
}
.onEach { println("Emitting $it") }
.retryWhen { cause, attempt ->
attempt < 2
}
.catch { it.printStackTrace() }
.collect()
}
因为 retryWhen 操作符的参数是谓词,当谓词返回 true 时才会进行重试。谓词还接收一个 attempt 作为参数表示尝试的次数,该次数是从0开始的。
六. Flow Lifecycle
RxJava 的 do 操作符能够监听 Observables 的生命周期的各个阶段。
Flow 并没有多那么丰富的操作符来监听其生命周期的各个阶段,目前只有 onStart、onCompletion 来监听 Flow 的创建和结束。
fun main() = runBlocking {
(1..5).asFlow().onEach {
if (it == 3) throw RuntimeException("Error on $it")
}
.onStart { println("Starting flow") }
.onEach { println("On each $it") }
.catch { println("Exception : ${it.message}") }
.onCompletion { println("Flow completed") }
.collect()
}
执行结果:
Starting flow
On each 1
On each 2
Flow completed
Exception : Error on 3
例举他们的使用场景:
比如,在 Android 开发中使用 Flow 创建网络请求时,通过 onStart 操作符调用 loading 动画以及网络请求结束后通过 onCompletion 操作符取消动画。
再比如,在借助这些操作符做一些日志的打印。
fun <T> Flow<T>.log(opName: String) = onStart {
println("Loading $opName")
}.onEach {
println("Loaded $opName : $it")
}.onCompletion { maybeErr ->
maybeErr?.let {
println("Error $opName: $it")
} ?: println("Completed $opName")
}
该系列的相关文章:
Kotlin Coroutines Flow 系列(一) Flow 基本使用
Kotlin Coroutines Flow 系列(二) Flow VS RxJava2
Kotlin Coroutines Flow 系列(四) 线程操作
Kotlin Coroutines Flow 系列(五) 其他的操作符