0
点赞
收藏
分享

微信扫一扫

深潜Kotlin协程(十一):协程作用域函数


想象一下,在挂起函数中,你需要同时从两个(或者多个)源头获取数据,在了解如何正确的处理之前,我们先来看看一些次优的方法。

引入协程作用域函数前使用的方法

第一种方法是从挂起函数中调用挂起函数,这个方案问题点是它不是并发的(如果从每一个源头获取数据需要1秒,那么这个函数将需要2秒而不是1秒)。

// 数据是线性准备好的,而非同时准备好
suspend fun getUserProfile(): UserProfileData {
val user = getUserData() // (1 sec)
val notifications = getNotifications() // (1 sec)

return UserProfileData(
user = user,
notifications = notifications,
)
}

要并发地执行两个挂起,最简单的方法是使用 ​​async​​​ 包装它们,然而, ​​async​​​ 需要一个作用域,使用 ​​GlobalScope​​ 并不是一个好主意。

// 不要这么做
suspend fun getUserProfile(): UserProfileData {
val user = GlobalScope.async { getUserData() }
val notifications = GlobalScope.async { getNotifications() }
return UserProfileData(
user = user.await(), // (1 sec)
notifications = notifications.await(),
)
}

​GlobalScope​​​ 只是一个带有 ​​EmptyCoroutineContext​​ 的作用域。

public object GlobalScope : CoroutineScope {
override val coroutineContext: CoroutineContext
get() = EmptyCoroutineContext
}

如果在 ​​GlobalScope​​​ 上调用 ​​async​​,则与父协程没有联系,这意味着协程:

  • 不能被取消(如果父协程被取消,​​async​​ 内部仍在运行,因此在它们会造成浪费资源)
  • 不从任何父节点上继承作用域(它将始终运行在默认的调度器上,不会遵守任何来自父协程的上下文)

重要的结论是:

  • 潜在的内存泄漏和 CPU 冗余的使用
  • 用于单元测试协程的工具在这里不起作用,所以测试这个函数是非常困难的

这不是一个很好的解决方案,让我们看看另外一个例子,我们将作用域做为参数传递:

// 不要这么做
suspend fun getUserProfile(
scope: CoroutineScope
): UserProfileData {
val user = scope.async { getUserData() }
val notifications = scope.async { getNotifications() }
return UserProfileData(
user = user.await(), // (1 sec)
notifications = notifications.await(),
)
}

// 或者

// 不要这么做
suspend fun CoroutineScope.getUserProfile(): UserProfileData {
val user = async { getUserData() }
val notifications = async { getNotifications() }
return UserProfileData(
user = user.await(), // (1 sec)
notifications = notifications.await(),
)
}

这一个稍微好一些,因为现在可以取消协程,和适当的进行单元测试。问题是这个解决方案要求将作用域从一个函数传递给另一个函数。此外,这样的函数在作用域中可能会造成不必要的副作用,例如,如果在 ​​async​​​ 中有一个异常,整个作用域将会被关闭(假设它使用的是 ​​Job​​​ 而非 ​​SuipervisorJob​​​)。而且,能够访问作用域的函数很容易在外部滥用这种访问,例如,使用 ​​cancel​​ 方法取消这个作用域,这就是为什么这种方法可能是棘手的和有潜在危险的。

data class Details(val name: String, val followers: Int)
data class Tweet(val text: String)

fun getFollowersNumber(): Int = throw Error("Service exception")

suspend fun getUserName(): String {
delay(500)
return "marcinmoskala"
}

suspend fun getTweets(): List<Tweet> {
return listOf(Tweet("Hello, world"))
}

suspend fun CoroutineScope.getUserDetails(): Details {
val userName = async { getUserName() }
val followersNumber = async { getFollowersNumber() }
return Details(userName.await(), followersNumber.await())
}

fun main() = runBlocking {
val details = try {
getUserDetails()
} catch (e: Error) {
null
}
val tweets = async { getTweets() }
println("User: $details")
println("Tweets: ${tweets.await()}")
}
// Only Exception...

在上面的代码中,即使我们在拉取 userDetails 时出现异常,也希望至少看到 ​​tweets​​​ 会被打印。但不幸的是,​​getFollowersNumber​​​ 上的一个异常中断了 ​​async​​​,它会中断整个作用域(包括 ​​getTweets()​​​ )并结束程序。与之相对的,我们更希望函数在发生异常时仅仅是抛出异常而非中止。是时候介绍我们的英雄: ​​coroutineScope​​ 了。

coroutineScope

​coroutineScope​​ 是一个用于启动作用域的挂起函数。它返回由入参函数产生的值。

suspend fun <R> coroutineScope(
block: suspend CoroutineScope.() -> R
):

与 ​​async​​​ 或者 ​​launch​​​ 不同, ​​coroutineScope​​​ 的函数体就是就地调用的。它会正式的创建一个新的协程,而且它会挂起前一个协程,直到新协程完成。因此它不会启动任何并发任务。看下下面的示例,其中两个 ​​delay​​​ 都挂起了 ​​runBlocking​​。

fun main() = runBlocking(CoroutineName("Main")) {
print("context $coroutineContext job:${coroutineContext[Job]}\n")
val a = coroutineScope {
print("context1: $coroutineContext job:${coroutineContext[Job]}\n")
delay(1000)
10
}
println("a is calculated")
val b = coroutineScope {
print("context2: $coroutineContext job:${coroutineContext[Job]}")
delay(1000)
20
}
println(a) // 10
println(b) // 20
}

//context [CoroutineName(Main), BlockingCoroutine{Active}@5ef04b5, BlockingEventLoop@5f4da5c3] job:BlockingCoroutine{Active}@5ef04b5
// context1: [CoroutineName(Main), ScopeCoroutine{Active}@108c4c35, BlockingEventLoop@5f4da5c3] job:ScopeCoroutine{Active}@108c4c35
// (1 sec)
// a is calculated
// context2: [CoroutineName(Main), ScopeCoroutine{Active}@2957fcb0, BlockingEventLoop@5f4da5c3] job:ScopeCoroutine{Active}@2957fcb010
// (1 sec)
// 10
// 20

​coroutineScope​​​ 提供的是继承了外部作用域的 ​​coroutineContext​​​ 的上下文作用域,但它覆盖了来自父节点上下文的 ​​Job​​​。因此,​​coroutineScope​​ 将和其父节点建立下面这些规则:

  • 从父节点那里继承上下文
  • 在完成自己之前等待所有的子节点
  • 当父节点被取消时,所有子节点也会被取消

下面的示例中,你可以观察到 “After” 将在末尾打印出来,因为 ​​coroutineScope​​​ 只有等待它所有子协程完成之后才会完成自己。此外, ​​CoroutineName​​ 也可以正常的从父节点传递给子节点:

suspend fun longTask() = coroutineScope {
launch {
delay(1000)
val name = coroutineContext[CoroutineName]?.name
println("[$name] Finished task 1")
}
launch {
delay(2000)
val name = coroutineContext[CoroutineName]?.name
println("[$name] Finished task 2")
}
}

fun main() = runBlocking(CoroutineName("Parent")) {
println("Before")
longTask()
println("After")
}

// Before
// (1 sec)
// [Parent] Finished task 1
// (1 sec)
// [Parent] Finished task 2
// After

在下一个代码段中,你可以看到取消是如何工作的,父节点被取消后,未完成的子节点也会被取消。

suspend fun longTask() = coroutineScope {
launch {
delay(1000)
val name = coroutineContext[CoroutineName]?.name
println("[$name] Finished task 1")
}
launch {
delay(2000)
val name = coroutineContext[CoroutineName]?.name
println("[$name] Finished task 2")
}
}

fun main(): Unit = runBlocking {
// job 是 coroutineScope 的父节点
val job = launch(CoroutineName("Parent")) {
longTask()
}
delay(1500)
job.cancel()
}
// [Parent] Finished task 1

与协程构建器不同,如果在 ​​coroutineScope​​​ 或它的任意子协程中有一个异常,它会取消所有其他的子协程并重新抛出它。这就是为什么使用 ​​coroutineScope​​​ 可以修复我们之前的 “Twitter”示例。为了展示同样的异常被重新抛出,我将泛型 Error 更改为一个具体的 ​​ApiException​​。

data class Details(val name: String, val followers: Int)
data class Tweet(val text: String)

class ApiException(
val code: Int,
message: String
) : Throwable(message)

fun getFollowersNumber(): Int = throw ApiException(500, "Service unavailable")

suspend fun getUserName(): String {
delay(500)
return "marcinmoskala"
}

suspend fun getTweets(): List<Tweet> {
return listOf(Tweet("Hello, world"))
}

suspend fun getUserDetails(): Details = coroutineScope {
val userName = async { getUserName() }
val followersNumber = async { getFollowersNumber() }
Details(userName.await(), followersNumber.await())
}

fun main() = runBlocking<Unit> {
val details = try {
getUserDetails()
} catch (e: ApiException) {
null
}
val tweets = async { getTweets() }
println("User: $details")
println("Tweets: ${tweets.await()}")
}
// User: null
// Tweets: [Tweet(text=Hello, world)]

这使得当我们只需要在一个挂起函数中启动多个并发的调用时, ​​coroutineScope​​ 在大多数情况下都是一个完美的候选。

suspend fun getUserProfile(): UserProfileData =
coroutineScope {
val user = async { getUserData() }
val notifications = async { getNotifications() }
UserProfileData(
user = user.await(),
notifications = notifications.await(),
)
}

正如我们已经提到的, ​​coroutineScope​​​ 现在经常被用来包装一个挂起函数的主体。你可以把它看出是 ​​runBlocking​​ 函数的现成替代品。

suspend fun main(): Unit = coroutineScope {
launch {
delay(1000)
println("World")
}
println("Hello, ")
}
// Hello
// (1 sec)
// World

函数 ​​coroutineScope​​​ 在挂起上下文之外创建了一个额外的作用域。它从父节点继承作用域,并支持结构化并发性。
为了说明这一点,下面的两个函数实际上没有区别,除了第一个函数是依次调用 ​​​getProfile​​​ 和 ​​getFriends​​,而第二个函数是同时调用它们。

suspend fun produceCurrentUserSeq(): User {
val profile = repo.getProfile()
val friends = repo.getFriends()
return User(profile, friends)
}
suspend fun produceCurrentUserSym(): User = coroutineScope {
val profile = async { repo.getProfile() }
val friends = async { repo.getFriends() }
User(profile.await(), friends.await())
}

​coroutineScope​​ 是一个有用的协程作用域函数,但它不是唯一的。

协程作用域函数

还有更多创建作用域的函数,它们的行为与 ​​coroutineScope​​​ 类似, ​​supervisorScope​​​ 类似于 ​​coroutineScope​​​,但它使用的是 ​​SupervisorJob​​​ 而不是 ​​Job​​​。 ​​withContext​​​ 是一个可以修改协程上下文的 ​​coroutineScope​​​。​​withTimeout​​​ 是一个带有超时设置的 ​​coroutineScope​​​。这些函数将在本章的以下部分更好的解释。现在,我只想让你们知道这这组函数如此的类似,那么它们应该有一个名字,我们该如何命名这个函数组呢?有些人称之为“scoping fuction”,但我觉得这会令人感到困惑,因为我不确定“scoping”是什么意思。我想命名的人只是想它不同于 “scope function”(作用域函数,如 ​​let​​​、​​with​​​、​​apply​​ 等),这并没有真正帮助,因为这两个术语经常被混淆。这就是为什么我决定使用“Coroutine scope fuction”(协程作用域函数),它名字更长但是应该会引起更少的误解,我后面发现它更正确,请你想一想:协程作用域函数是那些用于在挂起函数中创建协程作用域的函数。

另一方面,协程作用域函数经常与协程构建器混淆,这是不正确的,因为它们在概念和实践上都非常不同。为了证明这一点,下表给出了它们之间的比较:

协程构建器(除了 ​​runBlocking​​)

协程作用域函数

​launch​​​、​​async​​​、​​produce​

​coroutineScope​​​、​​supervisorScope​​​、​​withContext​​​、​​withTimeout​

都是 ​​CoroutineScope​​ 的扩展函数

都是挂起函数

携带来自 ​​CoroutineScope​​ 的协程上下文

携带挂起函数 ​​continuation​​ 的协程上下文

异常通过 ​​Job​​ 传递给父协程

异常的抛出方式与常规函数抛出的异常的方式相同

启动一个异步协程(可能不会马上执行)

就地启动一个协程

再来想想 ​​runBlocking​​​,你可能会注意到,它看起来和协程作用域函数有一些共同点,而不像构建器那样。 ​​runBlocking​​​ 也会就地调用它的主体并返回结果。最大的区别是 ​​runBlocking​​ 必须位于协程层次结构的根部,而协程作用域函数必须位于中间位置。

withContext

​withContext​​​ 函数类似于 ​​coroutineScope​​​,但它还允许对协程作用域进行一些更改。作为参数提供给这个函数的上下文将会覆盖来自父作用域的上下文(与协程构建器中的方法相同)。这意味着 ​​withContext(EmptyCooutineContext)​​​ 和 ​​coroutineScope​​ 的行为完全相同。

fun CoroutineScope.log(text: String) {
val name = this.coroutineContext[CoroutineName]?.name
println("[$name] $text")
}

fun main() = runBlocking(CoroutineName("Parent")) {
log("Before")

withContext(CoroutineName("Child 1")) {
delay(1000)
log("Hello 1")
}
withContext(CoroutineName("Child 2")) {
delay(1000)
log("Hello 2")
}

log("After")
}
// [Parent] Before
// (1 sec)
// [Child 1] Hello 1
// (1 sec)
// [Child 2] Hello 2
// [Parent] After

函数 ​​withContext​​​ 通常用于为部分代码设置不同的协程作用域。通常,你应该将它与 ​​dispatchers​​ 一起使用,这将在下一章节中描述。

launch(Dispatchers.Main) {
view.showProgressBar()
withContext(Dispatchers.IO) {
fileRepository.saveData(data)
}
view.hideProgressBar()
}

supervisorScope

​supervisorScope​​​ 函数的行为也很像 ​​coroutineScope​​​:它创建了一个 ​​CoroutineScope​​​,从外部作用域继承而来,并在其中调用指定的挂起块。不同的是,它用 ​​SupervisorJob​​​ 重写了来自上下文的 ​​Job​​,所以当子协程发生异常时,它不会被取消。

fun main() = runBlocking {
println("Before")
supervisorScope {
launch {
delay(1000)
throw Error()
}
launch {
delay(2000)
println("Done")
}
}
println("After")
}
// Before
// (1 sec)
// Exception...
// (1 sec)
// Done
// After

​supervisScope​​ 主要用于启动多个独立任务的函数。

suspend fun notifyAnalytics(actions: List<UserAction>) =
supervisorScope {
actions.forEach { action ->
launch {
notifyAnalytics(action)
}
}
}

如果使用 ​​async​​​,将其异常传播到父级是不够的。当调用 ​​await​​​ 时,​​async​​​ 协程将以一个异常结束,然后 ​​await​​​ 将重新抛出该异常。这就是为什么如果我们真的想忽略异常,我们还应该用 ​​try-catch​​​ 块来包装 ​​await​​ 调用。

class ArticlesRepositoryComposite(
private val articleRepositories: List<ArticleRepository>
) : ArticleRepository {
override suspend fun fetchArticles(): List<Article> = supervisorScope {
articleRepositories
.map { async { it.fetchArticles() } }
.mapNotNull {
try {
it.await()
} catch (e: Throwable) {
e.printStackTrace()
null
}
}
.flatten()
.sortedByDescending { it.publishedAt }
}
}

在我的知识星球中,我经常被问到是否可以使用 ​​withContext(SuperviseJob())​​​ 来代替 ​​supervisorScope​​​。答案是不,我们不能这样做。当我们使用 ​​withContext(SupervisorJob())​​​ 时,​​withContext​​​ 仍然使用一个常规的 ​​Job​​​,只是 ​​SupervisorJob​​​ 是它的父级 job。因此当一个子协程发生异常时,其它的子协程也会被取消。​​withContext​​​ 也会抛出异常,因此它的 ​​SuperviseJob​​​ 是无用的。这就是为什么我发现 ​​withContext(SuperviseJob())​​ 毫无意义且具有误导性,我认为这是一个糟糕的实践。

fun main() = runBlocking {
println("Before")
withContext(SupervisorJob()) {
launch {
delay(1000)
throw Error()
}
launch {
delay(2000)
println("Done")
}
}
println("After")
}
// Before
// (1 sec)
// Exception...

withTimeout

另一个类似 ​​coroutineScope​​​ 的函数是 ​​withTimeout​​​。它创建一个作用域并返回一个值。实际上,​​withTimeout​​​ 基本和 ​​coroutineScope​​​ 一样。不同的是, ​​withTimeout​​​ 为其函数体额外的设置了时间限制。如果花费的时间太长,则它取消该主体并抛出 ​​TimeoutCancellationException​​​(​​CancellationException​​ 的子类型)。

suspend fun test(): Int = withTimeout(1500) {
delay(1000)
println("Still thinking")
delay(1000)
println("Done!")
42
}

suspend fun main(): Unit = coroutineScope {
try {
test()
} catch (e: TimeoutCancellationException) {
println("Cancelled")
}
// `test` 函数体被取消
}
// (1 sec)
// Still thinking
// (0.5 sec)
// Cancelled

函数 ​​withTimeout​​​ 对于测试特别有用,它可以用来测试某个函数花费的时间是多还是少。如果在 ​​runTest​​​ 中使用它, 它将在虚拟的时间上运行。我们还在 ​​runBlocking​​​ 中使用它来限制某些函数的执行时间(这就像在 ​​@Test​​ 上设置超时一样)。

class Test {
@Test
fun testTime2() = runTest {
withTimeout(1000) {
// 有时候应该执行少于 1s
delay(900) // 虚拟时间
}
}

@Test(expected = TimeoutCancellationException::class)
fun testTime1() = runTest {
withTimeout(1000) {
// 有时候应该执行多于1s
delay(1100) // 虚拟时间
}
}

@Test
fun testTime3() = runBlocking {
withTimeout(1000) {
// 正常用例
delay(900) // 真实等待 900ms
}
}
}

注意, ​​withTimeout​​​ 会抛出 ​​TimeoutCancellationException​​​ 异常,它是 ​​CancellationException​​ 的子类(与协程取消时抛出的异常相同)。因此,当这个异常在协程构建器中被抛出时,协程只会取消它,而不会影响它的父协程(如前一章所讲的)。

suspend fun main(): Unit = coroutineScope {
launch { // 1
launch { // 2, 被其父协程取消
delay(2000)
println("Will not be printed")
}
withTimeout(1000) { // 我们取消了 launch
delay(1500)
}
}

launch { // 3
delay(2000)
println("Done")
}
}
// (2 sec)
// Done

在上面的例子中, ​​delay(1500)​​​ 花费的时间比 ​​withTimeout(1000)​​​ 预期的时间要长,所以它会抛出 ​​TimeoutCancellationException​​​。注释1处的 ​​launch​​ 会捕获到这个异常,然后它会取消自身和其子协程,也就是注释2处的协程。注释3处启动的协程并不会受到影响。

​withTimeout​​​ 一个不那么粗暴的变体是 ​​withTimeoutOrNull​​​,它不会抛出异常。如果超过了设置的时间,它就会取消函数体,并返回null。我发现 ​​withTimeoutOrNull​​ 对于包装那些如果出现异常则要花费较长时间的函数很有用,例如网络操作:如果我们等待一个回包超过了5s,那么我们之后不太可能会收到它了(有些库可能会永远等待)。

suspend fun fetchUser(): User {
// 永远运行下去
while (true) {
yield()
}
}

suspend fun getUserOrNull(): User? =
withTimeoutOrNull(5000) {
fetchUser()
}

suspend fun main(): Unit = coroutineScope {
val user = getUserOrNull()
println("User: $user")
}
// (5 sec)
// User: null

连接协程作用域函数

如果同时需要同时使用来自两个协程作用域函数的功能,则需要在一个协程作用域函数中使用另一个。例如,要设置超时和调度器,可以在 ​​withContext​​​ 内部使用 ​​withTimeoutOrNull​​:

suspend fun calculateAnswerOrNull(): User? =
withContext(Dispatchers.Default) {
withTimeoutOrNull(1000) {
calculateAnswer()
}
}

额外的操作

假设有这样一个情况,在某个处理过程中,你需要执行一个额外的操作。例如,在显示用户配置文件之后,你希望发送一个用于分析的请求。人们通常会在相同的作用域内继续操作:

class ShowUserDataUseCase(
private val repo: UserDataRepository,
private val view: UserDataView,
) {
suspend fun showUserData() = coroutineScope {
val name = async { repo.getName() }
val friends = async { repo.getFriends() }
val profile = async { repo.getProfile() }

val user = User(
name = name.await(),
friends = friends.await(),
profile = profile.await()
)
view.show(user)
launch { repo.notifyProfileShown() }
}
}

然而,这种方法存在一些问题。首先,这个 ​​launch​​​ 在这里会阻塞流程,因为 ​​coroutineScope​​​ 需要等待它的完成。因此,如果在更新视图时显示进度条,则用户需要等待直到 ​​notifyProfileShown()​​ 完成,这没有多大意义。

fun onCreate() {
viewModelScope.launch {
_progressBar.value = true
showUserData()
_progressBar.value = false
}
}

第二个问题是取消。协程被设计为(默认情况下)在出现异常时取消其他操作。这对于必要的操作非常重要。如果 ​​getProfile​​​ 有异常,我们应该取消 ​​getName​​​ 和 ​​getFriends​​​,因为它们的响应无论如何最终都是无用的。而另一面,仅仅是因为 ​​notifyProfileShown()​​ 调用失败而取消整个协程也并没有多大意义。

那么我们应该怎么做呢? 当你有一个不应该影响主流程的旁路(非必要)操作时,最好放在一个单独的作用域上启动它。创建自己的作用域很容易,在这个例子中,我们创建了 ​​analyticsScope​​。

val analyticsScope = CoroutineScope(SupervisorJob())

对于单元测试和控制此作用域,最好通过构造函数注入:

class ShowUserDataUseCase(
private val repo: UserDataRepository,
private val view: UserDataView,
private val analyticsScope: CoroutineScope,
) {
suspend fun showUserData() = coroutineScope {
val name = async { repo.getName() }
val friends = async { repo.getFriends() }
val profile = async { repo.getProfile() }
val user = User(
name = name.await(),
friends = friends.await(),
profile = profile.await()
)
view.show(user)
analyticsScope.launch { repo.notifyProfileShown() }
}
}

在注入的作用域上启动操作是很常见的。传递作用域可以清楚的表明可以单独的启动协程。这意味着挂起函数可能不会等待它启动的所有操作。如果没有传递作用域,则会预料到挂起函数直到所有操作完成之前不会结束。

总结

协程作用域非常有用,尤其是因为它们可以用于任何挂起函数。它们通常用于包装整个函数体。尽管它们通常用于一个作用域包装一堆调用(特别是 ​​withContext​​),但我希望你能欣赏它们的用处。它们是 Kotlin 协程生态中非常重要的一部分。你将在本书的其余部分看到我们将如何使用它们。


举报

相关推荐

0 条评论