sync pool使用来存放临时变量的一个缓冲区,但是这个缓冲区并不可靠,每次gc的时候,都会首先清除缓冲区,所以,假如一个slice仅仅存放在 Pool 中,而没有其他地方引用,则会被当成垃圾清理掉。
概念
图示
sync.pool的结构组成如上图所示,在这里可能会有两个问题
- 我们实例化 Sync.Pool的时候,为什么实例化了一个LocalPool数组,怎么确定我的数据应该存储在LocalPool数组的哪个单元?
- PoolLocalInternal 里面的成员有private和shared,为什么要做这两种区分?
源码分析
Put
Put()
func (p *Pool) Put(x interface{}) { if x == nil { return } // race检测 先忽略这一块 if race.Enabled { if fastrand()%4 == 0 { // Randomly drop x on floor. return } race.ReleaseMerge(poolRaceAddr(x)) race.Disable() } // 根据自身的goroutine的id,获取对应的PoolLocal的地址,后面具体分析 l := p.pin() // 如果private字段为空的话,首先给private字段赋值 if l.private == nil { l.private = x x = nil } runtime_procUnpin() // 如果private字段, 则添加到shared字段,因为 shared字段可以被其他goroutine获取,所以这里需要加锁 if x != nil { l.Lock() l.shared = append(l.shared, x) l.Unlock() } if race.Enabled { race.Enable() } }
pin()
func (p *Pool) pin() *poolLocal { // 获取当前的Pid/P,数量由 pid := runtime_procPin() // LocalPool的数量 s := atomic.LoadUintptr(&p.localSize) // load-acquire l := p.local // load-consume // 如果获取到的pid比LocalPool数组的长度小,返回对应的LocalPool if uintptr(pid) < s { return indexLocal(l, pid) } // 如果pid比LocalPool数组的长度大,进一步确认,这个函数后面讨论 return p.pinSlow() }
runtime_procPin()
这个是获取当前运行的pid,具体实现没有查到, 但是runtime_procPin()
返回的数值范围是由 runtime.GOMAXPROCS(0)
决定的。网上有篇文章可以参考一下《Golang 的 协程调度机制 与 GOMAXPROCS 性能调优》,这里暂不深入
PinSlow()
func (p *Pool) pinSlow() *poolLocal { // Retry under the mutex. // Can not lock the mutex while pinned. runtime_procUnpin() allPoolsMu.Lock() defer allPoolsMu.Unlock() pid := runtime_procPin() // poolCleanup won't be called while we are pinned. // 再次检查是否LocalPool是否有对应的索引,避免其他的线程造成影响 s := p.localSize l := p.local if uintptr(pid) < s { return indexLocal(l, pid) } // 如果local为nil,说明是新构建的Pool结构体,加紧allPools slice里面 if p.local == nil { allPools = append(allPools, p) } // If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one. // 重新获取 GOMAXPROCS,并根据这个设置PoolLocal的大小 size := runtime.GOMAXPROCS(0) local := make([]poolLocal, size) atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release atomic.StoreUintptr(&p.localSize, uintptr(size)) // store-release // 找到当前goroutine对应的地址,并返回 return &local[pid] }
Put 逻辑
综上,Put的基本操作逻辑就是
- 获取当前执行的
Pid
- 根据Pid,找到对应的
PoolLocal
,接着使用里面PoolLocalInternal
- 优先存入
PoolLocalInternal
的private
属性,其次粗如PoolLocalInternal
的shared
这个slice里面
Get
Get()
func (p *Pool) Get() interface{} { if race.Enabled { race.Disable() } // 获取到LocalPool l := p.pin() // 把private数据值拷贝一份,然后把private设置为nil,因为如果private有数据,把private数据返回后,要把private设置为nil,如果private没有数据,则原先就是nil,添加这一步也没有关系 x := l.private l.private = nil runtime_procUnpin() // 如果private里面没有数据,则从shared里面去找 if x == nil { l.Lock() last := len(l.shared) - 1 if last >= 0 { x = l.shared[last] l.shared = l.shared[:last] } l.Unlock() // 如果当前线程下对应的LocalPool,没有数据,则调用getSlow(),从其他的LocalPool的shared里面获取数据,后面解析 getSlow if x == nil { x = p.getSlow() } } if race.Enabled { race.Enable() if x != nil { race.Acquire(poolRaceAddr(x)) } } // 如果 从private shared及其他的LocalPool的shared里面都获取不到数据,且注册的New函数不为空,则执行注册的New函数 if x == nil && p.New != nil { x = p.New() } return x }
getSlow()
func (p *Pool) getSlow() (x interface{}) { // See the comment in pin regarding ordering of the loads. // 获取LocalPool的size size := atomic.LoadUintptr(&p.localSize) // load-acquire local := p.local // load-consume // Try to steal one element from other procs. pid := runtime_procPin() runtime_procUnpin() // 便利LocalPool,获取shared里面的数据,找到就返回 for i := 0; i < int(size); i++ { l := indexLocal(local, (pid+i+1)%int(size)) l.Lock() last := len(l.shared) - 1 if last >= 0 { x = l.shared[last] l.shared = l.shared[:last] l.Unlock() break } l.Unlock() } return x }
通过上面的逻辑可以看出,shared
里面的数据是会被其他的P检索到的,而 private
里面的数据是不会的,所以在获取shared
里面数据的时候,需要加锁
poolCleanup
这个函数是Pool包里面提供的,用来清理Pool的,但是官方的实现略显粗暴
func poolCleanup() { // This function is called with the world stopped, at the beginning of a garbage collection. // It must not allocate and probably should not call any runtime functions. // Defensively zero out everything, 2 reasons: // 1. To prevent false retention of whole Pools. // 2. If GC happens while a goroutine works with l.shared in Put/Get, // it will retain whole Pool. So next cycle memory consumption would be doubled. // 便利所有的Sync.Pool for i, p := range allPools { allPools[i] = nil // 遍历Pool里面的LocalPool,并清空里面的数据 for i := 0; i < int(p.localSize); i++ { l := indexLocal(p.local, i) l.private = nil for j := range l.shared { l.shared[j] = nil } l.shared = nil } p.local = nil p.localSize = 0 } // 清空allPools allPools = []*Pool{} }
这个函数会在GC之前调用,这也就解释了官方的下面一句话
如果一个数据仅仅在Pool中有引用,那么就需要担心这个数据被GC清理掉
问题分析
针对于上面提出的两个问题,做一下简单的分析
这里的LocalPool是根据不同的pid来区分的,保证private数据的线程安全,程序运行的时候可以获取到pid,然后使用pid作为LocalPool的索引,找到对应的地址即可
private
是 P 专属的, shared
是可以被其他的P获取到的
参考文档
《sync.Pool源码实现》
《Go 语言学习笔记 - 雨痕》
转自:https://segmentfault.com/a/1190000019973632