您现在的位置是:首页 >技术教程 >go源码解读-sync.pool网站首页技术教程

go源码解读-sync.pool

Q_X_Q 慶 2023-06-15 20:00:02
简介go源码解读-sync.pool

go version 1.19.7
sync.pool 是go 内置的对象池技术, 管理临时对象,这些对象可以单独保存和检索, 减少GC次数
特点:1、 池不可以指定大小
2、 Get 没有的话会新生成一个对象
3、对象的周期取决于GC的周期
从go doc可以看到sync.pool 主要暴露Get 和 Put 两个方法, 以及一个New。
使用:用New初始化pool一个实例,获取的调用Get , 释放资源的时候调用Put

C:UsersJosslynn>go doc sync.pool
package sync // import "sync"

type Pool struct {

        // New optionally specifies a function to generate
        // a value when Get would otherwise return nil.
        // It may not be changed concurrently with calls to Get.
        New func() any
        // Has unexported fields.
}
 ...... (描述忽略了)
func (p *Pool) Get() any
func (p *Pool) Put(x any)

使用实例:gin 中使用

// 初始化的时候, 调用New ,来初始化pool 实例, 这边的engine.allocateContext 是新建返回一个*Context 对象,
	engine.pool.New = func() any {
		return engine.allocateContext()
	}
// put get 的时候
func (engine *Engine) ServeHTTP(w http.ResponseWriter, req *http.Request) {
 	// 从pool 中拿出一个context
	c := engine.pool.Get().(*Context)
	c.writermem.reset(w)
	//把http 的req 放到这个context 中 
	c.Request = req
	// 但是为了防止这个context 是没有被GC 过的, 所以需要reset(除request的内容, 其他内容清空
	c.reset()
	// context 处理过了, 可以交给下一步处理了()
	engine.handleHTTPRequest(c)
	// 当handle 处理完了, context 这个临时对象 就需要释放,
	// 如果不调用put, GC 也会去释放这个对象,但是Get 的时候就一直是一个新的对象, 就没有使用pool的意义的了
	engine.pool.Put(c)
}

结构

type Pool struct {
	noCopy noCopy

	// 本地的 Processor(p) 的指针,实际指向 []poolLocal, 
	local     unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
	// []poolLocal 的大小
	localSize uintptr        // size of the local array

	// GC 时,victim 和 victimSize 会分别接管 local 和 localSize;
   	// victim 的目的是为了减少 GC 后冷启动导致的性能抖动,让分配对象更平滑;
	victim     unsafe.Pointer // local from previous cycle
	victimSize uintptr        // size of victims array

	// 暴露外部使用
	New func() any
}
type poolLocal struct {
	poolLocalInternal
	pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
type poolLocalInternal struct {
	private any       // Can be used only by the respective P.
	shared  poolChain // Local P can pushHead/popHead; any P can popTail.
}
type poolChain struct {

	head *poolChainElt
	tail *poolChainElt
}
type poolChainElt struct {
	poolDequeue
    next, prev *poolChainElt

在这里插入图片描述

GET 流程

上图绿色标签 就是要获取的对象, 由上图的结构所示, 我们可以从这几个地方获取到元素
1、 new ,( 这个应该放在最后, pool 没有获取到对象的时候, 用new)
2、从当前p 中的private 中获取
3、从当前的 p 的share 中获取
4、从其他p 的share 中获取, (为啥不能从其他p 的private 中获取, private 私有的, 只能自己调用)
5、在gc 的时候, victim 接管了 local , 所以还可以从victim 中的cache中获取
6、从其他p 的cache 获取
在这里插入图片描述
有了流程图, 可以初步知道整个get的流程, 然后看代码

func (p *Pool) Get() any {
	// 这边不深入了, 只要注意, poo 不能复制, 不然会有问题
	if race.Enabled {
		race.Disable()
	}
	// p.pin 会返回一个poolLocal和pid , ( 是GMP模型中 每个goroutine 绑定一个 p 才能执行), 这边的p.pin 就是返回当前goroutine 绑定的p, 
	// pin() 中对当前p 加了锁
	l, pid := p.pin()
	// 从private 中获取元素, 是不需要加锁的, 
	// 把p中的private 拿到之后清空该字段,我们要GET一个对象, 这个对象现在在goroutine中并且是私有的
	x := l.private
	l.private = nil
	// 如果x 为空, 就是可能其他goroutine 已经把这个值取走了, 同一个对象x, p队里下的 g1 、g2 都来获取, g1获取到x, 并把这个值置为空, 那g2 来取的时候,就没有了
	if x == nil {
		// private 中没有对象,查看本地p.share 中是否有
		// 当前p 调自己的shared 用popHead 从头部获取
		x, _ = l.shared.popHead()
		if x == nil {
		// 本地p.share  尝试从 其他p 的share, 或者尝试从 victim cache 里取元素
			x = p.getSlow(pid)
		}
	}
	// G-M 锁定解除;上锁的逻辑在 pin() 中
	runtime_procUnpin()
	if race.Enabled {
		race.Enable()
		if x != nil {
			race.Acquire(poolRaceAddr(x))
		}
	}
	//pool 池中没有 , 找不到复用的对象了, 就NEW 一个返回去
	if x == nil && p.New != nil {
		x = p.New()
	}
	//返回对象
	return x
} 

pprivate 中没有, p.share 中也没有, 去其他p 以及victim 中看
补充:

所谓受害者缓存(Victim Cache),是一个与直接匹配或低相联缓存并用的、容量很小的全相联缓存。当一个数据块被逐出缓存时,并不直接丢弃,而是暂先进入受害者缓存。如果受害者缓存已满,就替换掉其中一项。当进行缓存标签匹配时,在与索引指向标签匹配的同时,并行查看受害者缓存,如果在受害者缓存发现匹配,就将其此数据块与缓存中的不匹配数据块做交换,同时返回给处理器。 – 维基百科

pool 清空机制 看最后的poolCleanup

func (p *Pool) getSlow(pid int) any {
	// 首先重新获取pool的localSize大小并且是原子操作
	size := runtime_LoadAcquintptr(&p.localSize) // load-acquire
	locals := p.local                            // load-consume
	// 遍历size 的大小, 找到别的p
	for i := 0; i < int(size); i++ {
		// 尝试从其他p 关联的 的poollocal 中取一个
		// popTail  是双向链表的尾部开始偷一个p
		l := indexLocal(locals, (pid+i+1)%int(size))
		if x, _ := l.shared.popTail(); x != nil {
			return x
		}
	}

	// 从其他p 中, 没有获取到p , 尝试从victim cache 中获取
	size = atomic.LoadUintptr(&p.victimSize)
	if uintptr(pid) >= size {
		return nil
	}
	// gc 中的p.victim private 有这个值, 就返回
	locals = p.victim
	l := indexLocal(locals, pid)
	if x := l.private; x != nil {
		l.private = nil
		return x
	}
	// 从其他p 的victim cache 中获取 
	for i := 0; i < int(size); i++ {
		l := indexLocal(locals, (pid+i)%int(size))
		if x, _ := l.shared.popTail(); x != nil {
			return x
		}
	}
	atomic.StoreUintptr(&p.victimSize, 0)
	return nil
}
//首次 Pool 需要把自己注册进 allPools 数组;
// Pool.local 数组按照 runtime.GOMAXPROCS(0) 的大小进行分配,如果是默认的,那么这个就是 P 的个数,也就是 CPU 的个数;
func (p *Pool) pinSlow() (*poolLocal, int) {
	// G-M 先解锁
	runtime_procUnpin()
	// 以下逻辑在全局锁 allPoolsMu 内 , 所有的对象池都在这个里面
	allPoolsMu.Lock()
	defer allPoolsMu.Unlock()
	// 重新获取, 因为刚刚解锁了, p 很有可能被抢占了 , 获取P 的 id
	pid := runtime_procPin()
	// poolCleanup won't be called while we are pinned.
	s := p.localSize
	l := p.local
	// 判断一下, 
	if uintptr(pid) < s {
		return indexLocal(l, pid), pid
	}
	// 如果指针是空, 说名第一次get , 
	if p.local == nil {
	// 首次,Pool 需要把自己注册进 allPools 数组
		allPools = append(allPools, p)
	}
	// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
	size := runtime.GOMAXPROCS(0)
	// local 数组的大小就等于 runtime.GOMAXPROCS(0)
	local := make([]poolLocal, size)
	// 把local数组 的首地址, 放入 p 的local 中
	atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
	runtime_StoreReluintptr(&p.localSize, uintptr(size))     // store-release
	// 返回pid 的poolLocal 
	return &local[pid], pid
}


func (c *poolChain) popTail() (any, bool) {
	d := loadPoolChainElt(&c.tail)
	if d == nil {
		return nil, false
	}

	for {
	    // 从双端队列尾部出队
		if val, ok := d.popTail(); ok {
			return val, ok
		}

		if d2 == nil {
			return nil, false
		}

		// 从链表中删除当前节点
		if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
			storePoolChainElt(&d2.prev, nil)
		}
		d = d2
	}
}
func (d *poolDequeue) popTail() (any, bool) {
	var slot *eface
	for {
		ptrs := atomic.LoadUint64(&d.headTail)
		head, tail := d.unpack(ptrs)
		if tail == head {
			// Queue is empty.
			return nil, false
		}
		ptrs2 := d.pack(head, tail+1)
		if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
			// Success.
			slot = &d.vals[tail&uint32(len(d.vals)-1)]
			break
		}
	}

	// We now own slot.
	val := *(*any)(unsafe.Pointer(slot))
	if val == dequeueNil(nil) {
		val = nil
	}
	slot.val = nil
	atomic.StorePointer(&slot.typ, nil)

	return val, true
}


PUT 流程

根据结构图中绿色的部分, 现在需要把对象放入池中, 可以这样放
1、优先放在p.private
2、private 已经有值了, 那就放在p.share 中
3、 放东西就没必要放其他p 了,

// Put adds x to the pool.
func (p *Pool) Put(x any) {
	if x == nil {
		return
	}
	if race.Enabled {
		if fastrandn(4) == 0 {
			// Randomly drop x on floor.
			return
		}
		race.ReleaseMerge(poolRaceAddr(x))
		race.Disable()
	}
	// 初始化, 禁止p 被抢占, 并获取对应的poollocal 的内容
	l, _ := p.pin()
	// 尝试放到 p.private 的位置
	if l.private == nil {
		l.private = x
	} else {
		// private 放不了, 放p.share 去
		l.shared.pushHead(x)
	}
	// 解锁, p处理完了, 可以自由活动了
	runtime_procUnpin()
	if race.Enabled {
		race.Enable()
	}
}

func (c *poolChain) pushHead(val any) {
	d := c.head
	if d == nil {
		// Initialize the chain.
		const initSize = 8 // Must be a power of 2
		d = new(poolChainElt)
		d.vals = make([]eface, initSize)
		c.head = d
		// 当head 节点是空的时候, 创建一个新的节点, 这边主要要用原子操作, poptail 的时候回收缩链表, 清除节点
		storePoolChainElt(&c.tail, d)
	}
	// 然后往head 中push 对象
	if d.pushHead(val) {
		return
	}

	// 如果没有成功, 该节点的ring 满了, 继续新增节点, ring 是之前的2倍, 且有最大长度限制
	newSize := len(d.vals) * 2
	if newSize >= dequeueLimit {
		// Can't make it any bigger.
		newSize = dequeueLimit
	}

	d2 := &poolChainElt{prev: d}
	d2.vals = make([]eface, newSize)
	c.head = d2
	storePoolChainElt(&d.next, d2)
	d2.pushHead(val)
}
func (d *poolDequeue) pushHead(val any) bool {
	// 先获取 headtail 然后解析出head 和tail
	ptrs := atomic.LoadUint64(&d.headTail)
	head, tail := d.unpack(ptrs)
	// 看一下 是否满了, 满了返回false
	if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
		// Queue is full.
		return false
	}
	
	slot := &d.vals[head&uint32(len(d.vals)-1)]
	typ := atomic.LoadPointer(&slot.typ)
	// popTail 会把value 置为nil, 所以这边需要判断一下, popTail 和 pushHead 有竞争关系 popHead 和pushHead 则没有竞争关系
	// popTail 和 pushHead 有竞争关系 pushHead 是对当前p的localcache 操作, 而 popTail 会抢其他p的localcache 操作
	// 而 popHead 和PushHead 都是当前p 的处理, 同一个p只有有一个goroutine 在执行, 所以popHead 和PushHead不可能同时发生
	if typ != nil {
		// 不为空, 对象不能放这里, 返回false
		return false
	}

	// 为空了, 把对象放进去
	if val == nil {
		val = dequeueNil(nil)
	}
	*(*any)(unsafe.Pointer(slot)) = val
	atomic.AddUint64(&d.headTail, 1<<dequeueBits)
	return true
}


func (p *Pool) pin() (*poolLocal, int) {
	// 加锁, 当前p 禁止抢占
	pid := runtime_procPin()
	// In pinSlow we store to local and then to localSize, here we load in opposite order.
	// Since we've disabled preemption, GC cannot happen in between.
	// Thus here we must observe local at least as large localSize.
	// We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness).
	s := runtime_LoadAcquintptr(&p.localSize) // load-acquire
	l := p.local                              // load-consume
	if uintptr(pid) < s {
		return indexLocal(l, pid), pid
	}
	return p.pinSlow()
}

GC

var (
   allPoolsMu Mutex  // 全局锁
   allPools []*Pool  // 存放程序运行期间所有的 Pool 实例地址
   oldPools []*Pool  // 配合 victim 机制用的
)

// 在 GC 开始的时候,gcStart() 函数中会调用 clearpools() -> poolCleanup() 函数。也就是说,每一轮 GC 都是对所有的 Pool 做一次清理
func poolCleanup() {
	// 将旧pool 中的 victim 清空, 
	for _, p := range oldPools {
		p.victim = nil
		p.victimSize = 0
	}

	// 把现有的local cache置为victim cache
	for _, p := range allPools {
		p.victim = p.local
		p.victimSize = p.localSize
		p.local = nil
		p.localSize = 0
	}
	// 清理掉 allPools 和 oldPools 中所有的 Pool 实例
	oldPools, allPools = allPools, nil
}

两轮回收机制 , victim 将删除缓存的动作由一次操作变成了两次操作, 每次清理的是上上次的缓存内容。本次的先放在victim cache 中, 这样Get 的时候,其他获取不到, 可以从这边再次获取到。

风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。