在业务场景中经常会碰到,需要并发执行一些任务,但是又要控制并发的数量,以免并发协程过多导致资源瓶颈的问题,因此我们可以基于Channel实现协程池,既可以并发执行任务,又可以控制并发的数量,一举两得。
实现原理
1.创建一个缓冲区channel,缓冲区的大小,即为协程池最大并发连接数的大小
//创建一个最大并发连接数为10的协程池
c := make(chan struct{}, 10)
2. 每创建一个协程,往channel中写入一个空结构体,相当于协程池加1,当缓冲区达到最大值时,进程会产生阻塞,直到缓冲区队列产生空闲位置,利用这个机制,达到限制并发连接数的目的
c <- struct{}{}
3.每个协程结束前,从channel中取出一个空结构体,相当于协程池减1,为协程池添加新的协程创造空间
<-c
实例展示
创建一个最大并发连接数为10的协程池
package main
func main(){
//创建一个最大并发连接数为10的协程池
c := make(chan struct{}, 10)
defer close(c)
//并发执行任务
for _, task := range tasks {
// 每创建一个协程,往channel中写入一个空结构体,相当于协程池加1
c <- struct{}{}
//开启协程执行任务
go Action(c)
}
return
}
// Action 协程中的操作
func Action(c chan struct{}) {
/**
具体的功能实现
**/
// 每个协程结束前,从channel中取出一个空结构体,相当于协程池减1
<-c
return
}
上面的代码是没有并发等待的,也就是说,如果主协程结束,那么协程池将会被关闭,可能会出现部分协程无法完成的情况,因此我们可以对代码进行如下改造,让主协程等待所有并发协程执行结束后,再继续执行
package main
import "sync"
func main(){
//创建一个最大并发连接数为10的协程池
c := make(chan struct{}, 10)
defer close(c)
//创建并发控制
wg := sync.WaitGroup{}
wg.Add(任务数量)
//并发执行任务
for _, task := range tasks {
// 每创建一个协程,往channel中写入一个空结构体,相当于协程池加1
c <- struct{}{}
go Action(c)
}
//并发等待
wg.Wait()
return
}
// Action 协程中的操作
func Action(c chan struct{},wg *sync.WaitGroup) {
/**
具体的功能实现
**/
// 每个协程结束前,从channel中取出一个空结构体,相当于协程池减1
<-c
// 每个协程结束前,使并发控制队列减1
wg.Done()
return
}
利用WaitGroup实现并发等待,这样主线程等待所有goroutine都运行完毕后才结束,当然,在实际的应用场景中,有些情况可能主协程不需要等待并发协程的结束(如一个创建接口,主协程只需要创建一个子协程用于负责创建协程池,并创建并发任务,自己完成后直接返回,而自己创建的协程并不会中断,而是会交给main进行托管),则代码中不需要使用WaitGroup实现并发等待