您现在的位置是:首页 >技术教程 >go源码解读-sync.pool网站首页技术教程
go源码解读-sync.pool
go version 1.19.7
sync.pool 是go 内置的对象池技术, 管理临时对象,这些对象可以单独保存和检索, 减少GC次数
特点:1、 池不可以指定大小
2、 Get 没有的话会新生成一个对象
3、对象的周期取决于GC的周期
从go doc可以看到sync.pool 主要暴露Get 和 Put 两个方法, 以及一个New。
使用:用New初始化pool一个实例,获取的调用Get , 释放资源的时候调用Put
C:UsersJosslynn>go doc sync.pool
package sync // import "sync"
type Pool struct {
// New optionally specifies a function to generate
// a value when Get would otherwise return nil.
// It may not be changed concurrently with calls to Get.
New func() any
// Has unexported fields.
}
...... (描述忽略了)
func (p *Pool) Get() any
func (p *Pool) Put(x any)
使用实例:gin 中使用
// 初始化的时候, 调用New ,来初始化pool 实例, 这边的engine.allocateContext 是新建返回一个*Context 对象,
engine.pool.New = func() any {
return engine.allocateContext()
}
// put get 的时候
func (engine *Engine) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// 从pool 中拿出一个context
c := engine.pool.Get().(*Context)
c.writermem.reset(w)
//把http 的req 放到这个context 中
c.Request = req
// 但是为了防止这个context 是没有被GC 过的, 所以需要reset(除request的内容, 其他内容清空
c.reset()
// context 处理过了, 可以交给下一步处理了()
engine.handleHTTPRequest(c)
// 当handle 处理完了, context 这个临时对象 就需要释放,
// 如果不调用put, GC 也会去释放这个对象,但是Get 的时候就一直是一个新的对象, 就没有使用pool的意义的了
engine.pool.Put(c)
}
结构
type Pool struct {
noCopy noCopy
// 本地的 Processor(p) 的指针,实际指向 []poolLocal,
local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
// []poolLocal 的大小
localSize uintptr // size of the local array
// GC 时,victim 和 victimSize 会分别接管 local 和 localSize;
// victim 的目的是为了减少 GC 后冷启动导致的性能抖动,让分配对象更平滑;
victim unsafe.Pointer // local from previous cycle
victimSize uintptr // size of victims array
// 暴露外部使用
New func() any
}
type poolLocal struct {
poolLocalInternal
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
type poolLocalInternal struct {
private any // Can be used only by the respective P.
shared poolChain // Local P can pushHead/popHead; any P can popTail.
}
type poolChain struct {
head *poolChainElt
tail *poolChainElt
}
type poolChainElt struct {
poolDequeue
next, prev *poolChainElt
GET 流程
上图绿色标签 就是要获取的对象, 由上图的结构所示, 我们可以从这几个地方获取到元素
1、 new ,( 这个应该放在最后, pool 没有获取到对象的时候, 用new)
2、从当前p 中的private 中获取
3、从当前的 p 的share 中获取
4、从其他p 的share 中获取, (为啥不能从其他p 的private 中获取, private 私有的, 只能自己调用)
5、在gc 的时候, victim 接管了 local , 所以还可以从victim 中的cache中获取
6、从其他p 的cache 获取
有了流程图, 可以初步知道整个get的流程, 然后看代码
func (p *Pool) Get() any {
// 这边不深入了, 只要注意, poo 不能复制, 不然会有问题
if race.Enabled {
race.Disable()
}
// p.pin 会返回一个poolLocal和pid , ( 是GMP模型中 每个goroutine 绑定一个 p 才能执行), 这边的p.pin 就是返回当前goroutine 绑定的p,
// pin() 中对当前p 加了锁
l, pid := p.pin()
// 从private 中获取元素, 是不需要加锁的,
// 把p中的private 拿到之后清空该字段,我们要GET一个对象, 这个对象现在在goroutine中并且是私有的
x := l.private
l.private = nil
// 如果x 为空, 就是可能其他goroutine 已经把这个值取走了, 同一个对象x, p队里下的 g1 、g2 都来获取, g1获取到x, 并把这个值置为空, 那g2 来取的时候,就没有了
if x == nil {
// private 中没有对象,查看本地p.share 中是否有
// 当前p 调自己的shared 用popHead 从头部获取
x, _ = l.shared.popHead()
if x == nil {
// 本地p.share 尝试从 其他p 的share, 或者尝试从 victim cache 里取元素
x = p.getSlow(pid)
}
}
// G-M 锁定解除;上锁的逻辑在 pin() 中
runtime_procUnpin()
if race.Enabled {
race.Enable()
if x != nil {
race.Acquire(poolRaceAddr(x))
}
}
//pool 池中没有 , 找不到复用的对象了, 就NEW 一个返回去
if x == nil && p.New != nil {
x = p.New()
}
//返回对象
return x
}
pprivate 中没有, p.share 中也没有, 去其他p 以及victim 中看
补充:
所谓受害者缓存(Victim Cache),是一个与直接匹配或低相联缓存并用的、容量很小的全相联缓存。当一个数据块被逐出缓存时,并不直接丢弃,而是暂先进入受害者缓存。如果受害者缓存已满,就替换掉其中一项。当进行缓存标签匹配时,在与索引指向标签匹配的同时,并行查看受害者缓存,如果在受害者缓存发现匹配,就将其此数据块与缓存中的不匹配数据块做交换,同时返回给处理器。 – 维基百科
pool 清空机制 看最后的poolCleanup
func (p *Pool) getSlow(pid int) any {
// 首先重新获取pool的localSize大小并且是原子操作
size := runtime_LoadAcquintptr(&p.localSize) // load-acquire
locals := p.local // load-consume
// 遍历size 的大小, 找到别的p
for i := 0; i < int(size); i++ {
// 尝试从其他p 关联的 的poollocal 中取一个
// popTail 是双向链表的尾部开始偷一个p
l := indexLocal(locals, (pid+i+1)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// 从其他p 中, 没有获取到p , 尝试从victim cache 中获取
size = atomic.LoadUintptr(&p.victimSize)
if uintptr(pid) >= size {
return nil
}
// gc 中的p.victim private 有这个值, 就返回
locals = p.victim
l := indexLocal(locals, pid)
if x := l.private; x != nil {
l.private = nil
return x
}
// 从其他p 的victim cache 中获取
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
atomic.StoreUintptr(&p.victimSize, 0)
return nil
}
//首次 Pool 需要把自己注册进 allPools 数组;
// Pool.local 数组按照 runtime.GOMAXPROCS(0) 的大小进行分配,如果是默认的,那么这个就是 P 的个数,也就是 CPU 的个数;
func (p *Pool) pinSlow() (*poolLocal, int) {
// G-M 先解锁
runtime_procUnpin()
// 以下逻辑在全局锁 allPoolsMu 内 , 所有的对象池都在这个里面
allPoolsMu.Lock()
defer allPoolsMu.Unlock()
// 重新获取, 因为刚刚解锁了, p 很有可能被抢占了 , 获取P 的 id
pid := runtime_procPin()
// poolCleanup won't be called while we are pinned.
s := p.localSize
l := p.local
// 判断一下,
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
// 如果指针是空, 说名第一次get ,
if p.local == nil {
// 首次,Pool 需要把自己注册进 allPools 数组
allPools = append(allPools, p)
}
// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
size := runtime.GOMAXPROCS(0)
// local 数组的大小就等于 runtime.GOMAXPROCS(0)
local := make([]poolLocal, size)
// 把local数组 的首地址, 放入 p 的local 中
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
runtime_StoreReluintptr(&p.localSize, uintptr(size)) // store-release
// 返回pid 的poolLocal
return &local[pid], pid
}
func (c *poolChain) popTail() (any, bool) {
d := loadPoolChainElt(&c.tail)
if d == nil {
return nil, false
}
for {
// 从双端队列尾部出队
if val, ok := d.popTail(); ok {
return val, ok
}
if d2 == nil {
return nil, false
}
// 从链表中删除当前节点
if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
storePoolChainElt(&d2.prev, nil)
}
d = d2
}
}
func (d *poolDequeue) popTail() (any, bool) {
var slot *eface
for {
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
if tail == head {
// Queue is empty.
return nil, false
}
ptrs2 := d.pack(head, tail+1)
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
// Success.
slot = &d.vals[tail&uint32(len(d.vals)-1)]
break
}
}
// We now own slot.
val := *(*any)(unsafe.Pointer(slot))
if val == dequeueNil(nil) {
val = nil
}
slot.val = nil
atomic.StorePointer(&slot.typ, nil)
return val, true
}
PUT 流程
根据结构图中绿色的部分, 现在需要把对象放入池中, 可以这样放
1、优先放在p.private
2、private 已经有值了, 那就放在p.share 中
3、 放东西就没必要放其他p 了,
// Put adds x to the pool.
func (p *Pool) Put(x any) {
if x == nil {
return
}
if race.Enabled {
if fastrandn(4) == 0 {
// Randomly drop x on floor.
return
}
race.ReleaseMerge(poolRaceAddr(x))
race.Disable()
}
// 初始化, 禁止p 被抢占, 并获取对应的poollocal 的内容
l, _ := p.pin()
// 尝试放到 p.private 的位置
if l.private == nil {
l.private = x
} else {
// private 放不了, 放p.share 去
l.shared.pushHead(x)
}
// 解锁, p处理完了, 可以自由活动了
runtime_procUnpin()
if race.Enabled {
race.Enable()
}
}
func (c *poolChain) pushHead(val any) {
d := c.head
if d == nil {
// Initialize the chain.
const initSize = 8 // Must be a power of 2
d = new(poolChainElt)
d.vals = make([]eface, initSize)
c.head = d
// 当head 节点是空的时候, 创建一个新的节点, 这边主要要用原子操作, poptail 的时候回收缩链表, 清除节点
storePoolChainElt(&c.tail, d)
}
// 然后往head 中push 对象
if d.pushHead(val) {
return
}
// 如果没有成功, 该节点的ring 满了, 继续新增节点, ring 是之前的2倍, 且有最大长度限制
newSize := len(d.vals) * 2
if newSize >= dequeueLimit {
// Can't make it any bigger.
newSize = dequeueLimit
}
d2 := &poolChainElt{prev: d}
d2.vals = make([]eface, newSize)
c.head = d2
storePoolChainElt(&d.next, d2)
d2.pushHead(val)
}
func (d *poolDequeue) pushHead(val any) bool {
// 先获取 headtail 然后解析出head 和tail
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
// 看一下 是否满了, 满了返回false
if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
// Queue is full.
return false
}
slot := &d.vals[head&uint32(len(d.vals)-1)]
typ := atomic.LoadPointer(&slot.typ)
// popTail 会把value 置为nil, 所以这边需要判断一下, popTail 和 pushHead 有竞争关系 popHead 和pushHead 则没有竞争关系
// popTail 和 pushHead 有竞争关系 pushHead 是对当前p的localcache 操作, 而 popTail 会抢其他p的localcache 操作
// 而 popHead 和PushHead 都是当前p 的处理, 同一个p只有有一个goroutine 在执行, 所以popHead 和PushHead不可能同时发生
if typ != nil {
// 不为空, 对象不能放这里, 返回false
return false
}
// 为空了, 把对象放进去
if val == nil {
val = dequeueNil(nil)
}
*(*any)(unsafe.Pointer(slot)) = val
atomic.AddUint64(&d.headTail, 1<<dequeueBits)
return true
}
func (p *Pool) pin() (*poolLocal, int) {
// 加锁, 当前p 禁止抢占
pid := runtime_procPin()
// In pinSlow we store to local and then to localSize, here we load in opposite order.
// Since we've disabled preemption, GC cannot happen in between.
// Thus here we must observe local at least as large localSize.
// We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness).
s := runtime_LoadAcquintptr(&p.localSize) // load-acquire
l := p.local // load-consume
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
return p.pinSlow()
}
GC
var (
allPoolsMu Mutex // 全局锁
allPools []*Pool // 存放程序运行期间所有的 Pool 实例地址
oldPools []*Pool // 配合 victim 机制用的
)
// 在 GC 开始的时候,gcStart() 函数中会调用 clearpools() -> poolCleanup() 函数。也就是说,每一轮 GC 都是对所有的 Pool 做一次清理
func poolCleanup() {
// 将旧pool 中的 victim 清空,
for _, p := range oldPools {
p.victim = nil
p.victimSize = 0
}
// 把现有的local cache置为victim cache
for _, p := range allPools {
p.victim = p.local
p.victimSize = p.localSize
p.local = nil
p.localSize = 0
}
// 清理掉 allPools 和 oldPools 中所有的 Pool 实例
oldPools, allPools = allPools, nil
}
两轮回收机制 , victim 将删除缓存的动作由一次操作变成了两次操作, 每次清理的是上上次的缓存内容。本次的先放在victim cache 中, 这样Get 的时候,其他获取不到, 可以从这边再次获取到。