您现在的位置是:首页 >技术杂谈 >Go中同/异步与锁的应用~~sync包网站首页技术杂谈
Go中同/异步与锁的应用~~sync包
Go中锁的实现~~sync包
go中sync包中提供了互斥锁;
在前面Go中channel文章中我们使用了time.Sleep()函数使得main函数的Goroutine阻塞至所有协程Goroutine结束,但这并不是一个很好的办法,因为我们实际应用中并不能准确知道协程什么时候结束(这里面要考虑服务器的性能,网络波动以及io等一系列因素;
sysn包中提供了WaitGroup来实现协程之间的协调;
同步等待组
同步的sync与异步的sync;
在go中提供了同步等待组WaitGroup
来看源码:
//等待一组Goroutine完成; 阻塞的直至所有的goroutine完成;
// A WaitGroup waits for a collection of goroutines to finish.
// The main goroutine calls Add to set the number of
// goroutines to wait for. Then each of the goroutines
// runs and calls Done when finished. At the same time,
// Wait can be used to block until all goroutines have finished.
//
// A WaitGroup must not be copied after first use.
//
// In the terminology of the Go memory model, a call to Done
// “synchronizes before” the return of any Wait call that it unblocks.
type WaitGroup struct {
noCopy noCopy
state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count.
sema uint32
}
WaitGroup 实现了一些函数:
Add() 方法来设置应等待Goroutine的数量;
在结构体WaitGroup 中有这么一个属性:
state atomic.Uint64
//高32用于计数,低32为用于统计等待的数量
//如果
func (wg *WaitGroup) Add(delta int) {
if race.Enabled {
if delta < 0 { //判断增加的值
// Synchronize decrements with Wait.
race.ReleaseMerge(unsafe.Pointer(wg))
}
race.Disable()
defer race.Enable()
}
state := wg.state.Add(uint64(delta) << 32)
v := int32(state >> 32)
w := uint32(state)
if race.Enabled && delta > 0 && v == int32(delta) {
// The first increment must be synchronized with Wait.
// Need to model this as a read, because there can be
// several concurrent wg.counter transitions from 0.
race.Read(unsafe.Pointer(&wg.sema))
}
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
if v > 0 || w == 0 {
return
}
// This goroutine has set counter to 0 when waiters > 0. //当等待的协程大于0,设置计数器>0
// Now there can't be concurrent mutations of state:
// - Adds must not happen concurrently with Wait,
// - Wait does not increment waiters if it sees counter == 0.
// Still do a cheap sanity check to detect WaitGroup misuse.
if wg.state.Load() != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// Reset waiters count to 0.
wg.state.Store(0)
for ; w != 0; w-- {
runtime_Semrelease(&wg.sema, false, 0)
}
}
Add()方法中在结构以WaitGroup{}内部计数器上加上delta,delta可以是负数;如果计数器变为0,那么等待的所有groutine都会被释放;
如果计数器小于0,则会出发panic;
注意: Add()方法参数为正数时的调用应该在Wait()之前,否则如果等待的所有groutine都会被释放(或者没有被全部释放),那么可能只会等待很少的goroutine完成;
通常我们应该在创建新的Goroutine或者其他应该等待的事件之前调用;
结束时应该调用**Done()**方法
Done()用于减少WaitGroup计数器的值,应该在Goroutine的最后执行;
// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
Wait()方法阻塞Goroutine 直到WaitGroup计数减为0。
// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {
if race.Enabled {
race.Disable()
}
for {
state := wg.state.Load()
v := int32(state >> 32) //低32位用于统计等待数
w := uint32(state)
if v == 0 { //如果等待数位0,那么释放所有的Goroutine
// Counter is 0, no need to wait.
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
// Increment waiters count.
if wg.state.CompareAndSwap(state, state+1) { //CAS操作
if race.Enabled && w == 0 {
// Wait must be synchronized with the first Add.
// Need to model this is as a write to race with the read in Add.
// As a consequence, can do the write only for the first waiter,
// otherwise concurrent Waits will race with each other.
race.Write(unsafe.Pointer(&wg.sema))
}
runtime_Semacquire(&wg.sema)
if wg.state.Load() != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
}
}
一个代码Demo
func main() {
var sy sync.WaitGroup
fmt.Printf("%T
", sy)
fmt.Println(sy)
//增加10个
sy.Add(5)
rand.Seed(time.Now().UnixNano())
go WaitGroupTest(1, &sy)
go WaitGroupTest(2, &sy)
go WaitGroupTest(3, &sy)
go WaitGroupTest(4, &sy)
go WaitGroupTest(5, &sy)
sy.Wait()
defer fmt.Println("main exit")
}
func WaitGroupTest(num int, sy *sync.WaitGroup) {
for i := 0; i < 5; i++ {
fmt.Printf("第%d号子goroutine,%d
", num, i)
time.Sleep(time.Second)
}
sy.Done()
}
当我们往WaitGroup中添加协程时要在定义协程之前
运行结果:
小结:
1,声明一个WaitGroup,
2,调用Add添加期望的计数
3,构建协程
4,等待所有协程运行完成后主协程才退出;
当我们注销掉sy.Done(),再次运行 会出现下面的结果–死锁了
当我们调整一下for循环中数量:
//只读 <-chan 只写的 chan <-
func ChanWaitGroup(ch chan int, sy *sync.WaitGroup) {
for i := 0; i < 10; i++ {
//通道放入数据
ch <- i
}
defer close(ch) //要关闭通道
defer sy.Done() //执行完毕要标记一下执行完毕计数-1
}
func main() {
var wt sync.WaitGroup
var ch chan int
ch = make(chan int)
wt.Add(3)
//这里可以放入一些需要执行的函数,这些函数之间有关联,需要都执行完毕后在执行别的代码
go ChanWaitGroup(ch, &wt)
for i2 := range ch {
fmt.Println(i2)
}
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) //纯粹是为了复习
go WaitGroupTest(1, &wt) //
go WaitGroupTest(2, &wt) //
wt.Wait()
fmt.Println("main exit")
}
所有子Goroutine运行结束以后主Goroutine才退出。
互斥锁
在Go中互斥锁也是一个结构体:
// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
//
// In the terminology of the Go memory model,
// the n'th call to Unlock “synchronizes before” the m'th call to Lock
// for any n < m.
// A successful call to TryLock is equivalent to a call to Lock.
// A failed call to TryLock does not establish any “synchronizes before”
// relation at all.
type Mutex struct {
state int32
sema uint32
}
Mutex是一个互斥锁,可以创建为其他结构体的字段;零值为解锁状态。Mutex类型的锁和Goroutine无关,可以由不同的Goroutine加锁和解锁。
再看一下 加锁与解锁的源码:
// Lock locks m. 锁住 m
// If the lock is already in use, the calling goroutine blocks until the mutex is available. //如果该对象被锁住了,那么就阻塞,直到m解锁
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()
}
// Unlock unlocks m. //解锁m
// It is a run-time error if m is not locked on entry to Unlock. 如果m没有被锁住就会报error
// A locked Mutex is not associated with a particular goroutine. //锁与协程无关
// It is allowed for one goroutine to lock a Mutex and then arrange for another goroutine to unlock it. //允许一个协程加锁,另一个协程解锁
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// Outlined slow path to allow inlining the fast path.
// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
m.unlockSlow(new)
}
}
//尝试加锁
// TryLock tries to lock m and reports whether it succeeded.
//
// Note that while correct uses of TryLock do exist, they are rare,
// and use of TryLock is often a sign of a deeper problem in a particular use of mutexes. //很少用tryLock,而TryLock的使用通常表明在特定的互斥锁使用中存在更深层次的问题。
func (m *Mutex) TryLock() bool {
old := m.state
if old&(mutexLocked|mutexStarving) != 0 {
return false
}
// There may be a goroutine waiting for the mutex, but we are
// running now and can try to grab the mutex before that
// goroutine wakes up.
if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) {
return false
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return true
}
同时我们根据源码可以直到,如果要对一个对象的方法加锁/解锁,可以在结构体中声明一个匿名对象,比如下面这样:
type ATR struct {
sync.Mutex
}
func (a ATR) LockT() {
}
func main() {
atr := ATR{}
atr.Mutex.Lock()
atr.LockT()
}
实际问题–售票,用go代码实现:
var tickets = 10
var wg sync.WaitGroup
var sm sync.Mutex
func saleTickets(winname string, swg *sync.WaitGroup) {
for {
//上锁
sm.Lock()
if tickets > 0 { //如果有票
tickets-- //卖票
time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
} else {
fmt.Println(winname, "窗口---票售完")
sm.Unlock()
break
}
sm.Unlock()
}
defer swg.Done()
}
func main() {
wg.Add(10)
for i := 0; i < 10; i++ {
go saleTickets("火车站", &wg)
}
wg.Wait()
}
实际就是加锁解锁方法的应用;
读写锁
在java中有关于读写锁的一些类和方法,在go中也有读写锁的一些API;
首先我们来看一下
未完待续