Kotlin中协程的Channel通道(一)
Channel-通道
- Channel实际上是一个并发安全的队列,它可以用来连接协程,实现不同协程的通信。
@Test
fun `test know channel`() = runBlocking<Unit> {
val channel = Channel<Int>()
//生产者
val producer = GlobalScope.launch {
var i = 0
while (true) {
delay(1000)
channel.send(++i)
println("send $i")
}
}
//消费者
val consumer = GlobalScope.launch {
while (true) {
val element = channel.receive()
println("receive $element")
}
}
joinAll(producer, consumer)
}
Channel的容量
- Channel实际上就是一个队列,队列中一定存在缓冲区,那么一旦这个缓冲区满了,并且也一直没有人调用receive并取走函数,
send就需要挂起。故意让接收端的节奏放慢,发现send总是会挂起,直到receive之后才会继续往下执行。
@Test
fun `test know channel2`() = runBlocking<Unit> {
val channel = Channel<Int>()
//生产者
val producer = GlobalScope.launch {
var i = 0
while (true) {
delay(1000)
channel.send(++i)
println("send $i")
}
}
//消费者
val consumer = GlobalScope.launch {
while (true) {
delay(2000)
val element = channel.receive()
println("receive $element")
}
}
joinAll(producer, consumer)
}
迭代Channel
- Channel本身确实像序列,所以我们在读取的时候可以直接获取一个Channel的iterator.
@Test
fun `test iterate channel`() = runBlocking<Unit> {
val channel = Channel<Int>(Channel.UNLIMITED)
//生产者
val producer = GlobalScope.launch {
for (x in 1..5) {
channel.send(x * x)
println("send ${x * x}")
}
}
//消费者
val consumer = GlobalScope.launch {
/*val iterator = channel.iterator()
while (iterator.hasNext()){
val element = iterator.next()
println("receive $element")
delay(2000)
}*/
for (element in channel) {
println("receive $element")
delay(2000)
}
}
joinAll(producer, consumer)
}
produce与actor
- 构造生产者与消费者的便捷方法
- 我们可以通过produce方法启动一个生产者协程,并返回一个ReceiveChannel, 其他协程就可以用这个Channel来接收数据了。
反过来,我们可以用actor启动一个消费者协程。
@Test
fun `test fast producer channel`() = runBlocking<Unit> {
val receiveChannel: ReceiveChannel<Int> = GlobalScope.produce<Int> {
repeat(100) {
delay(1000)
send(it)
}
}
val consumer = GlobalScope.launch {
for (i in receiveChannel) {
println("received: $i")
}
}
consumer.join()
}
@Test
fun `test fast consumer channel`() = runBlocking<Unit> {
val sendChannel: SendChannel<Int> = GlobalScope.actor<Int> {
while (true) {
val element = receive()
println(element)
}
}
val producer = GlobalScope.launch {
for (i in 0..3) {
sendChannel.send(i)
}
}
producer.join()
}
Channel的关闭
- produce和actor返回的Channel都会随着对应的协程执行完毕而关闭,也正是这样,Channel才被称之为热数据流。
- 对于一个Channel,如果我们调用了它的close方法,它会立即停止接收新元素,也就是说这时它的isClosedForSend会立即返回true。而由于Channel缓冲区的存在,这时候可能还有些元素没有被处理完,因此要等所有的元素都被读取之后 isclosedForReceive才会返回true。
- Channel的生命周期最好由主导方来维护,建议由主导的一方实现关闭。
@Test
fun `test close channel`() = runBlocking<Unit> {
val channel = Channel<Int>(3)
//生产者
val producer = GlobalScope.launch {
List(3) {
channel.send(it)
println("send $it")
}
channel.close()
println(
"""close channel.
| -CloseForSend:${channel.isClosedForSend}
| -CloseForReceive: ${channel.isClosedForReceive}""".trimMargin()
)
}
//消费者
val consumer = GlobalScope.launch {
for (element in channel) {
println("receive $element")
delay(1000)
}
println(
"""After Consuming.
| -CloseForSend:${channel.isClosedForSend}
| -CloseForReceive: ${channel.isClosedForReceive}""".trimMargin()
)
}
joinAll(producer, consumer)
}
BroadcastChannel
- 前面提到,发送端和接收端在Channel中存在一对多的情形,从数据处理本身来讲,虽然有多个接收端,
但是同一个元素只会被一个接收端读到,广播则不然,多个接收端不存在互斥行为。
@Test
fun `test broadcast`() = runBlocking<Unit> {
// val broadcastChannel = BroadcastChannel<Int>(Channel.BUFFERED)
val channel = Channel<Int>()
val broadcastChannel = channel.broadcast(3)
val producer = GlobalScope.launch {
List(3) {
delay(100)
broadcastChannel.send(it)
}
broadcastChannel.close()
}
List(3) { index ->
GlobalScope.launch {
val receiveChannel = broadcastChannel.openSubscription()
for (i in receiveChannel) {
println("[#$index] received: $i")
}
}
}.joinAll()
}