0
点赞
收藏
分享

微信扫一扫

Kotlin中协程的Channel通道(一)

Python百事通 2022-02-18 阅读 33

Kotlin中协程的Channel通道(一)


Channel-通道

  • Channel实际上是一个并发安全的队列,它可以用来连接协程,实现不同协程的通信。
 @Test
    fun `test know channel`() = runBlocking<Unit> {
        val channel = Channel<Int>()
        //生产者
        val producer = GlobalScope.launch {
            var i = 0
            while (true) {
                delay(1000)
                channel.send(++i)
                println("send $i")
            }
        }
        //消费者
        val consumer = GlobalScope.launch {
            while (true) {
                val element = channel.receive()
                println("receive $element")
            }
        }
        joinAll(producer, consumer)
    }

Channel的容量

  • Channel实际上就是一个队列,队列中一定存在缓冲区,那么一旦这个缓冲区满了,并且也一直没有人调用receive并取走函数,
    send就需要挂起。故意让接收端的节奏放慢,发现send总是会挂起,直到receive之后才会继续往下执行。
@Test
    fun `test know channel2`() = runBlocking<Unit> {
        val channel = Channel<Int>()
        //生产者
        val producer = GlobalScope.launch {
            var i = 0
            while (true) {
                delay(1000)
                channel.send(++i)
                println("send $i")
            }
        }
        //消费者
        val consumer = GlobalScope.launch {
            while (true) {
                delay(2000)
                val element = channel.receive()
                println("receive $element")
            }
        }
        joinAll(producer, consumer)
    }

迭代Channel

  • Channel本身确实像序列,所以我们在读取的时候可以直接获取一个Channel的iterator.
 @Test
    fun `test iterate channel`() = runBlocking<Unit> {
        val channel = Channel<Int>(Channel.UNLIMITED)
        //生产者
        val producer = GlobalScope.launch {
            for (x in 1..5) {
                channel.send(x * x)
                println("send ${x * x}")
            }
        }
        //消费者
        val consumer = GlobalScope.launch {
            /*val iterator = channel.iterator()
            while (iterator.hasNext()){
                val element = iterator.next()
                println("receive $element")
                delay(2000)
            }*/
            for (element in channel) {
                println("receive $element")
                delay(2000)
            }
        }
        joinAll(producer, consumer)
    }

produce与actor

  • 构造生产者与消费者的便捷方法
  • 我们可以通过produce方法启动一个生产者协程,并返回一个ReceiveChannel, 其他协程就可以用这个Channel来接收数据了。
    反过来,我们可以用actor启动一个消费者协程。
 @Test
    fun `test fast producer channel`() = runBlocking<Unit> {
        val receiveChannel: ReceiveChannel<Int> = GlobalScope.produce<Int> {
            repeat(100) {
                delay(1000)
                send(it)
            }
        }
        val consumer = GlobalScope.launch {
            for (i in receiveChannel) {
                println("received: $i")
            }
        }
        consumer.join()
    }

    @Test
    fun `test fast consumer channel`() = runBlocking<Unit> {
        val sendChannel: SendChannel<Int> = GlobalScope.actor<Int> {
            while (true) {
                val element = receive()
                println(element)
            }
        }
        val producer = GlobalScope.launch {
            for (i in 0..3) {
                sendChannel.send(i)
            }
        }
        producer.join()
    }

Channel的关闭

  • produce和actor返回的Channel都会随着对应的协程执行完毕而关闭,也正是这样,Channel才被称之为热数据流。
  • 对于一个Channel,如果我们调用了它的close方法,它会立即停止接收新元素,也就是说这时它的isClosedForSend会立即返回true。而由于Channel缓冲区的存在,这时候可能还有些元素没有被处理完,因此要等所有的元素都被读取之后 isclosedForReceive才会返回true。
  • Channel的生命周期最好由主导方来维护,建议由主导的一方实现关闭。
@Test
    fun `test close channel`() = runBlocking<Unit> {
        val channel = Channel<Int>(3)
        //生产者
        val producer = GlobalScope.launch {
            List(3) {
                channel.send(it)
                println("send $it")
            }
            channel.close()
            println(
                """close channel. 
                | -CloseForSend:${channel.isClosedForSend} 
                | -CloseForReceive: ${channel.isClosedForReceive}""".trimMargin()
            )
        }
        //消费者
        val consumer = GlobalScope.launch {
            for (element in channel) {
                println("receive $element")
                delay(1000)
            }
            println(
                """After Consuming. 
                | -CloseForSend:${channel.isClosedForSend} 
                | -CloseForReceive: ${channel.isClosedForReceive}""".trimMargin()
            )
        }
        joinAll(producer, consumer)
    }

BroadcastChannel

  • 前面提到,发送端和接收端在Channel中存在一对多的情形,从数据处理本身来讲,虽然有多个接收端,
    但是同一个元素只会被一个接收端读到,广播则不然,多个接收端不存在互斥行为。
  @Test
    fun `test broadcast`() = runBlocking<Unit> {
//        val broadcastChannel = BroadcastChannel<Int>(Channel.BUFFERED)
        val channel = Channel<Int>()
        val broadcastChannel = channel.broadcast(3)
        val producer = GlobalScope.launch {
            List(3) {
                delay(100)
                broadcastChannel.send(it)
            }
            broadcastChannel.close()
        }
        List(3) { index ->
            GlobalScope.launch {
                val receiveChannel = broadcastChannel.openSubscription()
                for (i in receiveChannel) {
                    println("[#$index] received: $i")
                }
            }
        }.joinAll()
    }
举报

相关推荐

0 条评论