0
点赞
收藏
分享

微信扫一扫

Kotlin中协程的Channel通道(二)

钵仔糕的波波仔 2022-02-18 阅读 39

Kotlin中协程的Channel通道(二)


channel中select多路复用

  • 数据通信系统或计算机网络系统中,传输媒体的带宽或容量往往会大于传输单一信号的需求,为了有效地利用通信线路, 希望一个信道同时传输多路信号,这就是所谓的多路复用技术(Multiplexing)。

复用多个await

  • 两个API分别从网络和本地缓存获取数据,期望哪个先返回就先用哪个做展示。
private val cachePath = "E://coroutine.cache"
    private val gson = Gson()
    data class Response<T>(val value:T, val isLocal:Boolean)
    fun CoroutineScope.getUserFromLocal(name: String) = async(Dispatchers.IO){
    delay(1000)//故意延迟
    File(cachePath).readText().let { gson.fromJson(it, User::class.java) }
    }
    fun CoroutineScope.getUserFromRemote(name:String) = async(Dispatchers.IO){
    userServiceApi.getUser(name)
    }

    @Test
    fun `test select await`() = runBlocking<Unit> {
        GlobalScope.launch {
            val localRequest = getUserFromLocal("xxx")
            val remoteRequest = getUserFromRemote("yyy")
            val userRespone = select<Response<User>> {
                localRequest.onAwait{ Response(it, true) }
                remoteRequest.onAwait{Response(it, false)}
            }
            userRespone.value?.let { println(it) }
        }.join()
    }

复用多个Channel

  • 跟await类似,会接收到最快的那个channel消息。
 @Test
    fun `test select channel`() = runBlocking<Unit> {
        val channels = listOf(Channel<Int>(), Channel<Int>())
        GlobalScope.launch {
            delay(100)
            channels[0].send(200)
        }
        GlobalScope.launch {
            delay(50)
            channels[1].send(100)
        }
        val result = select<Int?> {
            channels.forEach { channel ->
                channel.onReceive { it }
            }
        }
        println(result)
    }

SelectClause

  • 我们怎么知道哪些事件可以被select呢?其实所有能够被select的事件都是SelectClauseN类型,包括:
    1.SelectClause0: 对应事件没有返回值,例如join没有返回值,那么onJoin就是SelectClauseN类型。使用时,
    onJoin的参数是一个无参函数
    2.SelectClause1:对应事件有返回值,前面的onAwait和onReceive都是此类情况。
    3.SelectClause2:对应事件有返回值,此外还需要一个额外的参数,例如:Channel.onSend有两个参数,第一个是Channel数据类型值,表示即将发送的值;第二个就是发送成功时的回调参数。
  • 如果我们想要确认挂起函数是否支持select,只需要查看其是否存在对应的SelectClauseN类型可回调即可。
@Test
    fun `test selectClause0`() = runBlocking<Unit> {
        val job1 = GlobalScope.launch {
            delay(100)
            println("job 1")
        }
        val job2 = GlobalScope.launch {
            delay(10)
            println("job 2")
        }
        select<Unit> {
            job1.onJoin { println("job 1 onJoin") }
            job2.onJoin { println("job 1 onJoin") }
        }
        delay(1000)
    }

    @Test
    fun `test selectClause2`() = runBlocking<Unit> {
        val channels = listOf(Channel<Int>(), Channel<Int>())
        println(channels)

        launch(Dispatchers.IO) {
            select<Unit> {
                launch {
                    delay(10)
                    channels[1].onSend(200) { sentChannel ->
                        println("sent on $sentChannel")
                    }
                }
                launch {
                    delay(100)
                    channels[0].onSend(100) { sendChannel ->
                        println("send on $sendChannel")
                    }
                }
            }
        }
        GlobalScope.launch {
            println(channels[0].receive())
        }
        GlobalScope.launch {
            println(channels[1].receive())
        }
        delay(1000)
    }

使用Flow实现多路复用

  • 多数情况下,我们可以通过构造合适的Flow来实现多路复用的效果。
@Test
    fun `test select flow`() = runBlocking<Unit> {
    //函数 ->协程 -> Flow -> Flow合并
    val name = "guest"
    coroutineScope {
    listOf(::getUserFromLocal, ::getUserFromRemote)
    .map{ function ->
    function.call(name)
    }.map{deferred ->
    flow{emit(deferred.await())}
    }.merge().collect{user -> println(user)}
    }
    }

channel的并发安全

  • 不安全的并发访问
    我们使用线程在解决并发问题的时候总是会遇到线程安全的问题,而Java平台上的 Kotlin协程实现免不了存在并发调度的情况,因此线程安全同样值得留意。
@Test
    fun `test not safe concurrent`() = runBlocking<Unit> {
        var count = 0
        List(1000) {
            GlobalScope.launch { count++ }
        }.joinAll()
        println(count)
    }

    @Test
    fun `test safe concurrent`() = runBlocking<Unit> {
        var count = AtomicInteger(0)
        List(1000) {
            GlobalScope.launch { count.incrementAndGet() }
        }.joinAll()
        println(count)
    }

协程的并发工具

  • 除了我们在线程中常用的解决并发问题的手段之外,协程框架也提供了一些并发安全的工具,包括:
    1.Channel:并发安全的消息通道,我们已经非常熟悉。
    2.Mutex:轻量级锁,它的lock和unlock从语义上与线程锁比较类似,之所以轻量是因为它在获取不到锁时不会阻塞线程,而是挂起等待锁的释放。
    3.Semaphore:轻量级信号量,信号量可以有多个,协程在获取到信号量后即可执行并发操作。当Semaphore的参数为1时,效果等价于1Mutex.
 @Test
    fun `test safe concurrent tools`() = runBlocking<Unit> {
        var count = 0
        val mutex = Mutex()
        List(1000) {
            GlobalScope.launch {
                mutex.withLock {
                    count++
                }
            }
        }.joinAll()
        println(count)
    }

    @Test
    fun `test safe concurrent tools2`() = runBlocking<Unit> {
        var count = 0
        val semaphore = Semaphore(1)
        List(1000) {
            GlobalScope.launch {
                semaphore.withPermit {
                    count++
                }
            }
        }.joinAll()
        println(count)
    }

避免访问外部可变状态

  • 编写函数时要求它不得访问外部状态,只能基于参数做运算,通过返回值提供运算结果。
  @Test
    fun `test avoid access outer variable`() = runBlocking<Unit> {
        var count = 0
        val result = count + List(1000) {
            GlobalScope.async { 1 }
        }.map { it.await() }.sum()
        println(result)
    }
举报

相关推荐

0 条评论