Go 的并发模型以 CSP(Communicating Sequential Processes)为基础,主要通过 goroutine、channel 以及各种同步工具来进行并发控制。
Goroutine
每个 goroutine 是由 Go 运行时管理的,使用 go 关键字来启动一个新的 goroutine。Goroutine 运行在相同的地址空间,因此需要同步访问共享资源。
Channel
通过 channel,goroutine 可以安全地共享数据,不需要显式的锁机制。Channel 提供了一种基于消息传递的并发控制方式。
无缓冲 channel: 发送和接收操作会阻塞,直到另一端准备好。
有缓冲 channel: 允许一定数量的消息在发送端和接收端之间排队。
ch := make(chan int) // 无缓冲
ch := make(chan int, 2) // 有缓冲
sync.Mutex
sync.Mutex 是互斥锁,用于保护共享资源的并发访问。通过 Lock() 和 Unlock() 控制对共享数据的独占访问。
package main
import (
"fmt"
"sync"
)
var (
counter int
mu sync.Mutex
)
func increment() {
mu.Lock() // 加锁
defer mu.Unlock() // 确保操作后解锁
counter++
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
increment()
}()
}
wg.Wait()
fmt.Println("Final Counter:", counter)
}
sync.RWMutex
sync.RWMutex 是读写锁,允许多个读取操作并行执行,但写操作必须独占。
sync.WaitGroup
sync.WaitGroup 用于等待一组 goroutine 完成。它的主要用途是确保某些 goroutine 执行结束后再进行下一步操作。与 sync.Cond 不同的是,WaitGroup 不涉及条件,只是等待一组任务完成。
主要方法:
Add(delta int): 增加等待的 goroutine 数量。
Done(): 每个 goroutine 完成任务后调用 Done(),通知 WaitGroup 任务结束。
Wait(): 阻塞直到所有的 goroutine 都调用了 Done(),即任务完成。
sync.Cond
sync.Cond 是一个用于条件变量的机制。它允许 goroutine 在某个条件满足时被唤醒,从而继续执行。主要用于协调多个 goroutine 之间的状态变化。
主要方法:
Wait(): 等待某个条件,直到被其他 goroutine 用 Signal() 或 Broadcast() 唤醒。
Signal(): 唤醒一个正在等待的 goroutine。
Broadcast(): 唤醒所有等待的 goroutine。
需要搭配锁(Mutex)使用: sync.Cond 通常需要与锁(sync.Mutex 或 sync.RWMutex)一起使用,以确保条件检查和等待的原子性。
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var mu sync.Mutex
cond := sync.NewCond(&mu)
ready := false
// 启动等待 goroutine
go func() {
mu.Lock()
for !ready {
cond.Wait()
}
fmt.Println("Goroutine 被唤醒,继续执行")
mu.Unlock()
}()
// 模拟一些操作后,改变状态并唤醒 goroutine
time.Sleep(time.Second)
mu.Lock()
ready = true
cond.Signal() // 唤醒一个等待的 goroutine
mu.Unlock()
}
sync.Once
sync.Once 确保某段代码只执行一次,通常用于初始化操作。
context.Context
用于在多个 goroutine 之间传递取消信号、超时控制和截止时间。它是 Go 进行并发控制的一种高级机制,特别适用于网络请求、I/O 操作等需要取消或超时控制的场景。
package main
import (
"context"
"fmt"
"time"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
select {
case <-time.After(3 * time.Second):
case <-ctx.Done():
fmt.Println("Task canceled")
}
}