0
点赞
收藏
分享

微信扫一扫

Kotlin 使用 Channel 提高协程的使用效率


Kotlin 使用 Channel 提高协程的使用效率

/**
 * Loads the AWS bill archives for [billingCycle] and saves them as usages.
 *
 * For each of the three bills (Beijing, Ningxia, overseas) whose `<billName>.csv.zip`
 * file exists, a producer coroutine pages through the bill with `limit`/`offset`
 * queries of `loadSize` rows and sends every non-empty page into a buffered channel.
 * One consumer coroutine drains the channels one after another, converts each bill
 * with `map2AwsUsage` and stores each page with `saveAll`.
 *
 * Producers and consumer are children of a single `coroutineScope`, therefore:
 * - the function returns only after every page has been saved;
 * - a failure in a producer or in the consumer is rethrown to the caller instead of
 *   being lost in a detached coroutine;
 * - cancelling the caller cancels the whole pipeline.
 *
 * @param billingCycle the billing month whose bill files are loaded
 */
suspend fun loadBucketVendorData(billingCycle: YearMonth) {
    val billSources = listOf(
        AwsBillService.beijingBillName(billingCycle) to AwsBillService.internalBillPath,
        AwsBillService.ningXiaBillName(billingCycle) to AwsBillService.internalBillPath,
        AwsBillService.overseasBillName(billingCycle) to AwsBillService.overseaBillPath
    )
    measureTimeMillis {
        // Fully qualified so this block needs no import beyond the ones it already relied on.
        kotlinx.coroutines.coroutineScope {
            val channels = billSources.mapNotNull { (billName, billPath) ->
                val billFile = Path.of(billPath.toString(), "$billName.csv.zip")
                if (billFile.notExists()) {
                    logger.warn("$billName not found,skip load aws bill")
                    null
                } else {
                    val channel = Channel<List<AwsBill>>(1000)
                    // Producer: one page per iteration until the query returns no rows.
                    launch(Dispatchers.Default) {
                        var failure: Throwable? = null
                        try {
                            var offset = 0
                            while (true) {
                                val querySql = "select * from $billName limit $loadSize offset $offset"
                                val elapsed = measureTimeMillis {
                                    val bills = awsBillService.queryAwsBills(
                                        zipFilePath = billFile.toString(),
                                        querySql = querySql
                                    )
                                        .filterNot { bill ->
                                            bill.recordId.isNullOrBlank() ||
                                                bill.recordId == "0" ||
                                                bill.usageQuantity.isNullOrBlank()
                                        }
                                        .toList()
                                    // Empty page: nothing left to load; `finally` closes the channel.
                                    if (bills.isEmpty()) return@launch
                                    channel.send(bills)
                                }
                                logger.debug("load csv zip data $loadSize use $elapsed ms.")
                                // Log the statement that actually ran, not the next page's.
                                logger.debug(querySql)
                                offset += loadSize
                            }
                        } catch (e: Throwable) {
                            failure = e
                            throw e
                        } finally {
                            // Always close, otherwise the consumer would wait forever on a
                            // channel whose producer has died.
                            channel.close(failure)
                        }
                    }
                    channel
                }
            }
            // Consumer: drains the channels sequentially, in the order of the bill list.
            launch(Dispatchers.Default) {
                channels.forEach { channel ->
                    channel.consumeEach { bills ->
                        saveAll(bills.map { map2AwsUsage(it) })
                    }
                }
            }
        }
    }.also {
        logger.debug("saved takes $it ms.")
    }
}


举报

相关推荐

0 条评论