0
点赞
收藏
分享

微信扫一扫

Kotlin中协程的Flow异步流(二)

janedaring 2022-02-18 阅读 56

Kotlin中协程的Flow异步流(二)


flow的背压

  • buffer(),并发运行流中发射元素的代码。
  • conflate(), 合并发射项,不对每个值进行处理。
  • collectLatest(),取消并重新发射最后一个值。
  • 当必须更改CoroutineDispatcher时,flowOn操作符使用了相同的缓冲机制,但是buffer函数显式地请求缓冲而不改变执行上下文。
 fun simpleFlow8() = flow<Int> {
        for (i in 1..3) {
            delay(100)
            emit(i)
            println("Emitting $i ${Thread.currentThread().name}")
        }
    }

    @Test
    fun `test flow back pressure`() = runBlocking<Unit> {
        val time = measureTimeMillis {
            simpleFlow8()
//                .flowOn(Dispatchers.Default) //切换线程
//                .buffer(50) //增加缓存
//                .conflate()
                .collectLatest { value ->
//                .collect { value ->
                    delay(300) //处理这个元素消耗300ms
                    println("Collected $value ${Thread.currentThread().name}")
                }
        }
        println("Collected in $time ms")
    }

flow的操作符

过渡流操作符

  • 可以使用操作符转换流,就像使用集合与序列一样。
  • 过渡操作符应用于上游流,并返回下游流。
  • 这些操作符也是冷操作符,就像流一样。这类操作符本身不是挂起函数。
  • 它运行的速度很快,返回新的转换流的定义。
 suspend fun performRequest(request: Int): String {
        delay(1000)
        return "respone $request"
    }

    @Test
    fun `test transform flow operator`() = runBlocking<Unit> {
        (1..3).asFlow()
            .map { request -> performRequest(request) }
            .collect { value -> println(value) }
        (1..3).asFlow().transform { request ->
            emit("Making request $request")
            emit(performRequest(request))
        }
            .collect { value -> println(value) }
    }

    fun numbers() = flow<Int> {
        try {
            emit(1)
            emit(2)
            println("This line will not execute")
            emit(3)
        } finally {
            println("Finally in numbers")
        }
    }

    //限长操作符
    @Test
    fun `test limit length operator`() = runBlocking<Unit> {
        numbers().take(2) //只收集两个元素
            .collect { value -> println(value) }
    }

末端流操作符

  • 末端操作符是在流上用于启动流收集的挂起函数。collect是最基础的末端操作符,但是还有另外一些更方便使用的末端操作符:
    1.转化为各种集合,例如toList与toSet.
    2.获取第一个(first)值与确保流发射单个(single)值的操作符。
    3.使用reduce与fold将流规约到单个值。
@Test
    fun `test terminal operator`() = runBlocking<Unit> {
        val sum = (1..5).asFlow()
            .map { it * it }
            .reduce { a, b -> a + b }
        println(sum)
    }

组合多个流

  • 就像Kotlin标准库中的Sequence.zip扩展函数一样,流拥有一个zip操作符用于组合两个流中的相关值。
 @Test
    fun `test zip`() = runBlocking<Unit> {
        val numbers = (1..3).asFlow().onEach { delay(300) }
        val strs = flowOf("One", "Two", "Three").onEach { delay(400) }
        val startTime = System.currentTimeMillis()
        numbers.zip(strs) { a, b -> "$a ->$b" }.collect {
            println("$it at ${System.currentTimeMillis() - startTime} ms from start")
        }
    }

展平流

  • 流表示异步接收的值序列,所以很容易遇到这样的情况:每个值都会触发对另一个值序列的请求,然而,由于流具有异步的性质,因此需要不用的展平模式,因此,存在一系列的流展平操作符:
    1.flatMapConcat连接模式,
    2.flatMapMerge合并模式,
    3.flatMapLatest最新展平模式。
 fun requestFlow(i: Int) = flow<String> {
        emit("$i:First")
        delay(500)
        emit("$i:Second")
    }

    @Test
    fun `test flatMapConcat`() = runBlocking<Unit> {
        val startTime = System.currentTimeMillis()
        (1..3).asFlow().onEach { delay(100) }
//            .map { requestFlow(it) }
            .flatMapConcat { requestFlow(it) }
            .collect { println("$it at ${System.currentTimeMillis() - startTime} ms from start") }
    }

    @Test
    fun `test flatMapMerge`() = runBlocking<Unit> {
        val startTime = System.currentTimeMillis()
        (1..3).asFlow().onEach { delay(100) }
//            .map { requestFlow(it) }
            .flatMapMerge { requestFlow(it) }
            .collect { println("$it at ${System.currentTimeMillis() - startTime} ms from start") }
    }

    @Test
    fun `test flatMapLatest`() = runBlocking<Unit> {
        val startTime = System.currentTimeMillis()
        (1..3).asFlow().onEach { delay(100) }
//            .map { requestFlow(it) }
            .flatMapLatest { requestFlow(it) }
            .collect { println("$it at ${System.currentTimeMillis() - startTime} ms from start") }
    }

流的异常处理

  • 当运算符中的发射器或代码抛出异常时,有几种处理异常的方法:
  1. try/catch块
  2. catch函数
 fun simpleFlow10() = flow<Int> {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i)
        }
    }

    @Test
    fun `test flow exception`() = runBlocking<Unit> {
        try {
            simpleFlow10().collect { value ->
                println(value)
                check(value <= 1) { "Collected $value" }
            }
        } catch (e: Throwable) {
            println("Caught $e")
        }
    }

    @Test
    fun `test flow exception2`() = runBlocking<Unit> {
        flow {
            throw ArithmeticException("Div 0")
            emit(1)
        }.catch { e: Throwable ->
            println("Caught $e")
            emit(10)
        }
            .flowOn(Dispatchers.IO)
            .collect { println(it) }
    }

流的完成

  • 当流收集完成时(普通情况或异常情况),它可能需要执行一个动作。
  1. 命令式finally块
  2. onCompletion声明式处理
fun simpleFlow11() = (1..3).asFlow()

    @Test
    fun `test flow complete in finally`() = runBlocking<Unit> {
        try {
            simpleFlow11().collect { println(it) }
        } finally {
            println("Done")
        }
    }

    fun simpleFlow12() = flow<Int> {
        emit(1)
        throw RuntimeException()
    }

    @Test
    fun `test flow complete in onCompletion`() = runBlocking<Unit> {
        /*simpleFlow11()
            .onCompletion { println("Done") }
            .collect { println(it) }*/
        simpleFlow12()
            .onCompletion { exception ->
                if (exception != null) println("Flow completed exceptionall")
            }
            .catch { exception -> println("Caught $exception") }//捕获上游异常
            .collect { println(it) }
        //捕获下游收集时的异常
        simpleFlow12().onCompletion { exception ->
            if (exception != null) println("Flow completed exceptionall")
        } .catch { exception -> println("Caught $exception") }//捕获上游异常
            .collect { value ->
            println(value)
            check(value <= 1) { "Collected $value" }
        }
    }

举报

相关推荐

0 条评论