您现在的位置是:首页 >其他 >go 源码解读 - sync.WaitGroup网站首页其他

go 源码解读 - sync.WaitGroup

Q_X_Q 慶 2024-06-14 17:17:40
简介go 源码解读 - sync.WaitGroup

go version 1.19.7

在 Go 语言中,sync.WaitGroup 是一个并发编程的同步工具,用于等待一组 Goroutine 执行完毕。

当需要等待多个 Goroutine 完成任务后才能执行下一步操作时,我们可以使用 sync.WaitGroup 实现协程间的同步。它提供了 Add()、Done() 和 Wait() 三个方法,分别用于计数、计数减一和等待协程执行完毕。

具体来说:

Add(delta int) 方法可以用于增加 WaitGroup 的计数器。每次调用 Add(),计数器就会增加 delta。如果
delta 为负数,则会减少计数器。如果计数器变为0,则表示所有协程执行完毕。

Done() 方法用于减少 WaitGroup 的计数器,相当于 Add(-1),表示一个协程已经执行完毕。

Wait() 方法用于等待所有协程执行完毕。它会阻塞调用它的协程,直到所有协程执行完毕并调用 Done() 方法,或者等待超时。

WaitGroup 的使用示例如下:

var wg sync.WaitGroup

func main() {
    wg.Add(2) // 增加计数器
    go func() {
        // 协程1 执行任务
        wg.Done() // 完成任务,减少计数器
    }()
    go func() {
        // 协程2 执行任务
        wg.Done() // 完成任务,减少计数器
    }()
    wg.Wait() // 等待所有协程执行完毕
}

TIP

信号量
一个信号量(semaphore)S是一个整型变量,它除了初始化外只能通过两个标准的原子操作:wait() 和 signal() 来访问
当信号量>0 时,表示资源可用,则 wait 会对信号量执行减 1 操作。
当信号量<=0 时,表示资源暂时不可用,获取信号量时,当前的进程/线程会阻塞,直到信号量为正时被唤醒。

定义wait()

wait(S) {
    while (S <= 0)
        ; // 忙等待
    S--;
}

定义signal()

signal(S) {
    S++;
}

结构

type WaitGroup struct {
	// 告诉编辑器 waitgroup 对象不可复制,  (sync.pool 也有这个) 
	noCopy noCopy
	// 高32位是计数器(counters)  低32位是等待者数量(waiters)
	// 这边计数器 其实是Add(int) 的数量的总和, 例如 Add(1) 后 再Add(1)  计数器的就是1 +1 =2
	//  等待数量就是现在有多少goroutine 在执行wait() 等待被释放
	state1 uint64 
	//  这个是信号量
	// runtime_Semrelease 表示将信号量递增(对应信号量中的 signal 操作)
	// runtime_Semacquire 表示将信号量递减(对应信号量中的 wait 操作)
	state2 uint32
}

这边 state1 和state2 都是64位, 如果是32位的 需要重新处理获取这俩个值, 如下


func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
	// &wg.state1获取wg.state1变量的地址;
    // unsafe.Pointer(&wg.state1)将变量的地址转换为指针类型unsafe.Pointer;
    // uintptr(unsafe.Pointer(&wg.state1))将指针类型unsafe.Pointer转换为uintptr类型,即将指针类型转换为无符号整数类型,以便进行位运算;
    //%8对uintptr类型的地址值进行取模运算,判断地址是否是8的倍数。  
    // 即判断wg.state1 变量的地址 是否对齐了8字节边界, 对齐了说明是64位的编译器
	if unsafe.Alignof(wg.state1) == 8 || uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
		// 64位系统 原样返回即可
		return &wg.state1, &wg.state2
	} else {
		// 32位系统需要重新获取 counter、waiter、sema
		// &wg.state1获取wg.state1变量的地址;
        // unsafe.Pointer(&wg.state1)将变量的地址转换为指针类型unsafe.Pointer;
        // (*[3]uint32)将指针类型unsafe.Pointer转换为指向一个包含3个uint32类型的数组指针类型*[3]uint32。这里使用(*[3]uint32)是因为我们需要处理的是一个长度为3的数组,而不是单独的uint32值;
         // 即 &wg.state1的地址被转换为类型为*[3]uint32的指针。
		state := (*[3]uint32)(unsafe.Pointer(&wg.state1))
		// 然后 截一下, 数组里第一个是信号量,  
		// 第二第三个 是counter 和 waiter ,把俩个 uint32 转成一个Uint64
		return (*uint64)(unsafe.Pointer(&state[1])), &state[0]
	}
}

在这里插入图片描述

TIP 内存对齐, 不深入

64位对齐是指将数据结构中的成员按照64位对齐方式进行排列的过程。在64位体系结构中,CPU访问内存的最小单位为8字节(64位),也就是说,如果访问的数据不是8字节的整数倍,那么CPU需要进行多次访问才能读取或写入完整的数据,这会导致额外的CPU开销和内存访问延迟。因此,在设计数据结构时,为了避免CPU访问内存的额外开销,需要对数据结构进行64位对齐。
具体来说,64位对齐的原则是,结构体中的每个成员的偏移量必须是8的倍数。如果成员的大小不是8的倍数,那么需要在成员之间填充空白,使得下一个成员的偏移量是8的倍数。

Add

在这里插入图片描述

func (wg *WaitGroup) Add(delta int) {
	// statep 高32位是计数器(counters)  低32位是等待者数量(waiters)
	// semap 是信号量
	// 获取 counters/waiters , 信号量
	statep, semap := wg.state()
	// 判断当前程序是否开启了竞态条件检查, 默认是false  , 不深入, 后面相关代码忽略不显示
	if race.Enabled {
		_ = *statep 
		if delta < 0 {
			race.ReleaseMerge(unsafe.Pointer(wg))
		}
		race.Disable()
		defer race.Enable()
	}
	// 将int 的delta 变成 Uint64 再左移32位, 与statep 进行累加
	// 相当于把 delta  与 state 的高32位计数器 值进行相加
	state := atomic.AddUint64(statep, uint64(delta)<<32)
	// 高32位, 计数器的值
	v := int32(state >> 32)
	// 低32位, 等待者的值
	w := uint32(state)
	// 计数器小于0 , panic
	if v < 0 {
		panic("sync: negative WaitGroup counter")
	}
	//  当 Wait 和 Add 并发执行时,会有概率触发下面的 panic
	// 还有一种, wait  在add 在之前操作的时候, 也会触发panic
	// 需要注意, 不要在被调用的goroutine内部调用Add , 应该在外面调用
	if w != 0 && delta > 0 && v == int32(delta) {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	// v > 0: 说明还有待完成的任务数,此时不应该唤醒等待协程
    // w = 0: 说明没有协程在等待
    // 此时可以直接退出
	if v > 0 || w == 0 {
		return
	}
	// 存在等待的协程时,goroutine 已将计数器设置为0。
   // 现在不可能同时出现状态突变:
    // - Add 不能与 Wait 同时发生,
   // - 如果看到计数器==0,则 Wait 不会增加等待的协程。
   // 仍然要做一个廉价的健康检查,以检测 WaitGroup 的误用。
 
	if *statep != state {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	// 此时v = 0 所有任务都完成了

	// 将statep 置成0, 将 waiters 设成0 
	*statep = 0
	// 唤醒等待者
	// 可能有一堆等待者,    (一堆协程等一个协程干完活, 一堆协程等一堆协程干完活)
	for ; w != 0; w-- {
	// signal,调用 Wait 的地方会解除阻塞
	// semrelease 原子增加 semap 的值, 并通知阻塞在semacquire中正在等待的goroutine 
	// 第二、第三个参数 不深入
		runtime_Semrelease(semap, false, 0)
	}
}

Done

// 其实只是对 Add 的调用,但是它的效果是,将计数器的值减去 1 , 一个等待的协程 执行完成了

func (wg *WaitGroup) Done() {
	wg.Add(-1)
}

Wait

在这里插入图片描述

// 注意点
// 1、 wait 只能在主流程中才能阻塞
// 2、 wait 过程中 不能add  会panic
func (wg *WaitGroup) Wait() {
	// 获取计数器和信号量
	statep, semap := wg.state()
	// 这边 我思考了一下, Q1 : 为啥不把 获取计数器和信号量放进循环里面
	// 看注意点2, wait 过程中 不允许add , 这样每次都一样(注意这边的一样是指地址一样, 为什么强调地址, 看下面 Q2)不用重复获取了
	for {
		// 又卡了, Q2 :既然都一样了, 为啥 counter,waiter 需要重新解析啊
		// add 操作会控制 信号量变化
		// wait 操作会控制state 变化
		// atomic.LoadUint64(statep) 原子加载, 保证statep已经写完了
		// 那 statep 哪边写呢, 在后面
		//  atomic.CompareAndSwapUint64(statep, state, state+1) , 对state 进行自增加操作
		// 看下 参数 func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool) 
		// 在这个地址上(statep) 把旧值(state)改成新值(state+1)
		// 地址statep 没变, 存的值变了, 那这边肯定要重新解析, cool 
		state := atomic.LoadUint64(statep)
		v := int32(state >> 32)
		w := uint32(state)
		// 计数器是0 了, 不需要等待, 返回
		if v == 0 {
			return
		}
		// 这边 上面Q2 的时候一起分析了
		// state + 1 , 其实是对waiters 加1 (加了低32位上), 等待者加1
		if atomic.CompareAndSwapUint64(statep, state, state+1) {
			 // 这会阻塞,直到 sema (信号量)大于 0
			runtime_Semacquire(semap)
			// 防止并发
			if *statep != 0 {
				panic("sync: WaitGroup is reused before previous Wait has returned")
			}
			// 解除阻塞状态
			return
		}
	}
}

可继续深入学习的点
1、信号量
2、内存对齐
3、原子操作

风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。