您现在的位置是:首页 >技术杂谈 >Go中同/异步与锁的应用~~sync包网站首页技术杂谈

Go中同/异步与锁的应用~~sync包

CodeMartain 2024-08-24 12:01:02
简介Go中同/异步与锁的应用~~sync包

Go中锁的实现~~sync包

go中sync包中提供了互斥锁;

在前面Go中channel文章中我们使用了time.Sleep()函数使得main函数的Goroutine阻塞至所有协程Goroutine结束,但这并不是一个很好的办法,因为我们实际应用中并不能准确知道协程什么时候结束(这里面要考虑服务器的性能,网络波动以及io等一系列因素;

sysn包中提供了WaitGroup来实现协程之间的协调;

同步等待组

同步的sync与异步的sync;

在go中提供了同步等待组WaitGroup
来看源码:

//等待一组Goroutine完成; 阻塞的直至所有的goroutine完成;
// A WaitGroup waits for a collection of goroutines to finish.
// The main goroutine calls Add to set the number of
// goroutines to wait for. Then each of the goroutines
// runs and calls Done when finished. At the same time,
// Wait can be used to block until all goroutines have finished.
//
// A WaitGroup must not be copied after first use.
//
// In the terminology of the Go memory model, a call to Done
// “synchronizes before” the return of any Wait call that it unblocks.
type WaitGroup struct {
	noCopy noCopy

	state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count.
	sema  uint32
}

WaitGroup 实现了一些函数:

Add() 方法来设置应等待Goroutine的数量;

在结构体WaitGroup 中有这么一个属性:
state atomic.Uint64 //高32用于计数,低32为用于统计等待的数量

//如果
func (wg *WaitGroup) Add(delta int) {
	if race.Enabled {
		if delta < 0 { //判断增加的值
			// Synchronize decrements with Wait.
			race.ReleaseMerge(unsafe.Pointer(wg))
		}
		race.Disable()
		defer race.Enable()
	}
	state := wg.state.Add(uint64(delta) << 32)
	v := int32(state >> 32)
	w := uint32(state)
	if race.Enabled && delta > 0 && v == int32(delta) {
		// The first increment must be synchronized with Wait.
		// Need to model this as a read, because there can be
		// several concurrent wg.counter transitions from 0.
		race.Read(unsafe.Pointer(&wg.sema))
	}
	if v < 0 {
		panic("sync: negative WaitGroup counter")
	}
	if w != 0 && delta > 0 && v == int32(delta) {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	if v > 0 || w == 0 {
		return
	}
	// This goroutine has set counter to 0 when waiters > 0.  //当等待的协程大于0,设置计数器>0
	// Now there can't be concurrent mutations of state:
	// - Adds must not happen concurrently with Wait,
	// - Wait does not increment waiters if it sees counter == 0.
	// Still do a cheap sanity check to detect WaitGroup misuse.
	if wg.state.Load() != state {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	// Reset waiters count to 0.
	wg.state.Store(0)
	for ; w != 0; w-- {
		runtime_Semrelease(&wg.sema, false, 0)
	}
}

Add()方法中在结构以WaitGroup{}内部计数器上加上delta,delta可以是负数;如果计数器变为0,那么等待的所有groutine都会被释放;
如果计数器小于0,则会出发panic;

注意: Add()方法参数为正数时的调用应该在Wait()之前,否则如果等待的所有groutine都会被释放(或者没有被全部释放),那么可能只会等待很少的goroutine完成;

通常我们应该在创建新的Goroutine或者其他应该等待的事件之前调用;

结束时应该调用**Done()**方法

Done()用于减少WaitGroup计数器的值,应该在Goroutine的最后执行;

// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
	wg.Add(-1)
}

Wait()方法阻塞Goroutine 直到WaitGroup计数减为0

// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {
	if race.Enabled {
		race.Disable()
	}
	for {
		state := wg.state.Load()
		v := int32(state >> 32) //低32位用于统计等待数
		w := uint32(state)
		
		if v == 0 { //如果等待数位0,那么释放所有的Goroutine
			// Counter is 0, no need to wait.
			if race.Enabled {
				race.Enable()
				race.Acquire(unsafe.Pointer(wg))
			}
			return
		}
		// Increment waiters count.
		if wg.state.CompareAndSwap(state, state+1) {  //CAS操作
			if race.Enabled && w == 0 {
				// Wait must be synchronized with the first Add.
				// Need to model this is as a write to race with the read in Add.
				// As a consequence, can do the write only for the first waiter,
				// otherwise concurrent Waits will race with each other.
				race.Write(unsafe.Pointer(&wg.sema))
			}
			runtime_Semacquire(&wg.sema)
			if wg.state.Load() != 0 {
				panic("sync: WaitGroup is reused before previous Wait has returned")
			}
			if race.Enabled {
				race.Enable()
				race.Acquire(unsafe.Pointer(wg))
			}
			return
		}
	}
}

一个代码Demo


func main() {
	var sy sync.WaitGroup
	fmt.Printf("%T
", sy)
	fmt.Println(sy)
	//增加10个
	sy.Add(5)
	rand.Seed(time.Now().UnixNano())
	go WaitGroupTest(1, &sy)
	go WaitGroupTest(2, &sy)
	go WaitGroupTest(3, &sy)
	go WaitGroupTest(4, &sy)
	go WaitGroupTest(5, &sy)
	sy.Wait()
	defer fmt.Println("main exit")
}
func WaitGroupTest(num int, sy *sync.WaitGroup) {
	for i := 0; i < 5; i++ {
		fmt.Printf("第%d号子goroutine,%d 
", num, i)
		time.Sleep(time.Second)
		 
	}
	sy.Done()
}

当我们往WaitGroup中添加协程时要在定义协程之前
运行结果:
在这里插入图片描述

小结:
1,声明一个WaitGroup,
2,调用Add添加期望的计数
3,构建协程
4,等待所有协程运行完成后主协程才退出;

当我们注销掉sy.Done(),再次运行 会出现下面的结果–死锁了
在这里插入图片描述
当我们调整一下for循环中数量:
在这里插入图片描述
在这里插入图片描述

//只读 <-chan  只写的 chan <-

func ChanWaitGroup(ch chan int, sy *sync.WaitGroup) {
	for i := 0; i < 10; i++ {
		//通道放入数据
		ch <- i
	}
	defer close(ch) //要关闭通道
	defer sy.Done() //执行完毕要标记一下执行完毕计数-1
}

func main() {
	var wt sync.WaitGroup
	var ch chan int
	ch = make(chan int)
	wt.Add(3)
	//这里可以放入一些需要执行的函数,这些函数之间有关联,需要都执行完毕后在执行别的代码

	go ChanWaitGroup(ch, &wt)

	for i2 := range ch {
		fmt.Println(i2)
	}
	time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) //纯粹是为了复习
	go WaitGroupTest(1, &wt) //
	go WaitGroupTest(2, &wt) //
	wt.Wait()
	fmt.Println("main exit")
}

所有子Goroutine运行结束以后主Goroutine才退出。

互斥锁

在Go中互斥锁也是一个结构体:

// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
//
// In the terminology of the Go memory model,
// the n'th call to Unlock “synchronizes before” the m'th call to Lock
// for any n < m.
// A successful call to TryLock is equivalent to a call to Lock.
// A failed call to TryLock does not establish any “synchronizes before”
// relation at all.
type Mutex struct {
	state int32
	sema  uint32
}

Mutex是一个互斥锁,可以创建为其他结构体的字段;零值为解锁状态。Mutex类型的锁和Goroutine无关,可以由不同的Goroutine加锁和解锁。

再看一下 加锁与解锁的源码:

// Lock locks m.   锁住 m 
// If the lock is already in use, the calling goroutine blocks until the mutex is available.   //如果该对象被锁住了,那么就阻塞,直到m解锁
func (m *Mutex) Lock() {
	// Fast path: grab unlocked mutex.
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		if race.Enabled {
			race.Acquire(unsafe.Pointer(m))
		}
		return
	}
	// Slow path (outlined so that the fast path can be inlined)
	m.lockSlow()
}


// Unlock unlocks m. //解锁m 
// It is a run-time error if m is not locked on entry to Unlock.   如果m没有被锁住就会报error
// A locked Mutex is not associated with a particular goroutine.  //锁与协程无关
// It is allowed for one goroutine to lock a Mutex and then  arrange for another goroutine to unlock it. //允许一个协程加锁,另一个协程解锁
func (m *Mutex) Unlock() {
	if race.Enabled {
		_ = m.state
		race.Release(unsafe.Pointer(m))
	}

	// Fast path: drop lock bit.
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 {
		// Outlined slow path to allow inlining the fast path.
		// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
		m.unlockSlow(new)
	}
}

//尝试加锁
// TryLock tries to lock m and reports whether it succeeded. 
//
// Note that while correct uses of TryLock do exist, they are rare,
// and use of TryLock is often a sign of a deeper problem  in a particular use of mutexes. //很少用tryLock,而TryLock的使用通常表明在特定的互斥锁使用中存在更深层次的问题。
func (m *Mutex) TryLock() bool {
	old := m.state
	if old&(mutexLocked|mutexStarving) != 0 {
		return false
	}

	// There may be a goroutine waiting for the mutex, but we are
	// running now and can try to grab the mutex before that
	// goroutine wakes up.
	if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) {
		return false
	}

	if race.Enabled {
		race.Acquire(unsafe.Pointer(m))
	}
	return true
}

同时我们根据源码可以直到,如果要对一个对象的方法加锁/解锁,可以在结构体中声明一个匿名对象,比如下面这样:

type ATR struct {
	sync.Mutex
}

func (a ATR) LockT() {

}

func main() {
	atr := ATR{}
	atr.Mutex.Lock()
	atr.LockT()
}

实际问题–售票,用go代码实现:

var tickets = 10
var wg sync.WaitGroup
var sm sync.Mutex

func saleTickets(winname string, swg *sync.WaitGroup) {
	for {
		//上锁
		sm.Lock()
		if tickets > 0 { //如果有票
			tickets-- //卖票
			time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
		} else {
			fmt.Println(winname, "窗口---票售完")
			sm.Unlock()
			break
		}
		sm.Unlock()
	}

	defer swg.Done()
}

func main() {

	wg.Add(10)
	for i := 0; i < 10; i++ {
		go saleTickets("火车站", &wg)
	}

	wg.Wait()
}

实际就是加锁解锁方法的应用;

读写锁

在java中有关于读写锁的一些类和方法,在go中也有读写锁的一些API;

首先我们来看一下

未完待续

风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。