0
点赞
收藏
分享

微信扫一扫

【Kotlin 协程】Flow 异步流 ⑧ ( 背压概念 | 使用缓冲处理背压问题 | 使用 flowOn 处理背压问题 | 从提高收集元素效率方向解决背压问题 )


文章目录

  • ​​一、背压概念​​
  • ​​二、使用缓冲处理背压问题​​
  • ​​三、使用 flowOn 处理背压问题​​
  • ​​四、从提高收集元素效率方向解决背压问题​​
  • ​​1、Flow#conflate 代码示例​​
  • ​​2、Flow#collectLatest 代码示例​​






一、背压概念


" 背压 " 概念 指的是 数据 受到 与 流动方向 一致的压力 ,

数据 生产者 的 生产效率 大于 数据 消费者 的 消费效率 , 就会产生 背压 ;



处理背压问题 , 有 2 种方案 :

  • 降低 数据 生产者 的生产效率 ;
  • 提高 数据 消费者 的消费效率 ;


背压代码示例 : 以 100 ms间隔发射元素 , 以 200 ms 间隔收集元素 , 发射元素的效率 高于 收集元素的效率, 此时会产生背压 ;

package kim.hsl.coroutine

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)

// 将主线程包装成协程
runBlocking {
val delta = measureTimeMillis {
// 以 200 ms 的间隔收集元素
// 发射元素的效率 高于 收集元素的效率, 此时会产生背压
flowEmit().collect {
delay(200)
println("收集元素 $it , 当前线程 ${Thread.currentThread().name}")
}
}
println("收集元素耗时 $delta ms")
}
}

suspend fun flowEmit() = flow<Int> {
// 以 100 ms 的间隔发射元素
for (i in 0..5) {
delay(100)
emit(i)
println("发射元素 $i , 当前线程 ${Thread.currentThread().name}")
}
}
}

执行结果 : 收集元素的耗时总共耗费了 2284 ms ;

23:37:49.496 System.out   kim.hsl.coroutine     I  收集元素 0 , 当前线程 main
23:37:49.496 System.out kim.hsl.coroutine I 发射元素 0 , 当前线程 main
23:37:49.878 System.out kim.hsl.coroutine I 收集元素 1 , 当前线程 main
23:37:49.879 System.out kim.hsl.coroutine I 发射元素 1 , 当前线程 main
23:37:50.259 System.out kim.hsl.coroutine I 收集元素 2 , 当前线程 main
23:37:50.259 System.out kim.hsl.coroutine I 发射元素 2 , 当前线程 main
23:37:50.600 System.out kim.hsl.coroutine I 收集元素 3 , 当前线程 main
23:37:50.600 System.out kim.hsl.coroutine I 发射元素 3 , 当前线程 main
23:37:50.973 System.out kim.hsl.coroutine I 收集元素 4 , 当前线程 main
23:37:50.973 System.out kim.hsl.coroutine I 发射元素 4 , 当前线程 main
23:37:51.352 System.out kim.hsl.coroutine I 收集元素 5 , 当前线程 main
23:37:51.353 System.out kim.hsl.coroutine I 发射元素 5 , 当前线程 main
23:37:51.353 System.out kim.hsl.coroutine I 收集元素耗时 2284 ms

【Kotlin 协程】Flow 异步流 ⑧ ( 背压概念 | 使用缓冲处理背压问题 | 使用 flowOn 处理背压问题 | 从提高收集元素效率方向解决背压问题 )_背压






二、使用缓冲处理背压问题


调用 Flow#buffer 函数 , 为 收集元素 添加一个缓冲 , 可以指定缓冲区个数 ;



代码示例 :

package kim.hsl.coroutine

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)

// 将主线程包装成协程
runBlocking {
val delta = measureTimeMillis {
// 以 200 ms 的间隔收集元素
// 发射元素的效率 高于 收集元素的效率, 此时会产生背压
flowEmit().buffer(10).collect {
delay(200)
println("收集元素 $it , 当前线程 ${Thread.currentThread().name}")
}
}
println("收集元素耗时 $delta ms")
}
}

suspend fun flowEmit() = flow<Int> {
// 以 100 ms 的间隔发射元素
for (i in 0..5) {
delay(100)
emit(i)
println("发射元素 $i , 当前线程 ${Thread.currentThread().name}")
}
}
}

执行结果 : 发射元素后 , 将发射的元素缓存起来 , 然后慢慢接收元素 ;

23:39:41.401 System.out   kim.hsl.coroutine     I  发射元素 0 , 当前线程 main
23:39:41.543 System.out kim.hsl.coroutine I 发射元素 1 , 当前线程 main
23:39:41.644 System.out kim.hsl.coroutine I 收集元素 0 , 当前线程 main
23:39:41.646 System.out kim.hsl.coroutine I 发射元素 2 , 当前线程 main
23:39:41.760 System.out kim.hsl.coroutine I 发射元素 3 , 当前线程 main
23:39:41.877 System.out kim.hsl.coroutine I 收集元素 1 , 当前线程 main
23:39:41.879 System.out kim.hsl.coroutine I 发射元素 4 , 当前线程 main
23:39:42.022 System.out kim.hsl.coroutine I 发射元素 5 , 当前线程 main
23:39:42.120 System.out kim.hsl.coroutine I 收集元素 2 , 当前线程 main
23:39:42.364 System.out kim.hsl.coroutine I 收集元素 3 , 当前线程 main
23:39:42.572 System.out kim.hsl.coroutine I 收集元素 4 , 当前线程 main
23:39:42.814 System.out kim.hsl.coroutine I 收集元素 5 , 当前线程 main
23:39:42.821 System.out kim.hsl.coroutine I 收集元素耗时 1601 ms

【Kotlin 协程】Flow 异步流 ⑧ ( 背压概念 | 使用缓冲处理背压问题 | 使用 flowOn 处理背压问题 | 从提高收集元素效率方向解决背压问题 )_Flow_02






三、使用 flowOn 处理背压问题


上述 发射元素 和 收集元素 都是在同一个线程中执行的 , 这两个操作可以并行执行 , 即使用 flowOn 指定收集元素的线程 ;

使用 flowOn 更改了协程上下文 , 使得 发射元素 与 收集元素 在不同的线程中并行执行 ;



代码示例 :

package kim.hsl.coroutine

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)

// 将主线程包装成协程
runBlocking {
val delta = measureTimeMillis {
// 以 200 ms 的间隔收集元素
// 发射元素的效率 高于 收集元素的效率, 此时会产生背压
flowEmit().flowOn(Dispatchers.Default).collect {
delay(200)
println("收集元素 $it , 当前线程 ${Thread.currentThread().name}")
}
}
println("收集元素耗时 $delta ms")
}
}

suspend fun flowEmit() = flow<Int> {
// 以 100 ms 的间隔发射元素
for (i in 0..5) {
delay(100)
emit(i)
println("发射元素 $i , 当前线程 ${Thread.currentThread().name}")
}
}
}

执行结果 :

23:45:19.675 System.out   kim.hsl.coroutine     I  发射元素 0 , 当前线程 DefaultDispatcher-worker-1
23:45:19.817 System.out kim.hsl.coroutine I 发射元素 1 , 当前线程 DefaultDispatcher-worker-1
23:45:19.918 System.out kim.hsl.coroutine I 收集元素 0 , 当前线程 main
23:45:19.921 System.out kim.hsl.coroutine I 发射元素 2 , 当前线程 DefaultDispatcher-worker-1
23:45:20.046 System.out kim.hsl.coroutine I 发射元素 3 , 当前线程 DefaultDispatcher-worker-1
23:45:20.124 System.out kim.hsl.coroutine I 收集元素 1 , 当前线程 main
23:45:20.186 System.out kim.hsl.coroutine I 发射元素 4 , 当前线程 DefaultDispatcher-worker-1
23:45:20.292 System.out kim.hsl.coroutine I 发射元素 5 , 当前线程 DefaultDispatcher-worker-1
23:45:20.333 System.out kim.hsl.coroutine I 收集元素 2 , 当前线程 main
23:45:20.548 System.out kim.hsl.coroutine I 收集元素 3 , 当前线程 main
23:45:20.790 System.out kim.hsl.coroutine I 收集元素 4 , 当前线程 main
23:45:21.000 System.out kim.hsl.coroutine I 收集元素 5 , 当前线程 main
23:45:21.007 System.out kim.hsl.coroutine I 收集元素耗时 1507 ms

【Kotlin 协程】Flow 异步流 ⑧ ( 背压概念 | 使用缓冲处理背压问题 | 使用 flowOn 处理背压问题 | 从提高收集元素效率方向解决背压问题 )_android_03






四、从提高收集元素效率方向解决背压问题


从提高收集元素效率方向解决背压问题 :

  • 调用 Flow#conflate 函数 , 合并发射元素项
  • 调用 Flow#collectLatest 函数 , 取消并重新发射最后一个元素


1、Flow#conflate 代码示例



代码示例 :

package kim.hsl.coroutine

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)

// 将主线程包装成协程
runBlocking {
val delta = measureTimeMillis {
// 以 200 ms 的间隔收集元素
// 发射元素的效率 高于 收集元素的效率, 此时会产生背压
flowEmit().conflate().collect {
delay(200)
println("收集元素 $it , 当前线程 ${Thread.currentThread().name}")
}
}
println("收集元素耗时 $delta ms")
}
}

suspend fun flowEmit() = flow<Int> {
// 以 100 ms 的间隔发射元素
for (i in 0..5) {
delay(100)
emit(i)
println("发射元素 $i , 当前线程 ${Thread.currentThread().name}")
}
}
}

执行结果 : 发射了 6 个元素 , 但是只接收到了 5 个元素 , 元素 2 被过滤掉了 ;

23:49:21.720 System.out   kim.hsl.coroutine     I  发射元素 0 , 当前线程 main
23:49:21.855 System.out kim.hsl.coroutine I 发射元素 1 , 当前线程 main
23:49:21.924 System.out kim.hsl.coroutine I 收集元素 0 , 当前线程 main
23:49:21.992 System.out kim.hsl.coroutine I 发射元素 2 , 当前线程 main
23:49:22.129 System.out kim.hsl.coroutine I 发射元素 3 , 当前线程 main
23:49:22.130 System.out kim.hsl.coroutine I 收集元素 1 , 当前线程 main
23:49:22.270 System.out kim.hsl.coroutine I 发射元素 4 , 当前线程 main
23:49:22.333 System.out kim.hsl.coroutine I 收集元素 3 , 当前线程 main
23:49:22.374 System.out kim.hsl.coroutine I 发射元素 5 , 当前线程 main
23:49:22.564 System.out kim.hsl.coroutine I 收集元素 4 , 当前线程 main
23:49:22.805 System.out kim.hsl.coroutine I 收集元素 5 , 当前线程 main
23:49:22.814 System.out kim.hsl.coroutine I 收集元素耗时 1277 ms

【Kotlin 协程】Flow 异步流 ⑧ ( 背压概念 | 使用缓冲处理背压问题 | 使用 flowOn 处理背压问题 | 从提高收集元素效率方向解决背压问题 )_协程_04



2、Flow#collectLatest 代码示例



代码示例 :

package kim.hsl.coroutine

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)

// 将主线程包装成协程
runBlocking {
val delta = measureTimeMillis {
// 以 200 ms 的间隔收集元素
// 发射元素的效率 高于 收集元素的效率, 此时会产生背压
flowEmit().collectLatest {
delay(200)
println("收集元素 $it , 当前线程 ${Thread.currentThread().name}")
}
}
println("收集元素耗时 $delta ms")
}
}

suspend fun flowEmit() = flow<Int> {
// 以 100 ms 的间隔发射元素
for (i in 0..5) {
delay(100)
emit(i)
println("发射元素 $i , 当前线程 ${Thread.currentThread().name}")
}
}
}

执行结果 : 只接收了最后一个元素 , 前几个元素没有接收 ;

23:53:01.328 System.out   kim.hsl.coroutine     I  发射元素 0 , 当前线程 main
23:53:01.461 System.out kim.hsl.coroutine I 发射元素 1 , 当前线程 main
23:53:01.603 System.out kim.hsl.coroutine I 发射元素 2 , 当前线程 main
23:53:01.712 System.out kim.hsl.coroutine I 发射元素 3 , 当前线程 main
23:53:01.857 System.out kim.hsl.coroutine I 发射元素 4 , 当前线程 main
23:53:02.004 System.out kim.hsl.coroutine I 发射元素 5 , 当前线程 main
23:53:02.246 System.out kim.hsl.coroutine I 收集元素 5 , 当前线程 main
23:53:02.255 System.out kim.hsl.coroutine I 收集元素耗时 1119 ms

【Kotlin 协程】Flow 异步流 ⑧ ( 背压概念 | 使用缓冲处理背压问题 | 使用 flowOn 处理背压问题 | 从提高收集元素效率方向解决背压问题 )_kotlin_05


举报

相关推荐

0 条评论