Kotlin中协程的Channel通道(二)
channel中select多路复用
- 数据通信系统或计算机网络系统中,传输媒体的带宽或容量往往会大于传输单一信号的需求,为了有效地利用通信线路, 希望一个信道同时传输多路信号,这就是所谓的多路复用技术(Multiplexing)。
复用多个await
- 两个API分别从网络和本地缓存获取数据,期望哪个先返回就先用哪个做展示。
private val cachePath = "E://coroutine.cache"
private val gson = Gson()
data class Response<T>(val value:T, val isLocal:Boolean)
fun CoroutineScope.getUserFromLocal(name: String) = async(Dispatchers.IO){
delay(1000)//故意延迟
File(cachePath).readText().let { gson.fromJson(it, User::class.java) }
}
fun CoroutineScope.getUserFromRemote(name:String) = async(Dispatchers.IO){
userServiceApi.getUser(name)
}
@Test
fun `test select await`() = runBlocking<Unit> {
GlobalScope.launch {
val localRequest = getUserFromLocal("xxx")
val remoteRequest = getUserFromRemote("yyy")
val userRespone = select<Response<User>> {
localRequest.onAwait{ Response(it, true) }
remoteRequest.onAwait{Response(it, false)}
}
userRespone.value?.let { println(it) }
}.join()
}
复用多个Channel
- 跟await类似,会接收到最快的那个channel消息。
@Test
fun `test select channel`() = runBlocking<Unit> {
val channels = listOf(Channel<Int>(), Channel<Int>())
GlobalScope.launch {
delay(100)
channels[0].send(200)
}
GlobalScope.launch {
delay(50)
channels[1].send(100)
}
val result = select<Int?> {
channels.forEach { channel ->
channel.onReceive { it }
}
}
println(result)
}
SelectClause
- 我们怎么知道哪些事件可以被select呢?其实所有能够被select的事件都是SelectClauseN类型,包括:
1.SelectClause0: 对应事件没有返回值,例如join没有返回值,那么onJoin就是SelectClauseN类型。使用时,
onJoin的参数是一个无参函数
2.SelectClause1:对应事件有返回值,前面的onAwait和onReceive都是此类情况。
3.SelectClause2:对应事件有返回值,此外还需要一个额外的参数,例如:Channel.onSend有两个参数,第一个是Channel数据类型值,表示即将发送的值;第二个就是发送成功时的回调参数。 - 如果我们想要确认挂起函数是否支持select,只需要查看其是否存在对应的SelectClauseN类型可回调即可。
@Test
fun `test selectClause0`() = runBlocking<Unit> {
val job1 = GlobalScope.launch {
delay(100)
println("job 1")
}
val job2 = GlobalScope.launch {
delay(10)
println("job 2")
}
select<Unit> {
job1.onJoin { println("job 1 onJoin") }
job2.onJoin { println("job 1 onJoin") }
}
delay(1000)
}
@Test
fun `test selectClause2`() = runBlocking<Unit> {
val channels = listOf(Channel<Int>(), Channel<Int>())
println(channels)
launch(Dispatchers.IO) {
select<Unit> {
launch {
delay(10)
channels[1].onSend(200) { sentChannel ->
println("sent on $sentChannel")
}
}
launch {
delay(100)
channels[0].onSend(100) { sendChannel ->
println("send on $sendChannel")
}
}
}
}
GlobalScope.launch {
println(channels[0].receive())
}
GlobalScope.launch {
println(channels[1].receive())
}
delay(1000)
}
使用Flow实现多路复用
- 多数情况下,我们可以通过构造合适的Flow来实现多路复用的效果。
@Test
fun `test select flow`() = runBlocking<Unit> {
//函数 ->协程 -> Flow -> Flow合并
val name = "guest"
coroutineScope {
listOf(::getUserFromLocal, ::getUserFromRemote)
.map{ function ->
function.call(name)
}.map{deferred ->
flow{emit(deferred.await())}
}.merge().collect{user -> println(user)}
}
}
channel的并发安全
- 不安全的并发访问
我们使用线程在解决并发问题的时候总是会遇到线程安全的问题,而Java平台上的 Kotlin协程实现免不了存在并发调度的情况,因此线程安全同样值得留意。
@Test
fun `test not safe concurrent`() = runBlocking<Unit> {
var count = 0
List(1000) {
GlobalScope.launch { count++ }
}.joinAll()
println(count)
}
@Test
fun `test safe concurrent`() = runBlocking<Unit> {
var count = AtomicInteger(0)
List(1000) {
GlobalScope.launch { count.incrementAndGet() }
}.joinAll()
println(count)
}
协程的并发工具
- 除了我们在线程中常用的解决并发问题的手段之外,协程框架也提供了一些并发安全的工具,包括:
1.Channel:并发安全的消息通道,我们已经非常熟悉。
2.Mutex:轻量级锁,它的lock和unlock从语义上与线程锁比较类似,之所以轻量是因为它在获取不到锁时不会阻塞线程,而是挂起等待锁的释放。
3.Semaphore:轻量级信号量,信号量可以有多个,协程在获取到信号量后即可执行并发操作。当Semaphore的参数为1时,效果等价于1Mutex.
@Test
fun `test safe concurrent tools`() = runBlocking<Unit> {
var count = 0
val mutex = Mutex()
List(1000) {
GlobalScope.launch {
mutex.withLock {
count++
}
}
}.joinAll()
println(count)
}
@Test
fun `test safe concurrent tools2`() = runBlocking<Unit> {
var count = 0
val semaphore = Semaphore(1)
List(1000) {
GlobalScope.launch {
semaphore.withPermit {
count++
}
}
}.joinAll()
println(count)
}
避免访问外部可变状态
- 编写函数时要求它不得访问外部状态,只能基于参数做运算,通过返回值提供运算结果。
@Test
fun `test avoid access outer variable`() = runBlocking<Unit> {
var count = 0
val result = count + List(1000) {
GlobalScope.async { 1 }
}.map { it.await() }.sum()
println(result)
}