0
点赞
收藏
分享

微信扫一扫

【Go 快速入门】协程 | 通道 | select 多路复用 | sync 包

无愠色 03-02 07:00 阅读 5

文章目录

项目代码地址:05-GoroutineChannelSync

前言

Go 1.22 版本于不久前推出,更新的新特性可以参考官文。从此篇章开始,后续 go 版本更为 1.22.0 及以上,自行官网下载。

协程

常见的并发模型

  • 线程与锁模型
  • Actor 模型
  • CSP 模型
  • Fork 与 Join 模型

Go 语言天生支持并发,主要通过基于通信顺序过程(Communicating Sequential Processes, CSP)的 goroutine 和通道 channel 实现,同时也支持传统的多线程共享内存的并发方式。

goroutine 会以一个很小的栈开始其生命周期,一般只需要 2 KB。goroutine 由 Go 运行时(runtime)调度,Go 运行时会智能地将 m 个 goroutine 合理的分配给 n 个操作系统线程,实现类似 m:n 的调度机制,不再需要开发者在代码层面维护线程池。

goroutine 调度

操作系统线程的调度:操作系统线程在被内核调度时挂起当前执行的线程,并将它的寄存器内容保存到内存中,然后选出下一次要执行的线程,并从内存中恢复该线程的寄存器信息,恢复现场并执行该线程,这样就完成一次完整的线程上下文切换。

goroutine 调度:区别于操作系统线程的调度,goroutine 调度在 Go 语言运行时层面实现,完全由 Go 语言本身实现,按照一定规则将所有的 goroutine 调度到操作系统线程上执行。

goroutine 调度器采用 GPM 调度模型,如下所示:

在这里插入图片描述

  • G:表示 goroutine,每执行一次go f()就创建一个 G,包含要执行的函数和上下文信息。

  • 全局队列(Global Queue):存放等待运行的 G。

  • P:表示 goroutine 执行所需的资源,最多有 GOMAXPROCS 个。GOMAXPROCS 默认 CPU 核心数,指定需要使用多少个操作系统线程来同时执行代码。

  • P 的本地队列:同全局队列类似,存放的也是等待运行的G,存的数量有限,不超过256个。新建 G 时,G 优先加入到 P 的本地队列,如果本地队列满了会批量移动部分 G 到全局队列。

  • M:线程想运行任务就得获取 P,从 P 的本地队列获取 G,当 P 的本地队列为空时,M 也会尝试从全局队列或其他 P 的本地队列获取 G。M 运行 G,G 执行之后,M 会从 P 获取下一个 G,不断重复下去。

  • Goroutine 调度器和操作系统调度器是通过 M 结合起来的,每个 M 都代表了1个内核线程,操作系统调度器负责把内核线程分配到 CPU 的核上执行。

参考:https://www.liwenzhou.com/posts/Go/concurrence/


使用 goroutine

启动 goroutine 只需要在函数前加 go 关键字:

func f(msg string) {
	for i := 0; i < 3; i++ {
		fmt.Println(msg, ":", i)
	}
}

func functino01() {
	go f("goroutine")

	go func(msg string) {
		fmt.Println(msg)
	}("going")

	time.Sleep(time.Second)
	fmt.Println("done")
}
going
goroutine : 0
goroutine : 1
goroutine : 2
done

使用 time.Sleep 等待协程 goroutine 的运行不优雅,同时也不够精确,后续会采用 sync 包提供的常用并发原语,对协程的运行状态进行控制。

在 go 1.22.0 版本后,如下使用可正常在协程闭包函数中捕获外部的变量,而不是每个 loop 仅一份变量了。

参考:https://zhuanlan.zhihu.com/p/674158675

func function05() {
	for i := 0; i < 5; i++ {
		go func() {
			fmt.Println(i)
		}() // 正常输出 0~4 中的数字,而不是全是 4
	}
	time.Sleep(time.Second)
}

通道

通道 channel 是一种特殊类型,遵循先入先出(FIFO)的特性,用于 goroutine 之间的同步、通信。

声明 channel 语法如下:

chan T 		// 双向通道
chan <- T  	// 只能发送的通道
<- chan T	// 只能接收的通道

channel 是一个引用类型,在被初始化前值为 nil,需要使用 make 函数进行初始化。缓冲区大小可选:

  • 有缓冲通道:make(chan T, capacity int)
  • 无缓冲通道:make(chan T)make(chan T, 0)

通道共有三种操作,发送、接受、关闭:

  • 定义通道
ch := make(chan int)
  • 发送一个值到通道中
ch <- 10
  • 从通道中接收值
v := <- ch 		// 从 ch 接收值赋给 v
v, ok := <- ch 	// 多返回值,ok 表示通道是否被关闭
<- ch			// 从 ch 接收值,忽略结果
  • 关闭通道
close(ch)

tips

  • 对一个关闭的通道发送值会导致 panic
  • 对一个关闭的通道一直获取值会直到通道为空
  • 重复关闭通道会 panic
  • 通道值可以被垃圾回收
  • 对一个关闭并且没值的通道接收值,会获取对应类型零值

无缓冲通道

又称阻塞通道,同步通道。

无缓冲通道必须至少有一个接收方才能发送成功,即发送操作会阻塞,直到另一个 goroutine 在该通道上接收。相反,接收操作先执行,也会阻塞至有 goroutine 往通道发送数据。

发送方和接收方要同步就绪,只有在两者都 ready 的情况下,数据才能在两者间传输。

func function02() {
	ch := make(chan int, 0)
	go func() {
		time.Sleep(time.Second)
		ch <- 1
	}()
	v := <-ch
	fmt.Println(v)
}
func function03() {
	ch := make(chan int)
	go func() {
		v := <-ch
		fmt.Println(v)
	}()
	time.Sleep(time.Second)
	ch <- 1
	time.Sleep(time.Second)
}

有缓冲通道

又称异步通道

有缓冲通道可以通过 cap 获取通道容量,len 获取通道内元素数量。如果通道元素数量达到上限,那么继续往通道发送数据也会被阻塞,直至有 goroutine 从通道获取数据。

通常选择使用 for range 循环从通道中接收值,当通道被关闭后,通道内所有值被接收完毕后会自动退出循环。

func function04() {
	ch := make(chan int, 2)
	fmt.Println(len(ch), cap(ch)) // 0 2
	ch <- 1
	ch <- 2
	go func() {
		for v := range ch {
			fmt.Println(v)
		}
	}() // 1 2 3
	ch <- 3
	time.Sleep(time.Second)
}
  • 多返回值模式

基本格式:value, ok := <- ch

ok :如果为 false 表示 value 为无效值(通道关闭后的默认零值);如果为 true 表示 value 为通道中的实际数据值。

func function06() {
	ch := make(chan int, 1)
	ch <- 1
	close(ch)
	go func() {
		for {
			if v, ok := <-ch; ok {
				fmt.Println(v)
			} else {
				break
			}
		}
	}()
	time.Sleep(time.Second)
}

单向通道

通常会在函数参数中限制通道只能用于接收或发送。控制通道在函数中只读或只写,提升程序的类型安全。

// Producer 生产者
func Producer() <-chan int {
	ch := make(chan int, 1)
	go func() {
		for i := 0; i < 3; i++ {
			ch <- i
		}
		close(ch) // 任务完成关闭通道
	}()
	return ch
}

// Consumer 消费者
func Consumer(ch <-chan int) int {
	sum := 0
	for v := range ch {
		sum += v
	}
	return sum
}

func function07() {
	ch := Producer()
	sum := Consumer(ch)
	fmt.Println(sum) // 3
}

在函数传参及赋值过程中,全向通道可以转为单向通道,但单向通道不可转为全向通道。

func function08() {
	ch := make(chan int, 1)

	go func(ch chan<- int) {
		for i := 0; i < 2; i++ {
			ch <- i
		}
		close(ch)
	}(ch)

	for v := range ch {
		fmt.Println(v)
	} // 0 1
}

select 多路复用

在从多个通道获取数据的场景下, 需要使用 select 选择器,使用方式类似于 switch 语句,有一系列的 case 分支和一个默认分支。

基本格式:

select {
case <- ch1:
	...
case data := <- ch2:
	...
case ch3 <- 3:
	...
default:
	...
}

select 会一直等待,直到其中某个 case 的通信操作完成,执行该 case 语句。

  • 可处理一个或多个 channel 的接收和发送
  • 如果多个 case 同时满足,select 随机选择一个执行
func function09() {
	now := time.Now()
	ch1 := make(chan string)
	ch2 := make(chan string)

	go func() {
		time.Sleep(1 * time.Second)
		ch1 <- "one"
	}()
	go func() {
		time.Sleep(2 * time.Second)
		ch2 <- "two"
	}()

	for i := 0; i < 2; i++ {
		select {
		case msg1 := <-ch1:
			fmt.Println(msg1)
		case msg2 := <-ch2:
			fmt.Println(msg2)
		}
	} // one two
	fmt.Println(time.Since(now)) // 2.0003655s
}

sync

在上述示例中,使用了大量的 time.Sleep 等待 goroutine 的结束。但还有更好的方式,使用内置的 sync 包管理协程的运行状态。

sync.WaitGroup

使用 wait group 等待多个协程完成,如果 WaitGroup 计数器恢复为 0,即所有协程的工作都完成:

var (
	x  int64
	wg sync.WaitGroup
)

func function10() {
	add := func() {
		defer wg.Done()

		for i := 0; i < 5000; i++ {
			x = x + 1
		}
	}
	wg.Add(2)
	go add()
	go add()
	wg.Wait()
	fmt.Println(x)
}

使用 go run -race main.go 可查看代码是否存在竞态问题,上述代码存在两个 goroutine 操作同一个资源,输出结果不定。

方法作用
WaitGroup.Add(delta)计数器值 +delta,建议在 goroutine 外部累加计数器
WaitGroup.Done()计数器值 -1
WaitGroup.Wait()阻塞代码,直到计数器值减为 0

注意:WaitGroup 对象不是一个引用类型,在通过函数传值的时候需要使用地址。


sync.Mutex

互斥锁是一种常用的控制共享资源访问的方法,它能够保证同一时间只有一个 goroutine 可以访问共享资源。

方法作用
Mutex.Lock()获取互斥锁
Mutex.Unlock()释放互斥锁

使用互斥锁对代码修改如下:

var (
	x   int64
	wg  sync.WaitGroup
	mtx sync.Mutex
)

func function11() {
	add := func() {
		defer wg.Done()

		for i := 0; i < 5000; i++ {
			mtx.Lock()	// 修改数据前,加锁
			x = x + 1
			mtx.Unlock() // 修改完数据后,释放锁
		}
	}
	wg.Add(2)
	go add()
	go add()
	wg.Wait()
	fmt.Println(x) // 10000
}

sync.RWMutex

读写互斥锁,某些场景中读操作较为频繁,不涉及对数据的修改时,读写锁可能是更好的选择。

方法作用
RWMutex.Lock()获取写锁
RWMutex.Unlock()释放写锁
RWMutex.RLock()获取读锁
RWMutex.RUnlock()释放读锁

读写锁分为两种:读锁和写锁。当一个 goroutine 获取到读锁之后,其他的 goroutine 如果是获取读锁会继续获得锁,如果是获取写锁就会等待;而当一个 goroutine 获取写锁之后,其他的 goroutine 无论是获取读锁还是写锁都会等待。


sync.Once

在高并发场景下,可以使用 sync.Once,保证操作只执行一次。当且仅当第一次访问某个变量时,进行初始化。变量初始化过程中,所有读都被阻塞,直到初始化完成。

sync.Once 其实内部包含一个互斥锁和一个布尔值,互斥锁保证布尔值和数据的安全,而布尔值用来记录初始化是否完成。这样设计就能保证初始化操作的时候是并发安全的,并且初始化操作也不会被执行多次。

sync.Once 仅提供了一个方法 Do,参数 f 是对象初始化函数。

  • func (o *Once) Do(f func())

单例模式:

type Singleton struct{}

var (
	instance *Singleton
	once     sync.Once
	wg       sync.WaitGroup
)

func GetInstance() *Singleton {
	once.Do(func() {
		instance = &Singleton{}
		fmt.Println("Get Instance")
	})
	return instance
}

func function12() {
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = GetInstance()
		}()
	} // Get Instance
	wg.Wait()
}

程序只会输出一次 Get Instance,说明 sync.Once 是线程安全的,支持并发,仅会执行一次初始化数据的函数。


sync.Map

Go 内置的 map 不是并发安全的,下述代码多个 goroutine 对 map 操作会出现竞态问题,报错不能正常运行。

var (
	mp = make(map[string]interface{})
	wg sync.WaitGroup
)

func function13() {
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			key := strconv.Itoa(i)
			mp[key] = i
			fmt.Println(key, mp[key])
		}()
	}
	wg.Wait()
}

sync.Map 是并发安全版 map,不过操作数据不再是直接通过 [] 获取插入数据,而需要使用其提供的方法。

方法作用
Map.Store(key, value interface{})存储 key-value 数据
Map.Load(key interface{}) (value interface{}, ok bool)查询 key 对应的 value
Map.LoadOrStore(key, value interface{}) (actual interface{}, loaded bool)查询 key 对应的 value,如果不存在则存储 key-value 数据
Map.LoadAndDelete(key interface{}) (value interface{}, loaded bool)查询并删除 key
Map.Delete(key interface{})删除 key
Map.Range(f func(key, value interface{}) bool)对 map 中的每个 key-value 依次调用 f

使用 sync.Map 修改上述代码,即可正确运行。

func function14() {
	m := sync.Map{}
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			key := strconv.Itoa(i)
			m.Store(key, i)

			v, ok := m.Load(key)
			if ok {
				fmt.Println(key, v)
			}
		}()
	}
	wg.Wait()
}

LoadOrStoreLoadAndDelete 示例代码:

// LoadOrStore、LoadAndDelete
func function15() {
	m := sync.Map{}
	//m.Store("cauchy", 19)
	v, ok := m.LoadOrStore("cauchy", 20)
	fmt.Println(v, ok) // 注释: 20 false;没注释: 19 true
	v, ok = m.Load("cauchy")
	fmt.Println(v, ok) // 注释: 20 true;没注释: 19 true

	v, ok = m.LoadAndDelete("cauchy")
	fmt.Println(v, ok) // 注释: 20 true;没注释: 19 true
	v, ok = m.Load("cauchy")
	fmt.Println(v, ok) // nil false
}

Range 示例代码:

Map.Range 可无序遍历 sync.Map 中的所有 key-value 键值对,如果返回 false 则终止迭代。

func function16() {
	m := sync.Map{}
	m.Store(3, 3)
	m.Store(2, 2)
	m.Store(1, 1)
	cnt := 0
	m.Range(func(key, value any) bool {
		cnt++
		fmt.Println(key, value)
		return true
	})
	fmt.Println(cnt) // 3
}
举报

相关推荐

0 条评论